package org.apache.flink.table.runtime.window.aligned;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.functions.ExecutionContext;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/window/aligned/BufferedAlignedWindowAggregator.class */
public final class BufferedAlignedWindowAggregator implements AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> {
    private static final long serialVersionUID = 1;
    // Type information used for the "window-acc" state descriptor created in open().
    private final TypeInformation<BaseRow> accTypeInfo;
    // Type information used for the "previous-agg-result" state descriptor created in open().
    private final TypeInformation<BaseRow> aggResultTypeInfo;
    // Generated aggregation handler; instantiated in open() with the user-code class loader.
    private final GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedWindowAggregator;
    // When positive, addElement() calls snapshot() once numOfElements exceeds this value.
    private final long minibatchSize;
    // When true, doFire() emits a retract row for the previous result before the new result
    // and keeps the last emitted result in previousState.
    private final boolean sendRetraction;
    // Execution context passed to open(); used to set the current key before firing/cleanup.
    private transient ExecutionContext ctx;
    // Handler instance created from generatedWindowAggregator in open().
    private transient SubKeyedAggsHandleFunction<TimeWindow> windowAggregator;
    // In-memory buffer: window (sorted by the TreeMap) -> key -> buffered entry.
    private transient TreeMap<TimeWindow, Map<BaseRow, BufferEntry>> buffer;
    // Per (key, window) accumulator state; written by snapshot(), removed by expireWindow().
    private transient SubKeyedValueState<BaseRow, TimeWindow, BaseRow> windowAccState;
    // Per (key, window) encoded BufferEntry status (see BufferEntry#getStatus); read by restore().
    private transient KeyedMapState<BaseRow, TimeWindow, Integer> bufferStatusState;
    // Last emitted aggregate result per (key, window); only initialised when sendRetraction is true.
    private transient SubKeyedValueState<BaseRow, TimeWindow, BaseRow> previousState;
    // Number of elements added since open() or the last snapshot(); exposed as the "bundleSize" gauge.
    private transient int numOfElements;
    // Reused output row that joins the key with the aggregate result.
    private transient JoinedRow outRow;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/table/runtime/window/aligned/BufferedAlignedWindowAggregator$BufferEntry.class */
    /**
     * Buffered accumulator for one (key, window) pair together with three status flags.
     *
     * <p>The flags can be packed into / unpacked from a single {@code int} via
     * {@link #getStatus()} and {@link #setStatus(int)}.
     */
    public static final class BufferEntry {
        /** Status bit set while the buffered accumulator holds all data (not yet flushed). */
        private static final int FULL_DATA_BIT = 1;
        /** Status bit set while elements have arrived since the last fire. */
        private static final int DELTA_BIT = 2;
        /** Status bit set once the entry has been fired at least once. */
        private static final int FIRED_BIT = 4;

        private BaseRow accumulator;
        private boolean bufferIsFullData;
        private boolean deltaSinceLastFire;
        private boolean fired;

        /**
         * Creates an entry with explicit flag values.
         *
         * @param baseRow the buffered accumulator
         * @param z initial value of the "buffer is full data" flag
         * @param z2 initial value of the "delta since last fire" flag
         * @param z3 initial value of the "fired" flag
         */
        protected BufferEntry(BaseRow baseRow, boolean z, boolean z2, boolean z3) {
            this.accumulator = baseRow;
            this.bufferIsFullData = z;
            this.deltaSinceLastFire = z2;
            this.fired = z3;
        }

        /**
         * Creates a fresh entry: full data, delta pending, not fired.
         *
         * @param baseRow the initial accumulator
         * @return the new entry
         */
        public static BufferEntry of(BaseRow baseRow) {
            return new BufferEntry(baseRow, true, true, false);
        }

        /** Records that a new element arrived since the last fire. */
        public void markNewElementReceived() {
            this.deltaSinceLastFire = true;
        }

        /** Records that the buffer no longer holds the full data. */
        public void markBufferFlushed() {
            this.bufferIsFullData = false;
        }

        /** Records a fire: sets the fired flag and clears the pending-delta flag. */
        public void markFired() {
            this.fired = true;
            this.deltaSinceLastFire = false;
        }

