/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.Serializable;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateBinder;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAppendingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/**
 * Stream operator that assigns each incoming element to windows via a {@link WindowAssigner},
 * keeps the window contents in keyed state namespaced by window, and hands the contents to an
 * {@link InternalWindowFunction} whenever the configured {@link Trigger} fires.
 *
 * <p>The operator also registers a per-window cleanup timer (see {@code registerCleanupTimer})
 * and clears window, trigger and per-window function state when that timer fires.
 *
 * <p>NOTE(review): this source was produced by a decompiler (CFR); raw types, synthetic
 * {@code this$0} fields and redundant casts below are decompilation artifacts.
 *
 * @param <K> type of the key
 * @param <IN> type of the incoming elements
 * @param <ACC> type of the value held in the window state and passed to the window function
 * @param <OUT> type of the elements emitted by the window function
 * @param <W> type of window produced by the window assigner
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>,
Triggerable<K, W> {
    private static final long serialVersionUID = 1L;
    /** Assigner that maps each element to the windows it belongs to; never null. */
    protected final WindowAssigner<? super IN, W> windowAssigner;
    /** Key selector handed in at construction; in this file it is only exposed via its getter. */
    private final KeySelector<IN, K> keySelector;
    /** Trigger consulted (through {@code triggerContext}) on elements and timers; never null. */
    private final Trigger<? super IN, ? super W> trigger;
    /** Descriptor used in {@code open()} to create the window state; the constructor accepts null. */
    private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    /** Serializer for keys; never null. */
    protected final TypeSerializer<K> keySerializer;
    /** Serializer for windows; used as namespace serializer for state and for the timer service. */
    protected final TypeSerializer<W> windowSerializer;
    /** Non-negative lateness added to a window's max timestamp to obtain its event-time cleanup time. */
    protected final long allowedLateness;
    /** Side-output tag for late elements; when null, late elements are only counted. */
    protected final OutputTag<IN> lateDataOutputTag;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    /** Counter incremented for late elements that are not side-output; created in {@code open()}. */
    protected transient Counter numLateRecordsDropped;
    /** Window contents state; its namespace is switched to the window being processed. */
    private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;
    /** Same object as {@code windowState} when it supports merging and the assigner is a merging assigner. */
    private transient InternalMergingState<K, W, IN, ACC, ACC> windowMergingState;
    /** List state backing the {@link MergingWindowSet}; only created for merging assigners. */
    private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;
    /** Collector whose timestamp is set to the window's max timestamp before emitting. */
    protected transient TimestampedCollector<OUT> timestampedCollector;
    /** Reusable trigger context; its key and window fields are overwritten before each use. */
    protected transient Context triggerContext = new Context(this, null, null);
    /** Reusable context passed to the window function; its window field is overwritten before each use. */
    protected transient WindowContext processContext;
    /** Context given to the window assigner; exposes the current processing time. */
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    /** Timer service (named "window-timers") with windows as timer namespace. */
    protected transient InternalTimerService<W> internalTimerService;

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor, InternalWindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, long allowedLateness, OutputTag<IN> lateDataOutputTag) {
        super(windowFunction);
        Preconditions.checkArgument((!(windowAssigner instanceof BaseAlignedWindowAssigner) ? 1 : 0) != 0, (Object)("The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. This assigner is only used with the AccumulatingProcessingTimeWindowOperator and the AggregatingProcessingTimeWindowOperator"));
        Preconditions.checkArgument((allowedLateness >= 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized() ? 1 : 0) != 0, (Object)"window state serializer is not properly initialized");
        this.windowAssigner = (WindowAssigner)Preconditions.checkNotNull(windowAssigner);
        this.windowSerializer = (TypeSerializer)Preconditions.checkNotNull(windowSerializer);
        this.keySelector = (KeySelector)Preconditions.checkNotNull(keySelector);
        this.keySerializer = (TypeSerializer)Preconditions.checkNotNull(keySerializer);
        this.windowStateDescriptor = windowStateDescriptor;
        this.trigger = (Trigger)Preconditions.checkNotNull(trigger);
        this.allowedLateness = allowedLateness;
        this.lateDataOutputTag = lateDataOutputTag;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Initializes all transient runtime members: the late-record counter, the output collector,
     * the timer service, the reusable contexts and the window state. For merging window assigners
     * it additionally sets up the state that backs the merging window set.
     */
    @Override
    public void open() throws Exception {
        super.open();

        numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        timestampedCollector = new TimestampedCollector(output);
        internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
        triggerContext = new Context(this, null, null);
        processContext = new WindowContext(this, null);
        windowAssignerContext = new WindowAssigner.WindowAssignerContext(){

            @Override
            public long getCurrentProcessingTime() {
                return WindowOperator.this.internalTimerService.currentProcessingTime();
            }
        };

        // The descriptor may be null, in which case no window contents state is created here.
        if (windowStateDescriptor != null) {
            windowState = (InternalAppendingState)getContextStateHelper().getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
        }

        // Everything below is only needed when windows can merge.
        if (!(windowAssigner instanceof MergingWindowAssigner)) {
            return;
        }
        if (windowState instanceof InternalMergingState) {
            windowMergingState = (InternalMergingState)windowState;
        }
        // The merging window set is stored as a list of (window, window) tuples in the void namespace.
        TypeSerializer[] tupleFieldSerializers = new TypeSerializer[]{windowSerializer, windowSerializer};
        TupleSerializer windowPairSerializer = new TupleSerializer(Tuple2.class, tupleFieldSerializers);
        ListStateDescriptor windowSetDescriptor = new ListStateDescriptor("merging-window-set", (TypeSerializer)windowPairSerializer);
        mergingSetsState = (InternalListState)getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, windowSetDescriptor);
        mergingSetsState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
    }

    /**
     * Closes the operator via the superclass and then drops the references to the
     * per-task helper objects created in {@code open()}.
     */
    @Override
    public void close() throws Exception {
        super.close();
        // Release the helper objects; they are re-created by open().
        windowAssignerContext = null;
        processContext = null;
        triggerContext = null;
        timestampedCollector = null;
    }

    /**
     * Disposes the operator via the superclass and then drops the references to the
     * per-task helper objects created in {@code open()}.
     */
    @Override
    public void dispose() throws Exception {
        super.dispose();
        // Same cleanup as close(): release the helper objects.
        windowAssignerContext = null;
        processContext = null;
        triggerContext = null;
        timestampedCollector = null;
    }

    /**
     * Processes one element: assigns it to windows, adds it to the state of every window that is
     * not late, consults the trigger, emits/purges as requested and registers the cleanup timer.
     *
     * <p>For merging assigners the element's windows are first added to the {@link MergingWindowSet},
     * which may merge them with existing windows; trigger state, cleanup timers and window state
     * of the merged windows are consolidated in the merge callback.
     *
     * <p>If the element was not added to any window and is itself late, it is emitted to the
     * late-data side output when a tag is configured, otherwise the late-record counter is
     * incremented.
     *
     * @param element the record to process
     * @throws UnsupportedOperationException (from the merge callback) if merging produces a window
     *     whose cleanup time / end is not after the current watermark / processing time
     * @throws IllegalStateException if a merged window has no state window in the merging window set
     * @throws Exception if state access, the trigger or the window function fails
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection<W> elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);
        // Stays true only if the element was not added to any window.
        boolean isSkippedElement = true;
        final Object key = this.getKeyContext().getCurrentKey();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<Window> mergingWindows = this.getMergingWindowSet();
            for (Window window : elementWindows) {
                Window actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>(){

                    @Override
                    public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception {
                        // cleanupTime() saturates at Long.MAX_VALUE instead of overflowing, so a very
                        // large allowedLateness can no longer wrap around and trigger a spurious failure.
                        if (WindowOperator.this.windowAssigner.isEventTime() && WindowOperator.this.cleanupTime(mergeResult) <= WindowOperator.this.internalTimerService.currentWatermark()) {
                            throw new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: " + WindowOperator.this.internalTimerService.currentWatermark() + " window: " + mergeResult);
                        }
                        if (!WindowOperator.this.windowAssigner.isEventTime() && ((Window)mergeResult).maxTimestamp() <= WindowOperator.this.internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + WindowOperator.this.internalTimerService.currentProcessingTime() + " window: " + mergeResult);
                        }
                        // Let the trigger merge its state into the resulting window ...
                        WindowOperator.this.triggerContext.key = key;
                        WindowOperator.this.triggerContext.window = mergeResult;
                        WindowOperator.this.triggerContext.onMerge(mergedWindows);
                        // ... then clear trigger state and cleanup timers of the windows merged away.
                        for (Window m : mergedWindows) {
                            WindowOperator.this.triggerContext.window = m;
                            WindowOperator.this.triggerContext.clear();
                            WindowOperator.this.deleteCleanupTimer(m);
                        }
                        WindowOperator.this.windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });
                if (this.isWindowLate(actualWindow)) {
                    // Late windows are dropped from the in-flight set without storing the element.
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;
                Window stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }
                this.windowState.setCurrentNamespace((Object)stateWindow);
                this.windowState.add(element.getValue());
                this.triggerContext.key = key;
                this.triggerContext.window = actualWindow;
                TriggerResult triggerResult = this.triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    Object contents = this.windowState.get();
                    // Only the emission is skipped for empty contents; purge handling and cleanup
                    // timer registration below must still run (as in onEventTime/onProcessingTime).
                    if (contents != null) {
                        this.emitWindowContents(actualWindow, contents);
                    }
                }
                if (triggerResult.isPurge()) {
                    this.windowState.clear();
                }
                this.registerCleanupTimer(actualWindow);
            }
            mergingWindows.persist();
        } else {
            for (Window window : elementWindows) {
                if (this.isWindowLate(window)) continue;
                isSkippedElement = false;
                this.windowState.setCurrentNamespace((Object)window);
                this.windowState.add(element.getValue());
                this.triggerContext.key = key;
                this.triggerContext.window = window;
                TriggerResult triggerResult = this.triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    Object contents = this.windowState.get();
                    // Only the emission is skipped for empty contents; purge handling and cleanup
                    // timer registration below must still run (as in onEventTime/onProcessingTime).
                    if (contents != null) {
                        this.emitWindowContents(window, contents);
                    }
                }
                if (triggerResult.isPurge()) {
                    this.windowState.clear();
                }
                this.registerCleanupTimer(window);
            }
        }
        // The element ended up in no window: report it as late data if it actually is late.
        if (isSkippedElement && this.isElementLate(element)) {
            if (this.lateDataOutputTag != null) {
                this.sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Intentionally a no-op in this operator: nothing is flushed or emitted here.
     */
    @Override
    public void endInput() throws Exception {
    }

    /**
     * Handles a fired event-time timer: points the trigger context and window state at the
     * timer's key/window, lets the trigger decide, emits and/or purges accordingly, and clears all
     * state of the window if the timer is the window's cleanup timer.
     *
     * @param timer the fired timer; its namespace is the window
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        this.triggerContext.key = timer.getKey();
        this.triggerContext.window = (Window)timer.getNamespace();

        MergingWindowSet mergingWindows = null;
        if (!(this.windowAssigner instanceof MergingWindowAssigner)) {
            this.windowState.setCurrentNamespace(this.triggerContext.window);
        } else {
            mergingWindows = this.getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(this.triggerContext.window);
            if (stateWindow == null) {
                // The window is no longer part of the merging window set; nothing to do.
                return;
            }
            this.windowState.setCurrentNamespace(stateWindow);
        }

        TriggerResult triggerResult = this.triggerContext.onEventTime(timer.getTimestamp());
        if (triggerResult.isFire()) {
            Object contents = this.windowState.get();
            if (contents != null) {
                this.emitWindowContents(this.triggerContext.window, contents);
            }
        }
        if (triggerResult.isPurge()) {
            this.windowState.clear();
        }

        boolean cleanupDue = this.windowAssigner.isEventTime() && this.isCleanupTime(this.triggerContext.window, timer.getTimestamp());
        if (cleanupDue) {
            this.clearAllState(this.triggerContext.window, (AppendingState<IN, ACC>)this.windowState, mergingWindows);
        }
        if (mergingWindows != null) {
            mergingWindows.persist();
        }
    }

    /**
     * Handles a fired processing-time timer: points the trigger context and window state at the
     * timer's key/window, lets the trigger decide, emits and/or purges accordingly, and clears all
     * state of the window if the assigner is processing-time based and the timer is the window's
     * cleanup timer.
     *
     * @param timer the fired timer; its namespace is the window
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        this.triggerContext.key = timer.getKey();
        this.triggerContext.window = (Window)timer.getNamespace();

        MergingWindowSet mergingWindows = null;
        if (!(this.windowAssigner instanceof MergingWindowAssigner)) {
            this.windowState.setCurrentNamespace(this.triggerContext.window);
        } else {
            mergingWindows = this.getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(this.triggerContext.window);
            if (stateWindow == null) {
                // The window is no longer part of the merging window set; nothing to do.
                return;
            }
            this.windowState.setCurrentNamespace(stateWindow);
        }

        TriggerResult triggerResult = this.triggerContext.onProcessingTime(timer.getTimestamp());
        if (triggerResult.isFire()) {
            Object contents = this.windowState.get();
            if (contents != null) {
                this.emitWindowContents(this.triggerContext.window, contents);
            }
        }
        if (triggerResult.isPurge()) {
            this.windowState.clear();
        }

        boolean cleanupDue = !this.windowAssigner.isEventTime() && this.isCleanupTime(this.triggerContext.window, timer.getTimestamp());
        if (cleanupDue) {
            this.clearAllState(this.triggerContext.window, (AppendingState<IN, ACC>)this.windowState, mergingWindows);
        }
        if (mergingWindows != null) {
            mergingWindows.persist();
        }
    }

    /**
     * Drops everything stored for a window: the window contents, the trigger's state (through the
     * current trigger context) and the window function's per-window state. When a merging window
     * set is supplied the window is also retired from it and the set is persisted.
     *
     * @param window the window being cleaned up
     * @param windowState the contents state, already set to the window's namespace by the caller
     * @param mergingWindows the merging window set, or null for non-merging assigners
     */
    private void clearAllState(W window, AppendingState<IN, ACC> windowState, MergingWindowSet<W> mergingWindows) throws Exception {
        // 'windowState' is the parameter here, which shadows the field of the same name.
        windowState.clear();
        triggerContext.clear();
        processContext.window = window;
        processContext.clear();
        if (mergingWindows == null) {
            return;
        }
        mergingWindows.retireWindow(window);
        mergingWindows.persist();
    }

    /**
     * Invokes the window function with the given contents. Emitted records are timestamped with
     * the window's max timestamp, and the key is taken from the current trigger context.
     *
     * @param window the window that fired
     * @param contents the contents read from the window state
     */
    private void emitWindowContents(W window, ACC contents) throws Exception {
        long outputTimestamp = ((Window)window).maxTimestamp();
        timestampedCollector.setAbsoluteTimestamp(outputTimestamp);
        processContext.window = window;
        InternalWindowFunction windowFunction = (InternalWindowFunction)userFunction;
        windowFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

    /**
     * Emits the given element to the side output identified by {@code lateDataOutputTag}.
     *
     * @param element the late element to forward
     */
    protected void sideOutput(StreamRecord<IN> element) {
        output.collect(lateDataOutputTag, element);
    }

    /**
     * Creates a new {@link MergingWindowSet} from the window assigner (cast to a merging
     * assigner) and the merging-sets list state.
     *
     * @return a new merging window set
     */
    protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
        return new MergingWindowSet((MergingWindowAssigner)windowAssigner, mergingSetsState);
    }

    /**
     * Tells whether a window is late: the assigner is event-time based and the window's cleanup
     * time is not after the current watermark.
     *
     * @param window the window to check
     * @return true if the window is late
     */
    protected boolean isWindowLate(W window) {
        if (!windowAssigner.isEventTime()) {
            return false;
        }
        return cleanupTime(window) <= internalTimerService.currentWatermark();
    }

    /**
     * Tells whether an element is late: the assigner is event-time based and the element's
     * timestamp plus the allowed lateness is not after the current watermark.
     *
     * <p>The sum is saturated at {@code Long.MAX_VALUE} (the same guard {@code cleanupTime} uses),
     * so a very large allowed lateness cannot overflow to a negative value and make an element
     * look late.
     *
     * @param element the element to check
     * @return true if the element is late
     */
    protected boolean isElementLate(StreamRecord<IN> element) {
        if (!this.windowAssigner.isEventTime()) {
            return false;
        }
        long timestamp = element.getTimestamp();
        long latestAllowed = timestamp + this.allowedLateness;
        // allowedLateness is >= 0, so a smaller sum can only mean long overflow.
        if (latestAllowed < timestamp) {
            latestAllowed = Long.MAX_VALUE;
        }
        return latestAllowed <= this.internalTimerService.currentWatermark();
    }

    /**
     * Registers the cleanup timer for a window through the current trigger context, as an
     * event-time or processing-time timer depending on the assigner. Nothing is registered when
     * the cleanup time is {@code Long.MAX_VALUE}.
     *
     * @param window the window whose cleanup timer is registered
     */
    protected void registerCleanupTimer(W window) {
        long fireAt = cleanupTime(window);
        if (fireAt != Long.MAX_VALUE) {
            if (windowAssigner.isEventTime()) {
                triggerContext.registerEventTimeTimer(fireAt);
            } else {
                triggerContext.registerProcessingTimeTimer(fireAt);
            }
        }
    }

    /**
     * Deletes the cleanup timer of a window through the current trigger context, as an event-time
     * or processing-time timer depending on the assigner. Nothing is deleted when the cleanup
     * time is {@code Long.MAX_VALUE}.
     *
     * @param window the window whose cleanup timer is deleted
     */
    protected void deleteCleanupTimer(W window) {
        long fireAt = cleanupTime(window);
        if (fireAt != Long.MAX_VALUE) {
            if (windowAssigner.isEventTime()) {
                triggerContext.deleteEventTimeTimer(fireAt);
            } else {
                triggerContext.deleteProcessingTimeTimer(fireAt);
            }
        }
    }

    /**
     * Computes the time at which a window's state is cleaned up. For processing-time assigners
     * this is the window's max timestamp; for event-time assigners it is the max timestamp plus
     * the allowed lateness, or {@code Long.MAX_VALUE} if that sum overflows.
     *
     * @param window the window
     * @return the cleanup time
     */
    private long cleanupTime(W window) {
        if (!windowAssigner.isEventTime()) {
            return ((Window)window).maxTimestamp();
        }
        long windowEnd = ((Window)window).maxTimestamp();
        long withLateness = windowEnd + allowedLateness;
        // A sum below the window end means the addition overflowed.
        if (withLateness < windowEnd) {
            return Long.MAX_VALUE;
        }
        return withLateness;
    }

    /**
     * Tells whether the given time equals the cleanup time of the window.
     *
     * @param window the window
     * @param time the timestamp to compare, typically that of a fired timer
     * @return true if {@code time} is exactly the window's cleanup time
     */
    protected final boolean isCleanupTime(W window, long time) {
        long expected = cleanupTime(window);
        return expected == time;
    }

    /** Returns the trigger this operator was constructed with. */
    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return trigger;
    }

    /** Returns the key selector this operator was constructed with. */
    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return keySelector;
    }

    /** Returns the window assigner this operator was constructed with. */
    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return windowAssigner;
    }

    /** Returns the window state descriptor this operator was constructed with; may be null. */
    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, ACC>, ?> getStateDescriptor() {
        return windowStateDescriptor;
    }

    protected static class Timer<K, W extends Window>
    implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        public Timer(long timestamp, K key, W window) {
            this.timestamp = timestamp;
            this.key = key;
            this.window = window;
        }

        @Override
        public int compareTo(Timer<K, W> o) {
            return Long.compare(this.timestamp, o.timestamp);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Timer timer = (Timer)o;
            return this.timestamp == timer.timestamp && this.key.equals(timer.key) && this.window.equals(timer.window);
        }

        public int hashCode() {
            int result = (int)(this.timestamp ^ this.timestamp >>> 32);
            result = 31 * result + this.key.hashCode();
            result = 31 * result + this.window.hashCode();
            return result;
        }

        public String toString() {
            return "Timer{timestamp=" + this.timestamp + ", key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /**
     * Trigger context handed to the {@link Trigger} callbacks. It carries the key and window the
     * trigger is currently invoked for and forwards state and timer requests to the enclosing
     * operator, using the current window as namespace.
     *
     * <p>NOTE(review): decompiler output. The {@code static} modifier, the synthetic
     * {@code this$0} field and the constructor signature look like artifacts of decompiling a
     * non-static inner class (call sites in this file pass the operator as first argument) —
     * confirm against the original source before relying on them.
     */
    public static class Context
    implements Trigger.OnMergeContext {
        /** Key the trigger is currently invoked for; overwritten by the operator before each use. */
        protected K key;
        /** Window the trigger is currently invoked for; also the namespace for state and timers. */
        protected W window;
        /** Windows that were merged into {@code window}; set by {@link #onMerge(Collection)}. */
        protected Collection<W> mergedWindows;
        final /* synthetic */ WindowOperator this$0;

        public Context(K key, W window) {
            this.this$0 = this$0;
            this.key = key;
            this.window = window;
        }

        /** Returns the metric group of the enclosing operator. */
        @Override
        public MetricGroup getMetricGroup() {
            return this.this$0.getMetricGroup();
        }

        /** Returns the current watermark as reported by the operator's timer service. */
        @Override
        public long getCurrentWatermark() {
            return this.this$0.internalTimerService.currentWatermark();
        }

        /**
         * Looks up a value state by name, deriving the type information from the given class.
         *
         * @throws RuntimeException if type information cannot be extracted from the class
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
            TypeInformation typeInfo;
            Preconditions.checkNotNull(stateType, (String)"The state type class must not be null");
            try {
                typeInfo = TypeExtractor.getForClass(stateType);
            }
            catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + stateType.getName() + "' from the class alone, due to generic type parameters. Please specify the TypeInformation directly.", e);
            }
            return this.getKeyValueState(name, typeInfo, defaultState);
        }

        /**
         * Looks up a value state by name, building a {@link ValueStateDescriptor} from the given
         * type information and default value and resolving it via {@code getPartitionedState}.
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
            Preconditions.checkNotNull((Object)name, (String)"The name of the state must not be null");
            Preconditions.checkNotNull(stateType, (String)"The state type information must not be null");
            ValueStateDescriptor stateDesc = new ValueStateDescriptor(name, stateType.createSerializer(this.this$0.getExecutionConfig()), defaultState);
            return (ValueState)this.getPartitionedState((StateDescriptor<S, ?>)stateDesc);
        }

        /**
         * Returns the state for the given descriptor in the namespace of the current window.
         *
         * @throws RuntimeException wrapping any exception raised while retrieving the state
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S)this.this$0.getPartitionedState(this.window, this.this$0.windowSerializer, stateDescriptor);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        /**
         * Merges the state of the merged windows into the current window's namespace. Does
         * nothing when no merged windows are recorded.
         *
         * @throws RuntimeException wrapping any failure, including the
         *     {@link IllegalArgumentException} raised when the state is not mergeable
         */
        @Override
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows != null && this.mergedWindows.size() > 0) {
                try {
                    State state = this.this$0.getContextStateHelper().getOrCreateKeyedState(this.this$0.windowSerializer, stateDescriptor);
                    if (!(state instanceof InternalMergingState)) {
                        throw new IllegalArgumentException("The given state descriptor does not refer to a mergeable state (MergingState)");
                    }
                    ((InternalMergingState)state).mergeNamespaces(this.window, this.mergedWindows);
                }
                catch (Exception e) {
                    throw new RuntimeException("Error while merging state.", e);
                }
            }
        }

        /** Returns the current processing time as reported by the operator's timer service. */
        @Override
        public long getCurrentProcessingTime() {
            return this.this$0.internalTimerService.currentProcessingTime();
        }

        /** Registers a processing-time timer for the current window. */
        @Override
        public void registerProcessingTimeTimer(long time) {
            this.this$0.internalTimerService.registerProcessingTimeTimer(this.window, time);
        }

        /** Registers an event-time timer for the current window. */
        @Override
        public void registerEventTimeTimer(long time) {
            this.this$0.internalTimerService.registerEventTimeTimer(this.window, time);
        }

        /** Deletes a processing-time timer of the current window. */
        @Override
        public void deleteProcessingTimeTimer(long time) {
            this.this$0.internalTimerService.deleteProcessingTimeTimer(this.window, time);
        }

        /** Deletes an event-time timer of the current window. */
        @Override
        public void deleteEventTimeTimer(long time) {
            this.this$0.internalTimerService.deleteEventTimeTimer(this.window, time);
        }

        /** Invokes the trigger's element callback for the current window. */
        public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
            return this.this$0.trigger.onElement(element.getValue(), element.getTimestamp(), this.window, this);
        }

        /** Invokes the trigger's processing-time callback for the current window. */
        public TriggerResult onProcessingTime(long time) throws Exception {
            return this.this$0.trigger.onProcessingTime(time, this.window, this);
        }

        /** Invokes the trigger's event-time callback for the current window. */
        public TriggerResult onEventTime(long time) throws Exception {
            return this.this$0.trigger.onEventTime(time, this.window, this);
        }

        /** Records the merged windows and invokes the trigger's merge callback for the current window. */
        public void onMerge(Collection<W> mergedWindows) throws Exception {
            this.mergedWindows = mergedWindows;
            this.this$0.trigger.onMerge(this.window, this);
        }

        /** Asks the trigger to clear its state for the current window. */
        public void clear() throws Exception {
            this.this$0.trigger.clear(this.window, this);
        }

        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /**
     * Context handed to the {@link InternalWindowFunction}. It exposes the current processing
     * time and watermark, per-window and global keyed state, and side outputs, all relative to the
     * window currently stored in {@code window}.
     *
     * <p>NOTE(review): decompiler output. The {@code static} modifier, the synthetic
     * {@code this$0} field and the constructor signature look like artifacts of decompiling a
     * non-static inner class (call sites in this file pass the operator as first argument) —
     * confirm against the original source before relying on them.
     */
    public static class WindowContext
    implements InternalWindowFunction.InternalWindowContext {
        /** Window the function is currently invoked for; null until the operator sets it. */
        protected W window;
        /** Store backing {@link #windowState()}; rejects all access for merging assigners. */
        protected AbstractPerWindowStateStore windowState;
        final /* synthetic */ WindowOperator this$0;

        public WindowContext(W window) {
            this.this$0 = this$0;
            this.window = window;
            this.windowState = this$0.windowAssigner instanceof MergingWindowAssigner ? this$0.new MergingWindowStateStore((StateBinder)this$0.getContextStateHelper(), this$0.getExecutionConfig()) : this$0.new PerWindowStateStore((StateBinder)this$0.getContextStateHelper(), this$0.getExecutionConfig());
        }

        public String toString() {
            // String concatenation is null-safe; the context is created with a null window, so
            // calling window.toString() directly would throw before the first window is set.
            return "WindowContext{Window = " + this.window + "}";
        }

        /** Asks the window function to clear its per-window state for the current window. */
        public void clear() throws Exception {
            ((InternalWindowFunction)this.this$0.userFunction).clear(this.window, this);
        }

        /** Returns the current processing time as reported by the operator's timer service. */
        @Override
        public long currentProcessingTime() {
            return this.this$0.internalTimerService.currentProcessingTime();
        }

        /** Returns the current watermark as reported by the operator's timer service. */
        @Override
        public long currentWatermark() {
            return this.this$0.internalTimerService.currentWatermark();
        }

        /** Returns the per-window state store, pointed at the current window. */
        @Override
        public KeyedStateStore windowState() {
            this.windowState.window = this.window;
            return this.windowState;
        }

        /** Returns the operator's keyed state store. */
        @Override
        public KeyedStateStore globalState() {
            return this.this$0.getKeyedStateStore();
        }

        /**
         * Emits a value to the given side output, timestamped with the current window's max
         * timestamp.
         *
         * @throws IllegalArgumentException if {@code outputTag} is null
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            this.this$0.output.collect(outputTag, new StreamRecord<X>(value, ((Window)this.window).maxTimestamp()));
        }
    }

    /**
     * Per-window state store for non-merging windows: state lookups are resolved by the enclosing
     * operator in the namespace of the store's current window.
     */
    public class PerWindowStateStore
    extends AbstractPerWindowStateStore {
        public PerWindowStateStore(StateBinder contextStateBinder, ExecutionConfig executionConfig) {
            super(contextStateBinder, executionConfig);
        }

        /** Resolves the descriptor against the operator, using the current window as namespace. */
        protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            // Must stay qualified: this class declares its own getPartitionedState overload.
            WindowOperator operator = WindowOperator.this;
            return (S)operator.getPartitionedState(window, operator.windowSerializer, stateDescriptor);
        }
    }

    /**
     * Per-window state store used with merging window assigners. Every state accessor throws
     * {@link UnsupportedOperationException}.
     */
    public class MergingWindowStateStore
    extends AbstractPerWindowStateStore {
        public MergingWindowStateStore(StateBinder contextStateBinder, ExecutionConfig executionConfig) {
            super(contextStateBinder, executionConfig);
        }

        /** Builds the exception thrown by every accessor of this store. */
        private UnsupportedOperationException perWindowStateNotAllowed() {
            return new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }

        public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
            throw perWindowStateNotAllowed();
        }

        public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
            throw perWindowStateNotAllowed();
        }

        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
            throw perWindowStateNotAllowed();
        }

        public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
            throw perWindowStateNotAllowed();
        }

        public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
            throw perWindowStateNotAllowed();
        }

        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
            throw perWindowStateNotAllowed();
        }
    }

    /**
     * Base class for the per-window state stores. It extends {@link DefaultKeyedStateStore} with a
     * mutable {@code window} field that {@code WindowContext.windowState()} sets before handing
     * the store out.
     */
    public abstract class AbstractPerWindowStateStore
    extends DefaultKeyedStateStore {
        /** Window whose per-window state is currently being accessed. */
        protected W window;

        public AbstractPerWindowStateStore(StateBinder contextStateBinder, ExecutionConfig executionConfig) {
            super(contextStateBinder, executionConfig);
        }
    }
}

