/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.window.aligned;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.functions.ExecutionContext;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public abstract class BufferedAlignedWindowAggregator
implements AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> {
    private static final long serialVersionUID = 1L;
    /** Type information used in {@code open} to build the "window-acc" accumulator state descriptor. */
    private final TypeInformation<BaseRow> accTypeInfo;
    /** Type information used in {@code open} to build the "previous-agg-result" state descriptor (retraction mode only). */
    private final TypeInformation<BaseRow> aggResultTypeInfo;
    /** When positive, {@code addElement} triggers {@code snapshot()} once more than this many elements are buffered. */
    private final long minibatchSize;
    /** When true, firing an already-fired entry first emits a retract row carrying the previously emitted result. */
    private final boolean sendRetraction;
    /** Generated aggregation code; instantiated in {@code open} with the user-code class loader. */
    private final GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedWindowAggregator;
    /** Aggregation function instance created from {@link #generatedWindowAggregator} in {@code open}. */
    protected transient SubKeyedAggsHandleFunction<TimeWindow> windowAggregator;
    /** Execution context obtained from the {@code open} context; used for state access and current-key switching. */
    private transient ExecutionContext ctx;
    /** In-memory buffer: window -> (key -> buffered entry), ordered by window. */
    protected transient TreeMap<TimeWindow, Map<BaseRow, BufferEntry>> buffer;
    /** Accumulators flushed out of the buffer, stored per (key, window). */
    private transient SubKeyedValueState<BaseRow, TimeWindow, BaseRow> windowAccState;
    /** Status bits of buffer entries per (key, window); written by {@code snapshot()} and read by {@code restore()}. */
    private transient KeyedMapState<BaseRow, TimeWindow, Integer> bufferStatusState;
    /** Last emitted aggregate result per (key, window); only initialised when {@link #sendRetraction} is true. */
    private transient SubKeyedValueState<BaseRow, TimeWindow, BaseRow> previousState;
    /** Number of elements added to the buffer; reset to 0 by {@code snapshot()} and reduced by {@code expireWindow}. */
    private transient int numOfElements;
    /** Reusable output row that is filled with (key, aggregate result) before each emission. */
    private transient JoinedRow outRow;
    /** Collector obtained from the {@code open} context; receives every emitted row. */
    private transient Collector<BaseRow> out;

    /**
     * Creates the aggregator. Only the configuration is stored here; the buffer,
     * state handles and the aggregation function are set up in {@code open}.
     *
     * @param accTypeInfo type information of the accumulator rows kept in state
     * @param aggResultTypeInfo type information of the aggregate result rows kept in state
     * @param generatedWindowAggregator generated aggregation function to instantiate in {@code open}
     * @param minibatchSize buffered-element threshold above which {@code addElement} flushes the
     *     buffer to state; values {@code <= 0} disable that flush
     * @param sendRetraction whether a retract row is emitted before an updated result
     */
    public BufferedAlignedWindowAggregator(
            TypeInformation<BaseRow> accTypeInfo,
            TypeInformation<BaseRow> aggResultTypeInfo,
            GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedWindowAggregator,
            long minibatchSize,
            boolean sendRetraction) {
        // Type information for the two state descriptors.
        this.accTypeInfo = accTypeInfo;
        this.aggResultTypeInfo = aggResultTypeInfo;
        // Generated code and behavioural switches.
        this.generatedWindowAggregator = generatedWindowAggregator;
        this.sendRetraction = sendRetraction;
        this.minibatchSize = minibatchSize;
    }

    /**
     * Initialises all runtime resources: the in-memory buffer, the reusable output row,
     * the generated aggregation function, the state handles and two metric gauges.
     *
     * <p>{@code restore()} runs after the accumulator and buffer-status state handles exist
     * and before the retraction state handle is created.
     *
     * @param context supplies the execution context and the output collector
     * @throws Exception if instantiating/opening the aggregation function or restoring fails
     */
    @Override
    public void open(AlignedWindowAggregator.Context context) throws Exception {
        ctx = context.getExecutionContext();
        out = context.getCollector();

        // Empty buffer plus a reusable output row flagged as an accumulate message.
        buffer = new TreeMap<>();
        numOfElements = 0;
        outRow = new JoinedRow();
        BaseRowUtil.setAccumulate(outRow);

        // Instantiate the generated aggregation code with the user-code class loader.
        ClassLoader userCodeClassLoader = ctx.getRuntimeContext().getUserCodeClassLoader();
        windowAggregator = (SubKeyedAggsHandleFunction) generatedWindowAggregator.newInstance(userCodeClassLoader);
        windowAggregator.open(ctx);

        // State for flushed accumulators and for the status bits of buffer entries.
        windowAccState = ctx.getSubKeyedValueState(
                new ValueStateDescriptor<BaseRow>("window-acc", accTypeInfo));
        bufferStatusState = ctx.getKeyedMapState(
                new MapStateDescriptor<TimeWindow, Integer>("window-buffer", new TimeWindow.Serializer(), IntSerializer.INSTANCE));

        // Rebuild buffer entries from the persisted status bits.
        restore();

        // The previous-result state is only needed when retractions are sent.
        if (sendRetraction) {
            previousState = ctx.getSubKeyedValueState(
                    new ValueStateDescriptor<BaseRow>("previous-agg-result", aggResultTypeInfo));
        }

        ctx.getRuntimeContext().getMetricGroup().gauge("bundleSize", () -> numOfElements);
        ctx.getRuntimeContext().getMetricGroup().gauge("bundleRatio", () -> {
            // buffer.size() is the number of buffered windows.
            int bufferedWindows = buffer.size();
            if (bufferedWindows == 0) {
                return 0.0;
            }
            return (double) numOfElements / (double) bufferedWindows;
        });
    }

    /**
     * Adds one element to the in-memory buffer of the given key and window.
     *
     * <p>A buffer entry with a fresh accumulator is created on first sight of the
     * (window, key) pair. The element is folded in through {@link #processInput} and the
     * returned accumulator is stored back into the entry. When {@code minibatchSize} is positive
     * and the number of buffered elements exceeds it, the buffer is flushed via {@link #snapshot()}.
     *
     * @param key the key of the element
     * @param window the window the element belongs to
     * @param input the element
     * @throws Exception if aggregation or the flush fails
     */
    @Override
    public void addElement(BaseRow key, TimeWindow window, BaseRow input) throws Exception {
        ++numOfElements;

        Map<BaseRow, BufferEntry> entriesOfWindow = buffer.computeIfAbsent(window, w -> new HashMap<>());
        BufferEntry entry = entriesOfWindow.get(key);
        if (entry == null) {
            entry = BufferEntry.of(windowAggregator.createAccumulators());
            entriesOfWindow.put(key, entry);
        }

        entry.markNewElementReceived();
        BaseRow updated = processInput(window, input, entry.getAccumulator());
        entry.setAccumulator(updated);

        boolean batchingEnabled = minibatchSize > 0L;
        if (batchingEnabled && (long) numOfElements > minibatchSize) {
            snapshot();
        }
    }

    /**
     * Folds one input row into a buffered accumulator.
     *
     * <p>Called by {@code addElement} for every element; the returned row is stored back
     * into the buffer entry as its new accumulator.
     *
     * @param var1 the window the element belongs to
     * @param var2 the input row
     * @param var3 the accumulator currently held by the buffer entry
     * @return the accumulator to keep in the buffer entry
     * @throws Exception if the aggregation fails
     */
    protected abstract BaseRow processInput(TimeWindow var1, BaseRow var2, BaseRow var3) throws Exception;

    /**
     * Emits the current aggregate result of one (key, window) pair if it received new
     * elements since it last fired, then marks the entry as fired.
     *
     * <p>If the buffer entry still holds all data of the pair, the result is computed from
     * the buffered accumulator alone. Otherwise the buffered accumulator is merged into the
     * accumulator stored in state, the merged accumulator is written back to state, and the
     * buffer entry is reset.
     *
     * <p>In retraction mode an already-fired entry first emits a retract row with the
     * previously emitted result; the new result is then emitted and remembered.
     *
     * @param key the key to fire
     * @param window the window to fire
     * @param bufferEntry the buffer entry of the (key, window) pair
     * @throws Exception if state access or aggregation fails
     */
    private void doFire(BaseRow key, TimeWindow window, BufferEntry bufferEntry) throws Exception {
        ctx.setCurrentKey(key);
        if (bufferEntry.deltaSinceLastFire()) {
            if (bufferEntry.bufferIsFullData()) {
                // Nothing of this pair was flushed yet: the buffer alone is the full accumulator.
                // Use the same window namespace as getValue(window) below.
                windowAggregator.setAccumulators(window, bufferEntry.getAccumulator());
            } else {
                BaseRow bufferAcc = bufferEntry.getAccumulator();
                BaseRow stateAcc = (BaseRow) windowAccState.get(key, window);
                if (stateAcc == null) {
                    // Same fallback as snapshot(): start from an empty accumulator.
                    stateAcc = windowAggregator.createAccumulators();
                }
                windowAggregator.setAccumulators(window, stateAcc);
                windowAggregator.merge(window, bufferAcc);
                // The buffer entry is reset and marked flushed below, so the merged accumulator
                // must be persisted here; otherwise the buffered contribution would be lost.
                windowAccState.put(key, window, windowAggregator.getAccumulators());
                bufferEntry.setAccumulator(windowAggregator.createAccumulators());
                bufferEntry.markBufferFlushed();
            }

            BaseRow aggResult = windowAggregator.getValue(window);
            if (sendRetraction) {
                if (bufferEntry.isFired()) {
                    // Withdraw the result that was emitted by the previous firing.
                    BaseRow previousAggResult = (BaseRow) previousState.get(key, window);
                    outRow.replace(key, previousAggResult);
                    BaseRowUtil.setRetract(outRow);
                    out.collect(outRow);
                }
                outRow.replace(key, aggResult);
                BaseRowUtil.setAccumulate(outRow);
                out.collect(outRow);
                previousState.put(key, window, aggResult);
            } else {
                outRow.replace(key, aggResult);
                out.collect(outRow);
            }
        }
        bufferEntry.markFired();
    }

    /**
     * Fires every buffered key of the given window. Does nothing when the window has no
     * buffered entries.
     *
     * @param window the window to fire
     * @throws Exception if firing one of the keys fails
     */
    @Override
    public void fireWindow(TimeWindow window) throws Exception {
        Map<BaseRow, BufferEntry> entriesOfWindow = buffer.get(window);
        if (entriesOfWindow != null) {
            for (Map.Entry<BaseRow, BufferEntry> keyed : entriesOfWindow.entrySet()) {
                BaseRow key = keyed.getKey();
                BufferEntry pending = keyed.getValue();
                Preconditions.checkNotNull(pending, "buffer is empty under key: %s window: %s", key, window);
                doFire(key, window, pending);
            }
        }
    }

    /**
     * Drops a window: removes it from the buffer and, for each of its keys, clears the
     * accumulator state, the buffer-status state and (in retraction mode) the previous-result
     * state, then lets the aggregation function clean up and adjusts the element counter.
     * Does nothing when the window is not buffered.
     *
     * @param window the window to expire
     * @throws Exception if state access or cleanup fails
     */
    @Override
    public void expireWindow(TimeWindow window) throws Exception {
        Map<BaseRow, BufferEntry> expired = buffer.remove(window);
        if (expired != null) {
            for (Map.Entry<BaseRow, BufferEntry> keyed : expired.entrySet()) {
                BaseRow key = keyed.getKey();

                // Clear everything persisted for this (key, window) pair.
                windowAccState.remove(key, window);
                bufferStatusState.remove(key, window);
                if (sendRetraction) {
                    previousState.remove(key, window);
                }

                // cleanup() runs with the expired key set as the current key.
                ctx.setCurrentKey(key);
                windowAggregator.cleanup(window);

                numOfElements -= keyed.getValue().getNumOfElements();
            }
        }
    }

    /**
     * Expires every window currently held in the buffer.
     *
     * @throws Exception if expiring one of the windows fails
     */
    @Override
    public void expireAllWindows() throws Exception {
        // expireWindow() removes the window from the buffer. Iterating the live key set while
        // removing would throw ConcurrentModificationException as soon as more than one
        // window is buffered, so iterate over a copy of the keys instead.
        TimeWindow[] bufferedWindows = buffer.keySet().toArray(new TimeWindow[0]);
        for (TimeWindow w : bufferedWindows) {
            expireWindow(w);
        }
    }

    /**
     * Returns the first window in the buffer's ordering.
     *
     * @return the first buffered window, or {@code null} when the buffer is empty
     */
    @Override
    public TimeWindow lowestWindow() {
        return buffer.isEmpty() ? null : buffer.firstKey();
    }

    /**
     * Returns all buffered windows as a live view of the buffer's key set.
     *
     * @return the buffered windows
     */
    @Override
    public Iterable<TimeWindow> windows() throws Exception {
        Iterable<TimeWindow> bufferedWindows = buffer.keySet();
        return bufferedWindows;
    }

    /**
     * Returns all buffered windows in ascending order as a live view of the buffer.
     *
     * @return the buffered windows, lowest first
     */
    @Override
    public Iterable<TimeWindow> ascendingWindows() {
        Iterable<TimeWindow> ordered = buffer.navigableKeySet();
        return ordered;
    }

    /**
     * Returns the buffered windows that are greater than or equal to {@code fromWindow},
     * in ascending order, as a live view of the buffer.
     *
     * @param fromWindow the lowest window to include (inclusive)
     * @return the buffered windows starting at {@code fromWindow}
     */
    @Override
    public Iterable<TimeWindow> ascendingWindows(TimeWindow fromWindow) throws Exception {
        // The second argument makes the lower bound inclusive.
        return buffer.navigableKeySet().tailSet(fromWindow, true);
    }

    /**
     * Flushes the in-memory buffer to state.
     *
     * <p>For every buffer entry that changed since the last flush, the buffered accumulator
     * is merged into the accumulator stored in state (or into a fresh one when none is stored),
     * the merged accumulator is written back, the buffer entry is reset, and its status bits
     * are stored. The element counter is reset and the current key that was set on entry is
     * set again afterwards.
     *
     * @throws Exception if state access or merging fails
     */
    @Override
    public void snapshot() throws Exception {
        numOfElements = 0;
        BaseRow keyOnEntry = ctx.currentKey();

        for (Map.Entry<TimeWindow, Map<BaseRow, BufferEntry>> windowed : buffer.entrySet()) {
            TimeWindow window = windowed.getKey();
            for (Map.Entry<BaseRow, BufferEntry> keyed : windowed.getValue().entrySet()) {
                BufferEntry pending = keyed.getValue();
                if (pending.deltaSinceSnapshot()) {
                    BaseRow key = keyed.getKey();
                    ctx.setCurrentKey(key);

                    // Merge the buffered delta into what is already persisted.
                    BaseRow persisted = (BaseRow) windowAccState.get(key, window);
                    if (persisted == null) {
                        persisted = windowAggregator.createAccumulators();
                    }
                    windowAggregator.setAccumulators(window, persisted);
                    windowAggregator.merge(window, pending.getAccumulator());
                    windowAccState.put(key, window, windowAggregator.getAccumulators());

                    // The buffer starts over empty; record the entry's status after the reset.
                    pending.setAccumulator(windowAggregator.createAccumulators());
                    pending.markBufferFlushed();
                    bufferStatusState.add(key, window, pending.getStatus());
                }
            }
        }

        ctx.setCurrentKey(keyOnEntry);
    }

    /**
     * Rebuilds the in-memory buffer from the persisted buffer-status state: for every stored
     * (key, window) pair a buffer entry with a fresh accumulator is created and given the
     * stored status bits.
     *
     * @throws Exception if reading the state or creating accumulators fails
     */
    protected void restore() throws Exception {
        Map<?, ?> statusByKey = bufferStatusState.getAll();
        for (Map.Entry<?, ?> keyed : statusByKey.entrySet()) {
            BaseRow key = (BaseRow) keyed.getKey();
            Map<?, ?> statusByWindow = (Map<?, ?>) keyed.getValue();
            for (Map.Entry<?, ?> windowed : statusByWindow.entrySet()) {
                TimeWindow window = (TimeWindow) windowed.getKey();
                Integer status = (Integer) windowed.getValue();

                BufferEntry restored = BufferEntry.of(windowAggregator.createAccumulators());
                restored.setStatus(status);
                buffer.computeIfAbsent(window, w -> new HashMap<>()).put(key, restored);
            }
        }
    }

    /**
     * Intentionally empty: this method performs no work.
     */
    @Override
    public void close() throws Exception {
    }

    /**
     * In-memory buffer slot of one (key, window) pair: the buffered accumulator, the number
     * of elements buffered in it, and four status flags that can be packed into an int.
     */
    protected static final class BufferEntry {
        /** Status bit: the buffer has never been flushed, so it holds all data of the pair. */
        private static final int FLAG_BUFFER_IS_FULL_DATA = 1;
        /** Status bit: elements were received since the last flush. */
        private static final int FLAG_DELTA_SINCE_SNAPSHOT = 2;
        /** Status bit: elements were received since the last firing. */
        private static final int FLAG_DELTA_SINCE_LAST_FIRE = 4;
        /** Status bit: the entry has fired at least once. */
        private static final int FLAG_FIRED = 8;

        private BaseRow accumulator;
        private int numOfElements;
        private boolean bufferIsFullData;
        private boolean deltaSinceSnapshot;
        private boolean deltaSinceLastFire;
        private boolean fired;

        /**
         * Creates an entry with every field given explicitly.
         */
        protected BufferEntry(BaseRow accumulator, int numOfElements, boolean bufferIsFullData, boolean deltaSinceSnapshot, boolean deltaSinceLastFire, boolean fired) {
            this.accumulator = accumulator;
            this.numOfElements = numOfElements;
            this.bufferIsFullData = bufferIsFullData;
            this.deltaSinceSnapshot = deltaSinceSnapshot;
            this.deltaSinceLastFire = deltaSinceLastFire;
            this.fired = fired;
        }

        /**
         * Creates an entry for a new (key, window) pair: no elements, buffer holds full data,
         * nothing to flush, marked as changed since the last firing, not fired.
         *
         * @param accumulator the initial accumulator
         * @return the new entry
         */
        public static BufferEntry of(BaseRow accumulator) {
            final int noElements = 0;
            final boolean fullData = true;
            final boolean changedSinceSnapshot = false;
            final boolean changedSinceLastFire = true;
            final boolean alreadyFired = false;
            return new BufferEntry(accumulator, noElements, fullData, changedSinceSnapshot, changedSinceLastFire, alreadyFired);
        }

        /** Records that one more element was buffered: both delta flags are set and the counter grows. */
        public void markNewElementReceived() {
            ++numOfElements;
            deltaSinceSnapshot = true;
            deltaSinceLastFire = true;
        }

        /** Records that the buffer was flushed: it no longer holds full data and is empty again. */
        public void markBufferFlushed() {
            numOfElements = 0;
            deltaSinceSnapshot = false;
            bufferIsFullData = false;
        }

        /** Records a firing: the fired flag is set and the since-last-fire delta is cleared. */
        public void markFired() {
            fired = true;
            deltaSinceLastFire = false;
        }

        /** @return the buffered accumulator */
        public BaseRow getAccumulator() {
            return accumulator;
        }

        /** @return the number of elements buffered since the last flush */
        public int getNumOfElements() {
            return numOfElements;
        }

        /** @param accumulator the accumulator to keep in this entry */
        public void setAccumulator(BaseRow accumulator) {
            this.accumulator = accumulator;
        }

        /** @return whether this entry has fired at least once */
        public boolean isFired() {
            return fired;
        }

        /** @return whether elements were received since the last firing */
        public boolean deltaSinceLastFire() {
            return deltaSinceLastFire;
        }

        /** @return whether the buffer has never been flushed */
        public boolean bufferIsFullData() {
            return bufferIsFullData;
        }

        /** @return whether elements were received since the last flush */
        public boolean deltaSinceSnapshot() {
            return deltaSinceSnapshot;
        }

        /**
         * Packs the four flags into an int (bit values 1, 2, 4, 8 in declaration order).
         *
         * @return the packed status
         */
        public int getStatus() {
            int packed = bufferIsFullData ? FLAG_BUFFER_IS_FULL_DATA : 0;
            packed |= deltaSinceSnapshot ? FLAG_DELTA_SINCE_SNAPSHOT : 0;
            packed |= deltaSinceLastFire ? FLAG_DELTA_SINCE_LAST_FIRE : 0;
            packed |= fired ? FLAG_FIRED : 0;
            return packed;
        }

        /**
         * Overwrites the four flags from a packed status as produced by {@link #getStatus()}.
         * The accumulator and the element counter are left untouched.
         *
         * @param status the packed status
         */
        public void setStatus(int status) {
            bufferIsFullData = isSet(status, FLAG_BUFFER_IS_FULL_DATA);
            deltaSinceSnapshot = isSet(status, FLAG_DELTA_SINCE_SNAPSHOT);
            deltaSinceLastFire = isSet(status, FLAG_DELTA_SINCE_LAST_FIRE);
            fired = isSet(status, FLAG_FIRED);
        }

        /** Tells whether the given flag bit is set in the packed status. */
        private static boolean isSet(int status, int flag) {
            return (status & flag) != 0;
        }

        /**
         * Two entries are equal when their four flags and their accumulators are equal;
         * the element counter does not take part in the comparison.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            BufferEntry other = (BufferEntry) o;
            if (bufferIsFullData != other.bufferIsFullData) {
                return false;
            }
            if (deltaSinceSnapshot != other.deltaSinceSnapshot) {
                return false;
            }
            if (deltaSinceLastFire != other.deltaSinceLastFire) {
                return false;
            }
            if (fired != other.fired) {
                return false;
            }
            return Objects.equals(accumulator, other.accumulator);
        }

        /** Hash over the same fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(accumulator, bufferIsFullData, deltaSinceSnapshot, deltaSinceLastFire, fired);
        }
    }
}