        /** @return the buffered accumulator */
        public BaseRow getAccumulator() {
            return this.accumulator;
        }

        /** @param baseRow the accumulator to buffer from now on */
        public void setAccumulator(BaseRow baseRow) {
            this.accumulator = baseRow;
        }

        /** @return whether this entry has been fired */
        public boolean isFired() {
            return this.fired;
        }

        /** @return whether elements arrived since the last fire */
        public boolean deltaSinceLastFire() {
            return this.deltaSinceLastFire;
        }

        /** @return whether the buffer still holds the full data */
        public boolean bufferIsFullData() {
            return this.bufferIsFullData;
        }

        /**
         * Packs the three flags into an int: bit 0 = full data, bit 1 = delta since last
         * fire, bit 2 = fired.
         *
         * @return the encoded status
         */
        public int getStatus() {
            return (this.bufferIsFullData ? FULL_DATA_BIT : 0)
                    | (this.deltaSinceLastFire ? DELTA_BIT : 0)
                    | (this.fired ? FIRED_BIT : 0);
        }

        /**
         * Restores the three flags from a value produced by {@link #getStatus()}.
         *
         * @param i the encoded status
         */
        public void setStatus(int i) {
            this.bufferIsFullData = (i & FULL_DATA_BIT) != 0;
            this.deltaSinceLastFire = (i & DELTA_BIT) != 0;
            this.fired = (i & FIRED_BIT) != 0;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // The class is final, so instanceof is equivalent to an exact class comparison.
            if (!(obj instanceof BufferEntry)) {
                return false;
            }
            BufferEntry other = (BufferEntry) obj;
            if (this.bufferIsFullData != other.bufferIsFullData) {
                return false;
            }
            if (this.deltaSinceLastFire != other.deltaSinceLastFire) {
                return false;
            }
            if (this.fired != other.fired) {
                return false;
            }
            return Objects.equals(this.accumulator, other.accumulator);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.accumulator, this.bufferIsFullData, this.deltaSinceLastFire, this.fired);
        }
    }

    /**
     * Stores the configuration; all runtime resources are created later in {@code open}.
     *
     * @param typeInformation type information of the accumulator kept in the "window-acc" state
     * @param typeInformation2 type information of the aggregate result kept in the
     *     "previous-agg-result" state
     * @param generatedSubKeyedAggsHandleFunction generated aggregation handler to instantiate
     * @param j mini-batch size; when positive, the buffer is snapshotted once the number of
     *     buffered elements exceeds it
     * @param z whether a retract row for the previous result is emitted before a new result
     */
    public BufferedAlignedWindowAggregator(TypeInformation<BaseRow> typeInformation, TypeInformation<BaseRow> typeInformation2, GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedSubKeyedAggsHandleFunction, long j, boolean z) {
        this.generatedWindowAggregator = generatedSubKeyedAggsHandleFunction;
        this.accTypeInfo = typeInformation;
        this.aggResultTypeInfo = typeInformation2;
        this.sendRetraction = z;
        this.minibatchSize = j;
    }

    /**
     * Initialises the in-memory buffer, the aggregation handler, the state handles and the
     * metrics, then rebuilds the buffer from {@code bufferStatusState} via {@link #restore()}.
     *
     * @param executionContext context used to obtain state, the class loader and the metric group
     * @throws Exception if the handler or a state handle cannot be created, or restore fails
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void open(ExecutionContext executionContext) throws Exception {
        this.ctx = executionContext;
        this.buffer = new TreeMap<>();
        this.numOfElements = 0;
        this.outRow = new JoinedRow();
        BaseRowUtil.setAccumulate(this.outRow);

        this.windowAggregator = (SubKeyedAggsHandleFunction) this.generatedWindowAggregator.newInstance(executionContext.getRuntimeContext().getUserCodeClassLoader());
        this.windowAggregator.open(executionContext);

        ValueStateDescriptor<BaseRow> accDescriptor = new ValueStateDescriptor<>("window-acc", this.accTypeInfo);
        this.windowAccState = executionContext.getSubKeyedValueState(accDescriptor);

        MapStateDescriptor<TimeWindow, Integer> statusDescriptor =
                new MapStateDescriptor<>("window-buffer", new TimeWindow.Serializer(), IntSerializer.INSTANCE);
        this.bufferStatusState = executionContext.getKeyedMapState(statusDescriptor);

        // restore() needs windowAggregator and bufferStatusState, both initialised above.
        restore();

        if (this.sendRetraction) {
            ValueStateDescriptor<BaseRow> previousDescriptor =
                    new ValueStateDescriptor<>("previous-agg-result", this.aggResultTypeInfo);
            this.previousState = executionContext.getSubKeyedValueState(previousDescriptor);
        }

        // The lambdas must target Gauge<T>; a cast to a non-functional type does not compile.
        executionContext.getRuntimeContext().getMetricGroup().gauge("bundleSize", (Gauge<Integer>) () -> this.numOfElements);
        // Average number of buffered elements per buffered window; 0 when no window is buffered.
        executionContext.getRuntimeContext().getMetricGroup().gauge("bundleRatio", (Gauge<Double>) () -> {
            int size = this.buffer.size();
            return size == 0 ? 0.0d : (1.0d * this.numOfElements) / size;
        });
    }

    /**
     * Folds one input row into the buffered accumulator of the given key and window, creating
     * the buffer entry on first use, and snapshots the buffer when the mini-batch is exceeded.
     *
     * @param baseRow the key
     * @param timeWindow the window the element belongs to
     * @param baseRow2 the input row; accumulated if it is an accumulate message, else retracted
     * @throws Exception if the aggregation handler or the snapshot fails
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void addElement(BaseRow baseRow, TimeWindow timeWindow, BaseRow baseRow2) throws Exception {
        this.numOfElements++;

        Map<BaseRow, BufferEntry> entriesOfWindow = this.buffer.computeIfAbsent(timeWindow, w -> new HashMap<>());
        BufferEntry entry = entriesOfWindow.get(baseRow);
        if (entry == null) {
            entry = BufferEntry.of(this.windowAggregator.createAccumulators());
            entriesOfWindow.put(baseRow, entry);
        }
        entry.markNewElementReceived();

        // Buffered accumulators are loaded into the handler with a null namespace.
        this.windowAggregator.setAccumulators(null, entry.getAccumulator());
        boolean isAccumulate = BaseRowUtil.isAccumulateMsg(baseRow2);
        if (!isAccumulate) {
            this.windowAggregator.retract(baseRow2);
        } else {
            this.windowAggregator.accumulate(baseRow2);
        }
        entry.setAccumulator(this.windowAggregator.getAccumulators());

        // Flush only when mini-batching is enabled and the element count exceeds the batch size.
        if (this.minibatchSize > 0 && this.numOfElements > this.minibatchSize) {
            snapshot();
        }
    }

    /**
     * Fires every buffered key of the given window; does nothing when the window is not buffered.
     *
     * @param timeWindow the window to fire
     * @param collector receives the emitted rows
     * @throws Exception if firing a key fails
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void fireWindow(TimeWindow timeWindow, Collector<BaseRow> collector) throws Exception {
        Map<BaseRow, BufferEntry> entriesOfWindow = this.buffer.get(timeWindow);
        if (entriesOfWindow != null) {
            Iterator<Map.Entry<BaseRow, BufferEntry>> cursor = entriesOfWindow.entrySet().iterator();
            while (cursor.hasNext()) {
                Map.Entry<BaseRow, BufferEntry> keyed = cursor.next();
                BaseRow currentKey = keyed.getKey();
                BufferEntry bufferEntry = keyed.getValue();
                Preconditions.checkNotNull(bufferEntry, "buffer is empty under key: %s window: %s", currentKey, timeWindow);
                doFire(currentKey, timeWindow, bufferEntry, collector);
            }
        }
    }

    /**
     * Emits the current aggregate of one (key, window) pair if elements arrived since the last
     * fire, then marks the entry as fired.
     *
     * <p>If the buffer still holds the full data, the result is computed from the buffer alone.
     * Otherwise the buffered delta is merged into the accumulator stored in
     * {@code windowAccState}, the merged accumulator is written back to that state, and the
     * buffer is reset to a fresh accumulator.
     *
     * <p>With {@code sendRetraction} enabled, a retract row carrying the previously emitted
     * result is sent first (only if the entry has fired before), and the new result is
     * remembered in {@code previousState}.
     *
     * @param baseRow the key
     * @param timeWindow the window being fired
     * @param bufferEntry the buffered entry of this key and window
     * @param collector receives the emitted rows
     * @throws Exception if state access or the aggregation handler fails
     */
    private void doFire(BaseRow baseRow, TimeWindow timeWindow, BufferEntry bufferEntry, Collector<BaseRow> collector) throws Exception {
        this.ctx.setCurrentKey(baseRow);
        if (bufferEntry.deltaSinceLastFire()) {
            if (bufferEntry.bufferIsFullData()) {
                this.windowAggregator.setAccumulators(null, bufferEntry.getAccumulator());
            } else {
                BaseRow bufferAcc = bufferEntry.getAccumulator();
                BaseRow stateAcc = this.windowAccState.get(baseRow, timeWindow);
                if (stateAcc == null) {
                    // Same fallback as snapshot(): start from empty accumulators if none are stored.
                    stateAcc = this.windowAggregator.createAccumulators();
                }
                this.windowAggregator.setAccumulators(timeWindow, stateAcc);
                this.windowAggregator.merge(timeWindow, bufferAcc);
                // Persist the merged accumulator before the buffer is reset; otherwise the
                // buffered delta is lost whenever the state backend hands out copies.
                this.windowAccState.put(baseRow, timeWindow, this.windowAggregator.getAccumulators());
                bufferEntry.setAccumulator(this.windowAggregator.createAccumulators());
                bufferEntry.markBufferFlushed();
            }
            BaseRow value = this.windowAggregator.getValue(timeWindow);
            if (this.sendRetraction) {
                if (bufferEntry.isFired()) {
                    // Retract the result emitted by the previous fire.
                    this.outRow.replace(baseRow, this.previousState.get(baseRow, timeWindow));
                    BaseRowUtil.setRetract(this.outRow);
                    collector.collect(this.outRow);
                }
                this.outRow.replace(baseRow, value);
                BaseRowUtil.setAccumulate(this.outRow);
                collector.collect(this.outRow);
                this.previousState.put(baseRow, timeWindow, value);
            } else {
                this.outRow.replace(baseRow, value);
                collector.collect(this.outRow);
            }
        }
        bufferEntry.markFired();
    }

    /**
     * Drops a window from the buffer and, for every key buffered under it, removes the
     * accumulator, status and (if retraction is enabled) previous-result state and lets the
     * aggregation handler clean up. Does nothing when the window is not buffered.
     *
     * @param timeWindow the window to expire
     * @throws Exception if state access or handler cleanup fails
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void expireWindow(TimeWindow timeWindow) throws Exception {
        Map<BaseRow, BufferEntry> expired = this.buffer.remove(timeWindow);
        if (expired != null) {
            for (BaseRow expiredKey : expired.keySet()) {
                this.windowAccState.remove(expiredKey, timeWindow);
                this.bufferStatusState.remove(expiredKey, timeWindow);
                if (this.sendRetraction) {
                    this.previousState.remove(expiredKey, timeWindow);
                }
                // The handler's cleanup runs with the expired key set as the current key.
                this.ctx.setCurrentKey(expiredKey);
                this.windowAggregator.cleanup(timeWindow);
            }
        }
    }

    /**
     * Returns the first window in the buffer's sort order.
     *
     * @return the lowest buffered window, or {@code null} when the buffer is empty
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public TimeWindow lowestWindow() {
        return this.buffer.isEmpty() ? null : this.buffer.firstKey();
    }

    /**
     * Returns all buffered windows in ascending order.
     *
     * @return a key-set view of the buffer (not a copy)
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public Iterable<TimeWindow> ascendingWindows() {
        Iterable<TimeWindow> orderedWindows = this.buffer.navigableKeySet();
        return orderedWindows;
    }

    /**
     * Returns the buffered windows that are greater than or equal to the given window, in
     * ascending order.
     *
     * @param timeWindow inclusive lower bound
     * @return a tail view of the buffer's key set (not a copy)
     * @throws Exception declared by the interface; nothing is thrown explicitly here
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public Iterable<TimeWindow> ascendingWindows(TimeWindow timeWindow) throws Exception {
        // tailSet(x) on a navigable set is defined as tailSet(x, true).
        return this.buffer.navigableKeySet().tailSet(timeWindow, true);
    }

    /**
     * Flushes every buffered accumulator into state and resets the element counter.
     *
     * <p>For each (window, key) entry the buffered accumulator is merged into the accumulator
     * stored in {@code windowAccState} (or into fresh accumulators if none is stored), the
     * merged result is written back, the buffer entry is reset to fresh accumulators and marked
     * as flushed, and its encoded status is recorded in {@code bufferStatusState}.
     *
     * @throws Exception if state access or the aggregation handler fails
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void snapshot() throws Exception {
        this.numOfElements = 0;
        for (Map.Entry<TimeWindow, Map<BaseRow, BufferEntry>> windowEntry : this.buffer.entrySet()) {
            TimeWindow window = windowEntry.getKey();
            Map<BaseRow, BufferEntry> entriesOfWindow = windowEntry.getValue();
            for (Map.Entry<BaseRow, BufferEntry> keyEntry : entriesOfWindow.entrySet()) {
                BaseRow currentKey = keyEntry.getKey();
                BufferEntry bufferEntry = keyEntry.getValue();
                BaseRow bufferAcc = bufferEntry.getAccumulator();

                this.ctx.setCurrentKey(currentKey);
                BaseRow stored = (BaseRow) this.windowAccState.get(currentKey, window);
                BaseRow stateAcc = stored != null ? stored : this.windowAggregator.createAccumulators();

                // Merge the buffered delta into the stored accumulator and persist the result.
                this.windowAggregator.setAccumulators(window, stateAcc);
                this.windowAggregator.merge(window, bufferAcc);
                this.windowAccState.put(currentKey, window, this.windowAggregator.getAccumulators());

                // The buffer now starts over; its status must reflect the flush before it is stored.
                bufferEntry.setAccumulator(this.windowAggregator.createAccumulators());
                bufferEntry.markBufferFlushed();
                this.bufferStatusState.add(currentKey, window, Integer.valueOf(bufferEntry.getStatus()));
            }
        }
    }

    /**
     * Rebuilds the in-memory buffer from {@code bufferStatusState}: for every stored
     * (key, window, status) triple a buffer entry with fresh accumulators is created, its flags
     * are set from the stored status, and it is registered under its window and key.
     *
     * @throws Exception if state access or the aggregation handler fails
     */
    public void restore() throws Exception {
        for (Map.Entry<?, ?> keyEntry : this.bufferStatusState.getAll().entrySet()) {
            BaseRow restoredKey = (BaseRow) keyEntry.getKey();
            Map<?, ?> statusByWindow = (Map<?, ?>) keyEntry.getValue();
            for (Map.Entry<?, ?> windowEntry : statusByWindow.entrySet()) {
                TimeWindow window = (TimeWindow) windowEntry.getKey();
                Integer status = (Integer) windowEntry.getValue();

                BufferEntry restored = BufferEntry.of(this.windowAggregator.createAccumulators());
                restored.setStatus(status.intValue());

                Map<BaseRow, BufferEntry> entriesOfWindow = this.buffer.computeIfAbsent(window, w -> new HashMap<>());
                entriesOfWindow.put(restoredKey, restored);
            }
        }
    }

    /**
     * Intentionally empty: this implementation releases nothing here.
     *
     * <p>NOTE(review): the aggregation handler opened in {@code open} is not closed here —
     * confirm whether {@code SubKeyedAggsHandleFunction} exposes a close hook that should be called.
     *
     * @throws Exception declared by the interface; never thrown by this implementation
     */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void close() throws Exception {
    }
}
