/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.window.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.table.api.window.Window;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.window.internal.MergingWindowSet;

/**
 * {@link InternalWindowProcessFunction} for windows produced by a {@link MergingWindowAssigner}.
 *
 * <p>Windows assigned to an element are registered in a {@link MergingWindowSet}, which is backed
 * by a keyed map state named {@code "session-window-mapping"} whose map keys and map values are
 * both serialized with the window serializer. When registering a window triggers a merge, the
 * accumulators of the merged state windows are folded into the resulting state window by
 * {@link MergingFunctionImpl}.
 *
 * @param <K> type of the key
 * @param <W> type of the window
 */
public class MergingWindowProcessFunction<K, W extends Window>
extends InternalWindowProcessFunction<K, W> {
    private static final long serialVersionUID = -2866771637946397223L;
    private final MergingWindowAssigner<W> windowAssigner;
    private final TypeSerializer<K> keySerializer;
    private final TypeSerializer<W> windowSerializer;
    private transient MergingWindowSet<K, W> mergingWindows;
    private transient MergingFunctionImpl mergingFunction;
    /**
     * Actual (post-merge, non-late) windows computed by the last call to
     * {@link #assignStateNamespace}; handed back by {@link #assignActualWindows}.
     */
    private List<W> reuseActualWindows;

    /**
     * @param windowAssigner   assigner producing the (mergeable) windows of each element
     * @param windowAggregator aggregate handler, forwarded to the superclass
     * @param keySerializer    serializer of the key, used for the window-mapping state
     * @param windowSerializer serializer of the window, used for the window-mapping state
     * @param allowedLateness  allowed lateness, forwarded to the superclass
     */
    public MergingWindowProcessFunction(MergingWindowAssigner<W> windowAssigner, SubKeyedAggsHandleFunction<W> windowAggregator, TypeSerializer<K> keySerializer, TypeSerializer<W> windowSerializer, long allowedLateness) {
        super(windowAssigner, windowAggregator, allowedLateness);
        this.windowAssigner = windowAssigner;
        this.keySerializer = keySerializer;
        this.windowSerializer = windowSerializer;
    }

    /**
     * Opens the superclass, then creates the window-mapping state, the {@link MergingWindowSet}
     * built on top of it, and the merge callback.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void open(InternalWindowProcessFunction.Context<K, W> ctx) throws Exception {
        super.open(ctx);
        // NOTE(review): the generic signatures of KeyedMapStateDescriptor / KeyedMapState are not
        // visible from this file, so these locals are left raw (presumably <K, W, W>) -- confirm
        // against the state API and parameterize them.
        KeyedMapStateDescriptor mappingStateDescriptor = new KeyedMapStateDescriptor("session-window-mapping", this.keySerializer, this.windowSerializer, this.windowSerializer);
        KeyedMapState windowMapping = (KeyedMapState)ctx.getKeyedState(mappingStateDescriptor);
        this.mergingWindows = new MergingWindowSet(this.windowAssigner, windowMapping);
        this.mergingFunction = new MergingFunctionImpl();
    }

    /**
     * Assigns the element to its windows, registers each of them in the merging window set and
     * returns the state windows of the resulting actual windows.
     *
     * <p>Actual windows that are already late are retired from the window set and skipped. The
     * surviving actual windows are remembered for {@link #assignActualWindows}.
     *
     * @param inputRow  the input element
     * @param timestamp the timestamp passed to the window assigner
     * @return the state windows backing the surviving actual windows, in the same order
     */
    @Override
    public Collection<W> assignStateNamespace(BaseRow inputRow, long timestamp) throws Exception {
        Collection<W> elementWindows = this.windowAssigner.assignWindows(inputRow, timestamp);
        this.mergingWindows.initializeCache(this.ctx.currentKey());
        this.reuseActualWindows = new ArrayList<W>(1);
        for (W window : elementWindows) {
            // Registering the window may run mergingFunction; the returned window is the one to
            // work with from here on.
            W actualWindow = this.mergingWindows.addWindow(this.ctx.currentKey(), window, this.mergingFunction);
            if (this.isWindowLate(actualWindow)) {
                this.mergingWindows.retireWindow(this.ctx.currentKey(), actualWindow);
            } else {
                this.reuseActualWindows.add(actualWindow);
            }
        }
        List<W> affectedWindows = new ArrayList<W>(this.reuseActualWindows.size());
        for (W actual : this.reuseActualWindows) {
            affectedWindows.add(this.mergingWindows.getStateWindow(this.ctx.currentKey(), actual));
        }
        return affectedWindows;
    }

    /**
     * Returns the actual windows computed by the most recent {@link #assignStateNamespace} call.
     * Both arguments are ignored; the result is {@code null} if {@link #assignStateNamespace} has
     * not been called yet on this instance.
     */
    @Override
    public Collection<W> assignActualWindows(BaseRow inputRow, long timestamp) throws Exception {
        return this.reuseActualWindows;
    }

    /**
     * Computes the aggregation result of {@code window}.
     *
     * <p>The accumulators are read from the window's state window (fresh accumulators are created
     * when none are stored), loaded into the aggregator, and the value is requested for the
     * given window.
     */
    @Override
    public BaseRow getWindowAggregationResult(W window) throws Exception {
        W stateWindow = this.mergingWindows.getStateWindow(this.ctx.currentKey(), window);
        BaseRow acc = this.ctx.getWindowAccumulators(stateWindow);
        if (acc == null) {
            acc = this.windowAggregator.createAccumulators();
        }
        this.windowAggregator.setAccumulators(stateWindow, acc);
        return this.windowAggregator.getValue(window);
    }

    /**
     * If {@code currentTime2} is the cleanup time of {@code window}, clears the window's trigger,
     * clears the state stored under its state window and retires the window from the merging
     * window set. Does nothing otherwise.
     */
    @Override
    public void cleanWindowIfNeeded(W window, long currentTime2) throws Exception {
        if (this.isCleanupTime(window, currentTime2)) {
            this.ctx.clearTrigger(window);
            // The state window is looked up before the window is retired from the window set.
            W stateWindow = this.mergingWindows.getStateWindow(this.ctx.currentKey(), window);
            this.ctx.clearWindowState(stateWindow);
            this.mergingWindows.initializeCache(this.ctx.currentKey());
            this.mergingWindows.retireWindow(this.ctx.currentKey(), window);
        }
    }

    /**
     * Merge callback handed to {@link MergingWindowSet#addWindow}. Non-static on purpose: it
     * works on the enclosing function's context, assigner and aggregator.
     */
    private class MergingFunctionImpl
    implements MergingWindowSet.MergeFunction<W> {
        private MergingFunctionImpl() {
        }

        /**
         * Validates the merge result against the current time, notifies the context, cleans up
         * the merged windows and folds the accumulators of the merged state windows into the
         * resulting state window.
         *
         * @param mergeResult            the window resulting from the merge
         * @param mergedWindows          the windows that were merged into {@code mergeResult}
         * @param stateWindowResult      the state window that holds the merged accumulators
         * @param stateWindowsToBeMerged the state windows whose accumulators are merged and cleared
         * @throws UnsupportedOperationException if the merge result is already behind the current
         *         watermark (event time, including allowed lateness) or the current processing time
         */
        @Override
        public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> stateWindowsToBeMerged) throws Exception {
            final long maxTimestamp = mergeResult.maxTimestamp();
            if (windowAssigner.isEventTime()) {
                if (maxTimestamp + allowedLateness <= ctx.currentWatermark()) {
                    throw new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: " + ctx.currentWatermark() + " window: " + mergeResult);
                }
            } else if (maxTimestamp <= ctx.currentProcessingTime()) {
                throw new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + ctx.currentProcessingTime() + " window: " + mergeResult);
            }

            ctx.onMerge(mergeResult, stateWindowsToBeMerged);

            // The merged windows no longer exist on their own: drop their triggers and timers.
            for (W merged : mergedWindows) {
                ctx.clearTrigger(merged);
                ctx.deleteCleanupTimer(merged);
            }

            if (stateWindowsToBeMerged.isEmpty()) {
                return;
            }

            // Load the target accumulators into the aggregator, merge every source state window
            // into them, then write the combined accumulators back.
            BaseRow targetAcc = ctx.getWindowAccumulators(stateWindowResult);
            if (targetAcc == null) {
                targetAcc = windowAggregator.createAccumulators();
            }
            windowAggregator.setAccumulators(stateWindowResult, targetAcc);
            for (W stateWindow : stateWindowsToBeMerged) {
                BaseRow acc = ctx.getWindowAccumulators(stateWindow);
                if (acc != null) {
                    windowAggregator.merge(stateWindow, acc);
                }
                ctx.clearWindowState(stateWindow);
                ctx.clearPreviousState(stateWindow);
            }
            targetAcc = windowAggregator.getAccumulators();
            ctx.setWindowAccumulators(stateWindowResult, targetAcc);
        }
    }
}

