/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListStateDescriptor;
import org.apache.flink.shaded.guava18.com.google.common.base.Function;
import org.apache.flink.shaded.guava18.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
    private static final long serialVersionUID = 1L;
    private final Evictor<? super IN, ? super W> evictor;
    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor;
    private transient EvictorContext evictorContext;
    private transient SubKeyedListState<Object, W, StreamRecord<IN>> evictingWindowState;

    public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor, InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor, long allowedLateness, OutputTag<IN> lateDataOutputTag) {
        super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger, allowedLateness, lateDataOutputTag);
        this.evictor = (Evictor)Preconditions.checkNotNull(evictor);
        this.evictingWindowStateDescriptor = (StateDescriptor)Preconditions.checkNotNull(windowStateDescriptor);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);
        boolean isSkippedElement = true;
        final Object key = this.getKeyContext().getCurrentKey();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<Window> mergingWindows = this.getMergingWindowSet();
            for (Window window : elementWindows) {
                Window actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>(){

                    @Override
                    public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception {
                        if (EvictingWindowOperator.this.windowAssigner.isEventTime() && ((Window)mergeResult).maxTimestamp() + EvictingWindowOperator.this.allowedLateness <= EvictingWindowOperator.this.internalTimerService.currentWatermark()) {
                            throw new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: " + EvictingWindowOperator.this.internalTimerService.currentWatermark() + " window: " + mergeResult);
                        }
                        if (!EvictingWindowOperator.this.windowAssigner.isEventTime() && ((Window)mergeResult).maxTimestamp() <= EvictingWindowOperator.this.internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + EvictingWindowOperator.this.internalTimerService.currentProcessingTime() + " window: " + mergeResult);
                        }
                        EvictingWindowOperator.this.triggerContext.key = key;
                        EvictingWindowOperator.this.triggerContext.window = mergeResult;
                        EvictingWindowOperator.this.triggerContext.onMerge(mergedWindows);
                        for (Window m : mergedWindows) {
                            EvictingWindowOperator.this.triggerContext.window = m;
                            EvictingWindowOperator.this.triggerContext.clear();
                            EvictingWindowOperator.this.deleteCleanupTimer(m);
                        }
                        EvictingWindowOperator.this.mergeNamespaces(EvictingWindowOperator.this.evictingWindowState, stateWindowResult, mergedStateWindows);
                    }
                });
                if (this.isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;
                Window stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }
                this.evictingWindowState.add(this.getCurrentKey(), (Object)stateWindow, element);
                this.triggerContext.key = key;
                this.triggerContext.window = actualWindow;
                this.evictorContext.key = key;
                this.evictorContext.window = actualWindow;
                TriggerResult triggerResult = this.triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    Iterable contents = (Iterable)this.evictingWindowState.get(this.getCurrentKey(), (Object)stateWindow);
                    if (contents == null) continue;
                    this.emitWindowContents(actualWindow, contents, this.evictingWindowState, stateWindow);
                }
                if (triggerResult.isPurge()) {
                    this.evictingWindowState.remove(this.getCurrentKey(), (Object)stateWindow);
                }
                this.registerCleanupTimer(actualWindow);
            }
            mergingWindows.persist();
        } else {
            for (Window window : elementWindows) {
                if (this.isWindowLate(window)) continue;
                isSkippedElement = false;
                this.evictingWindowState.add(this.getCurrentKey(), (Object)window, element);
                this.triggerContext.key = key;
                this.triggerContext.window = window;
                this.evictorContext.key = key;
                this.evictorContext.window = window;
                TriggerResult triggerResult = this.triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    Iterable contents = (Iterable)this.evictingWindowState.get(this.getCurrentKey(), (Object)window);
                    if (contents == null) continue;
                    this.emitWindowContents(window, contents, this.evictingWindowState, window);
                }
                if (triggerResult.isPurge()) {
                    this.evictingWindowState.remove(this.getCurrentKey(), (Object)window);
                }
                this.registerCleanupTimer(window);
            }
        }
        if (isSkippedElement && this.isElementLate(element)) {
            if (this.lateDataOutputTag != null) {
                this.sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * java.lang.NullPointerException: Cannot invoke "org.benf.cfr.reader.bytecode.analysis.types.GenericTypeBinder.getBindingFor(org.benf.cfr.reader.bytecode.analysis.types.JavaTypeInstance)" because "res" is null
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.GenericInferer.getGtbNullFiltered(GenericInferer.java:87)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.GenericInferer.inferGenericObjectInfoFromCalls(GenericInferer.java:139)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:484)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * Exception decompiling
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * java.lang.NullPointerException: Cannot invoke "org.benf.cfr.reader.bytecode.analysis.types.GenericTypeBinder.getBindingFor(org.benf.cfr.reader.bytecode.analysis.types.JavaTypeInstance)" because "res" is null
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.GenericInferer.getGtbNullFiltered(GenericInferer.java:87)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.GenericInferer.inferGenericObjectInfoFromCalls(GenericInferer.java:139)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:484)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, SubKeyedListState<Object, W, StreamRecord<IN>> windowState, W currNamespace) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(((Window)window).maxTimestamp());
        FluentIterable recordsWithTimestamp = FluentIterable.from(contents).transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>(){

            public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                return TimestampedValue.from(input);
            }
        });
        this.evictorContext.evictBefore(recordsWithTimestamp, Iterables.size((Iterable)recordsWithTimestamp));
        FluentIterable projectedContents = recordsWithTimestamp.transform(new Function<TimestampedValue<IN>, IN>(){

            public IN apply(TimestampedValue<IN> input) {
                return input.getValue();
            }
        });
        this.processContext.window = this.triggerContext.window;
        ((InternalWindowFunction)this.userFunction).process(this.triggerContext.key, this.triggerContext.window, this.processContext, projectedContents, this.timestampedCollector);
        this.evictorContext.evictAfter(recordsWithTimestamp, Iterables.size((Iterable)recordsWithTimestamp));
        windowState.remove(this.getCurrentKey(), currNamespace);
        for (TimestampedValue record : recordsWithTimestamp) {
            windowState.add(this.getCurrentKey(), currNamespace, record.getStreamRecord());
        }
    }

    private void clearAllState(W window, SubKeyedListState<Object, W, StreamRecord<IN>> windowState, MergingWindowSet<W> mergingWindows, W currNamespace) throws Exception {
        windowState.remove(this.getCurrentKey(), currNamespace);
        this.triggerContext.clear();
        this.processContext.window = window;
        this.processContext.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
            mergingWindows.persist();
        }
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.evictorContext = new EvictorContext(this, null, null);
        SubKeyedListStateDescriptor subKeyedListStateDescriptor = new SubKeyedListStateDescriptor(this.evictingWindowStateDescriptor.getName(), this.getKeySerializer(), this.windowSerializer, ((ListStateDescriptor)this.evictingWindowStateDescriptor).getElementSerializer());
        this.evictingWindowState = (SubKeyedListState)this.getSubKeyedState(subKeyedListStateDescriptor);
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.evictorContext = null;
    }

    @Override
    public void dispose() throws Exception {
        super.dispose();
        this.evictorContext = null;
    }

    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return this.evictor;
    }

    @Override
    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        return this.evictingWindowStateDescriptor;
    }

    private void mergeNamespaces(SubKeyedListState<Object, W, StreamRecord<IN>> state, W target, Collection<W> windows) {
        if (windows != null) {
            for (Window window : windows) {
                List list = (List)state.get(this.getCurrentKey(), (Object)window);
                state.addAll(this.getCurrentKey(), target, (Collection)list);
                state.remove(this.getCurrentKey(), (Object)window);
            }
        }
    }

    static class EvictorContext
    implements Evictor.EvictorContext {
        protected K key;
        protected W window;
        final /* synthetic */ EvictingWindowOperator this$0;

        public EvictorContext(K key, W window) {
            this.this$0 = this$0;
            this.key = key;
            this.window = window;
        }

        @Override
        public long getCurrentProcessingTime() {
            return this.this$0.internalTimerService.currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return this.this$0.internalTimerService.currentWatermark();
        }

        @Override
        public MetricGroup getMetricGroup() {
            return this.this$0.getMetricGroup();
        }

        public K getKey() {
            return this.key;
        }

        void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) {
            this.this$0.evictor.evictBefore(elements, size, this.window, this);
        }

        void evictAfter(Iterable<TimestampedValue<IN>> elements, int size) {
            this.this$0.evictor.evictAfter(elements, size, this.window, this);
        }
    }
}

