package org.apache.flink.table.runtime.operator.window;

import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Merger;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state2.keyed.KeyedState;
import org.apache.flink.runtime.state2.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state2.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state2.subkeyed.SubKeyedStateDescriptor;
import org.apache.flink.runtime.state2.subkeyed.SubKeyedValueState;
import org.apache.flink.runtime.state2.subkeyed.SubKeyedValueStateDescriptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.window.Window;
import org.apache.flink.table.codegen.GeneratedRecordEqualiser;
import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.operator.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operator.window.assigners.PanedWindowAssigner;
import org.apache.flink.table.runtime.operator.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operator.window.internal.GeneralWindowProcessFunction;
import org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operator.window.internal.MergingWindowProcessFunction;
import org.apache.flink.table.runtime.operator.window.internal.PanedWindowProcessFunction;
import org.apache.flink.table.runtime.operator.window.triggers.Trigger;
import org.apache.flink.table.runtime.sort.RecordEqualiser;
import org.apache.flink.table.types.BaseRowType;
import org.apache.flink.table.types.InternalType;
import org.apache.flink.table.typeutils.TypeUtils;
import org.apache.flink.table.util.BaseRowUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operator/window/WindowOperator.class */
public class WindowOperator<K, W extends Window> extends AbstractStreamOperator<BaseRow> implements OneInputStreamOperator<BaseRow, BaseRow>, Triggerable<K, W> {
    private static final long serialVersionUID = 1;
    // Names under which open() registers this operator's metrics.
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    // Consulted for isEventTime() and used by open() to pick the window process function.
    private final WindowAssigner<W> windowAssigner;
    // Every TriggerContext callback delegates to this trigger.
    private final Trigger<W> trigger;
    // Namespace serializer handed to the timer service and to the sub-keyed state descriptors.
    private final TypeSerializer<W> windowSerializer;
    // Assigned in open() from getKeySerializer(); not set by the constructors.
    private TypeSerializer<K> keySerializer;
    // When true, open() creates previousState and emitWindowResult() emits retract/accumulate messages.
    private final boolean sendRetraction;
    // Passed to BaseRowUtil.toGenericRow() for every incoming record.
    private final InternalType[] inputFieldTypes;
    // Field types of the row stored in the "window-aggs" state.
    private final InternalType[] accumulatorTypes;
    // aggResultTypes followed by windowPropertyTypes form the row type of the "previous-aggs" state.
    private final InternalType[] aggResultTypes;
    private final InternalType[] windowPropertyTypes;
    // Index of the long field read as the element timestamp when the assigner is event-time.
    private final int rowtimeIndex;
    // Added to a window's maxTimestamp() by cleanupTime(); also passed to the window process functions.
    private final long allowedLateness;
    // Either supplied directly by a constructor or instantiated from generatedWindowAggregator in open().
    private SubKeyedAggsHandleFunction<W> windowAggregator;
    private GeneratedSubKeyedAggsHandleFunction<W> generatedWindowAggregator;
    // Either supplied directly by a constructor or instantiated from generatedEqualiser in open().
    private RecordEqualiser equaliser;
    private GeneratedRecordEqualiser generatedEqualiser;
    // Everything below is transient and (re)created in open().
    private transient InternalWindowProcessFunction<K, W> windowFunction;
    private transient TimestampedCollector<BaseRow> collector;
    // Set once the aggregator has been closed so that dispose() does not close it a second time.
    private transient boolean functionsClosed = false;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;
    private transient InternalTimerService<K, W> internalTimerService;
    // Accumulator row per (key, window), registered under the name "window-aggs".
    private transient SubKeyedValueState<K, W, BaseRow> windowState;
    // Last emitted result per (key, window); remains null unless sendRetraction is true.
    private transient SubKeyedValueState<K, W, BaseRow> previousState;
    private transient WindowOperator<K, W>.TriggerContext triggerContext;
    // Reused output row: current key joined with the aggregation result.
    private transient JoinedRow reuseOutput;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/window/WindowOperator$TriggerContext.class */
    public class TriggerContext implements Trigger.OnMergeContext {
        protected W window;
        protected Collection<W> mergedWindows;

        private TriggerContext() {
        }

        public void open() throws Exception {
            WindowOperator.this.trigger.open(this);
        }

        public boolean onElement(BaseRow baseRow, long j) throws Exception {
            return WindowOperator.this.trigger.onElement(baseRow, j, this.window);
        }

        public boolean onProcessingTime(long j) throws Exception {
            return WindowOperator.this.trigger.onProcessingTime(j, this.window);
        }

        public boolean onEventTime(long j) throws Exception {
            return WindowOperator.this.trigger.onEventTime(j, this.window);
        }

        public void onMerge() throws Exception {
            WindowOperator.this.trigger.onMerge(this.window, this);
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public <T> ValueState<T> getValueState(ValueStateDescriptor<T> valueStateDescriptor) {
            valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            return new WindowScopeValueState(WindowOperator.this.getSubKeyedState(new SubKeyedValueStateDescriptor(valueStateDescriptor.getName(), WindowOperator.this.keySerializer, WindowOperator.this.windowSerializer, valueStateDescriptor.getSerializer())));
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.OnMergeContext
        public <T> T mergeValueState(ValueState<T> valueState, Merger<T> merger) {
            SubKeyedValueState subKeyedValueState = ((WindowScopeValueState) valueState).keyedState;
            if (this.mergedWindows == null || this.mergedWindows.size() <= 0) {
                return null;
            }
            Object obj = subKeyedValueState.get(WindowOperator.this.currentKey(), this.window);
            Iterator<W> it = this.mergedWindows.iterator();
            while (it.hasNext()) {
                obj = merger.merge(obj, subKeyedValueState.get(WindowOperator.this.currentKey(), it.next()));
            }
            subKeyedValueState.put(WindowOperator.this.currentKey(), this.window, obj);
            return (T) obj;
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public long getCurrentProcessingTime() {
            return WindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public long getCurrentWatermark() {
            return WindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public MetricGroup getMetricGroup() {
            return WindowOperator.this.getMetricGroup();
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public void registerProcessingTimeTimer(long j) {
            WindowOperator.this.internalTimerService.registerProcessingTimeTimer(WindowOperator.this.getCurrentKey(), this.window, j);
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public void registerEventTimeTimer(long j) {
            WindowOperator.this.internalTimerService.registerEventTimeTimer(WindowOperator.this.getCurrentKey(), this.window, j);
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public void deleteProcessingTimeTimer(long j) {
            WindowOperator.this.internalTimerService.deleteProcessingTimeTimer(WindowOperator.this.getCurrentKey(), this.window, j);
        }

        @Override // org.apache.flink.table.runtime.operator.window.triggers.Trigger.TriggerContext
        public void deleteEventTimeTimer(long j) {
            WindowOperator.this.internalTimerService.deleteEventTimeTimer(WindowOperator.this.getCurrentKey(), this.window, j);
        }

        public void clear() throws Exception {
            WindowOperator.this.trigger.clear(this.window);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operator/window/WindowOperator$WindowContext.class */
    /**
     * Context handed to the {@link InternalWindowProcessFunction}. It exposes the operator's
     * state accessors, current key, clocks, per-window accumulator state and the
     * trigger/timer housekeeping needed when windows are cleaned up or merged.
     */
    private class WindowContext implements InternalWindowProcessFunction.Context<K, W> {
        private WindowContext() {
        }

        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public <S extends State> S getKeyedState(StateDescriptor<S, ?> stateDescriptor) {
            return (S) WindowOperator.this.getState(stateDescriptor);
        }

        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public <V, S extends SubKeyedState<K, W, V>> S getSubKeyedState(SubKeyedStateDescriptor<K, W, V, S> subKeyedStateDescriptor) {
            // Qualified on purpose: this class declares a method of the same name.
            return (S) WindowOperator.this.getSubKeyedState(subKeyedStateDescriptor);
        }

        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public <V, S extends KeyedState<K, V>> S getKeyedState(KeyedStateDescriptor<K, V, S> keyedStateDescriptor) {
            return (S) WindowOperator.this.getKeyedState(keyedStateDescriptor);
        }

        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public K currentKey() {
            return (K) WindowOperator.this.currentKey();
        }

        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public long currentProcessingTime() {
            return WindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public long currentWatermark() {
            return WindowOperator.this.internalTimerService.currentWatermark();
        }

        /** Returns the accumulator row stored for the current key and window {@code w}. */
        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public BaseRow getWindowAccumulators(W w) throws Exception {
            return (BaseRow) WindowOperator.this.windowState.get(currentKey(), w);
        }

        /** Stores {@code baseRow} as the accumulator row of the current key and window {@code w}. */
        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public void setWindowAccumulators(W w, BaseRow baseRow) throws Exception {
            WindowOperator.this.windowState.put(currentKey(), w, baseRow);
        }

        /** Removes the accumulator row of window {@code w} and lets the aggregator clean up for it. */
        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public void clearWindowState(W w) throws Exception {
            WindowOperator.this.windowState.remove(currentKey(), w);
            WindowOperator.this.windowAggregator.cleanup(w);
        }

        /** Removes the previously emitted result of window {@code w}; a no-op without retraction. */
        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public void clearPreviousState(W w) throws Exception {
            // previousState is only created when retraction is enabled.
            if (WindowOperator.this.previousState != null) {
                WindowOperator.this.previousState.remove(currentKey(), w);
            }
        }

        /** Points the trigger context at window {@code w} and clears the trigger for it. */
        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public void clearTrigger(W w) throws Exception {
            WindowOperator.this.triggerContext.window = w;
            WindowOperator.this.triggerContext.clear();
        }

        /**
         * Deletes the cleanup timer registered for window {@code w} under the current key.
         *
         * <p>The timer is addressed by {@code w} itself rather than through the trigger
         * context, so the correct timer is removed regardless of which window the trigger
         * context currently points at. The trigger context's window is left untouched.
         */
        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public void deleteCleanupTimer(W w) throws Exception {
            long cleanupTime = WindowOperator.this.cleanupTime(w);
            if (cleanupTime == Long.MAX_VALUE) {
                // cleanupTime() reports Long.MAX_VALUE when the cleanup time overflowed.
                return;
            }
            if (WindowOperator.this.windowAssigner.isEventTime()) {
                WindowOperator.this.internalTimerService.deleteEventTimeTimer(currentKey(), w, cleanupTime);
            } else {
                WindowOperator.this.internalTimerService.deleteProcessingTimeTimer(currentKey(), w, cleanupTime);
            }
        }

        /** Points the trigger context at the merge result {@code w} and notifies the trigger of the merge. */
        @Override // org.apache.flink.table.runtime.operator.window.internal.InternalWindowProcessFunction.Context
        public void onMerge(W w, Collection<W> collection) throws Exception {
            WindowOperator.this.triggerContext.window = w;
            WindowOperator.this.triggerContext.mergedWindows = collection;
            WindowOperator.this.triggerContext.onMerge();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operator/window/WindowOperator$WindowScopeValueState.class */
    private class WindowScopeValueState<T> implements ValueState<T> {
        private final SubKeyedValueState<K, W, T> keyedState;

        private WindowScopeValueState(SubKeyedValueState<K, W, T> subKeyedValueState) {
            this.keyedState = subKeyedValueState;
        }

        public T value() {
            return (T) this.keyedState.get(WindowOperator.this.currentKey(), WindowOperator.this.triggerContext.window);
        }

        public void update(T t) {
            this.keyedState.put(WindowOperator.this.currentKey(), WindowOperator.this.triggerContext.window, t);
        }

        public void clear() {
            this.keyedState.remove(WindowOperator.this.currentKey(), WindowOperator.this.triggerContext.window);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a window operator from an already instantiated aggregation function and
     * record equaliser.
     *
     * @param subKeyedAggsHandleFunction window aggregation function
     * @param recordEqualiser equaliser used to compare a new result with the previously emitted one
     * @param windowAssigner assigner deciding which windows an element belongs to
     * @param trigger trigger deciding when a window result is emitted
     * @param typeSerializer serializer for windows
     * @param internalTypeArr field types of the input rows
     * @param internalTypeArr2 field types of the accumulator row
     * @param internalTypeArr3 field types of the aggregation result
     * @param internalTypeArr4 field types of the window properties
     * @param i index of the rowtime field; must be non-negative for an event-time assigner
     * @param z whether retraction messages are sent
     * @param j allowed lateness; must be non-negative
     */
    public WindowOperator(SubKeyedAggsHandleFunction<W> subKeyedAggsHandleFunction, RecordEqualiser recordEqualiser, WindowAssigner<W> windowAssigner, Trigger<W> trigger, TypeSerializer<W> typeSerializer, InternalType[] internalTypeArr, InternalType[] internalTypeArr2, InternalType[] internalTypeArr3, InternalType[] internalTypeArr4, int i, boolean z, long j) {
        Preconditions.checkArgument(j >= 0);

        windowAggregator = Preconditions.checkNotNull(subKeyedAggsHandleFunction);
        equaliser = Preconditions.checkNotNull(recordEqualiser);
        this.windowAssigner = Preconditions.checkNotNull(windowAssigner);
        this.trigger = Preconditions.checkNotNull(trigger);
        windowSerializer = Preconditions.checkNotNull(typeSerializer);

        inputFieldTypes = Preconditions.checkNotNull(internalTypeArr);
        accumulatorTypes = Preconditions.checkNotNull(internalTypeArr2);
        aggResultTypes = Preconditions.checkNotNull(internalTypeArr3);
        windowPropertyTypes = Preconditions.checkNotNull(internalTypeArr4);

        allowedLateness = j;
        sendRetraction = z;

        // An event-time assigner needs a valid rowtime field index.
        Preconditions.checkArgument(!(windowAssigner.isEventTime() && i < 0));
        rowtimeIndex = i;

        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a window operator from generated code. The aggregation function and the
     * record equaliser are instantiated from the generated classes in {@link #open()}.
     *
     * @param generatedSubKeyedAggsHandleFunction generated window aggregation function
     * @param generatedRecordEqualiser generated equaliser used to compare results
     * @param windowAssigner assigner deciding which windows an element belongs to
     * @param trigger trigger deciding when a window result is emitted
     * @param typeSerializer serializer for windows
     * @param internalTypeArr field types of the input rows
     * @param internalTypeArr2 field types of the accumulator row
     * @param internalTypeArr3 field types of the aggregation result
     * @param internalTypeArr4 field types of the window properties
     * @param i index of the rowtime field; must be non-negative for an event-time assigner
     * @param z whether retraction messages are sent
     * @param j allowed lateness; must be non-negative
     */
    public WindowOperator(GeneratedSubKeyedAggsHandleFunction<W> generatedSubKeyedAggsHandleFunction, GeneratedRecordEqualiser generatedRecordEqualiser, WindowAssigner<W> windowAssigner, Trigger<W> trigger, TypeSerializer<W> typeSerializer, InternalType[] internalTypeArr, InternalType[] internalTypeArr2, InternalType[] internalTypeArr3, InternalType[] internalTypeArr4, int i, boolean z, long j) {
        Preconditions.checkArgument(j >= 0);

        generatedWindowAggregator = Preconditions.checkNotNull(generatedSubKeyedAggsHandleFunction);
        generatedEqualiser = Preconditions.checkNotNull(generatedRecordEqualiser);
        this.windowAssigner = Preconditions.checkNotNull(windowAssigner);
        this.trigger = Preconditions.checkNotNull(trigger);
        windowSerializer = Preconditions.checkNotNull(typeSerializer);

        inputFieldTypes = Preconditions.checkNotNull(internalTypeArr);
        accumulatorTypes = Preconditions.checkNotNull(internalTypeArr2);
        aggResultTypes = Preconditions.checkNotNull(internalTypeArr3);
        windowPropertyTypes = Preconditions.checkNotNull(internalTypeArr4);

        allowedLateness = j;
        sendRetraction = z;

        // An event-time assigner needs a valid rowtime field index.
        Preconditions.checkArgument(!(windowAssigner.isEventTime() && i < 0));
        rowtimeIndex = i;

        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Initializes the operator's runtime state: key serializer, output collector, timer
     * service, trigger context, window state (plus previous-result state when retraction is
     * enabled), the aggregation function and equaliser (instantiated from generated code if
     * they were not supplied directly), the window process function matching the assigner
     * type, and the metrics.
     *
     * @throws Exception if any of the initialization steps fails
     */
    public void open() throws Exception {
        super.open();
        this.keySerializer = getKeySerializer();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        // Timers are namespaced by window; this operator receives the timer callbacks.
        this.internalTimerService = getInternalTimerService("window-timers", this.windowSerializer, this);
        this.triggerContext = new TriggerContext();
        this.triggerContext.open();
        // Accumulator row per (key, window).
        this.windowState = getSubKeyedState(new SubKeyedValueStateDescriptor("window-aggs", this.keySerializer, this.windowSerializer, TypeUtils.createSerializer(new BaseRowType((Class<?>) BaseRow.class, this.accumulatorTypes))));
        if (this.sendRetraction) {
            // Last emitted row (aggregation result followed by window properties) per (key, window).
            this.previousState = getSubKeyedState(new SubKeyedValueStateDescriptor("previous-aggs", this.keySerializer, this.windowSerializer, TypeUtils.createSerializer(new BaseRowType((Class<?>) BaseRow.class, (InternalType[]) ArrayUtils.addAll(this.aggResultTypes, this.windowPropertyTypes)))));
        }
        // The generated variants are only set by the code-generation constructor.
        if (this.generatedWindowAggregator != null) {
            this.windowAggregator = (SubKeyedAggsHandleFunction) this.generatedWindowAggregator.newInstance(getRuntimeContext().getUserCodeClassLoader());
        }
        if (this.generatedEqualiser != null) {
            this.equaliser = (RecordEqualiser) this.generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
        }
        WindowContext windowContext = new WindowContext();
        this.windowAggregator.open(new ExecutionContextImpl(this, getRuntimeContext(), this.windowSerializer));
        // Pick the process function by assigner type: merging, paned, or general.
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            this.windowFunction = new MergingWindowProcessFunction((MergingWindowAssigner) this.windowAssigner, this.windowAggregator, this.keySerializer, this.windowSerializer, this.allowedLateness);
        } else if (this.windowAssigner instanceof PanedWindowAssigner) {
            this.windowFunction = new PanedWindowProcessFunction((PanedWindowAssigner) this.windowAssigner, this.windowAggregator, this.allowedLateness);
        } else {
            this.windowFunction = new GeneralWindowProcessFunction(this.windowAssigner, this.windowAggregator, this.allowedLateness);
        }
        this.windowFunction.open(windowContext);
        this.reuseOutput = new JoinedRow();
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        // Gauge: current processing time minus current watermark, or 0 while the watermark is negative.
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long currentWatermark = this.internalTimerService.currentWatermark();
            if (currentWatermark < 0) {
                return 0L;
            }
            return Long.valueOf(this.internalTimerService.currentProcessingTime() - currentWatermark);
        });
    }

    /**
     * Closes the operator: drops the collector and trigger context, marks the functions as
     * closed and then closes the window aggregator.
     *
     * @throws Exception if the superclass or the aggregator fails to close
     */
    @Override
    public void close() throws Exception {
        super.close();
        collector = null;
        triggerContext = null;
        // Flag first, so dispose() will not attempt a second close even if close() throws here.
        functionsClosed = true;
        windowAggregator.close();
    }

    /**
     * Releases the operator's resources. The window aggregator is closed here only if
     * {@link #close()} has not already done so.
     *
     * <p>When the operator was built from generated code, the aggregator is only instantiated
     * in {@link #open()}. If {@code open()} failed earlier, or never ran, the aggregator is
     * still {@code null}; it is skipped so that {@code dispose()} does not throw a
     * {@link NullPointerException} that would hide the original failure.
     *
     * @throws Exception if the superclass or the aggregator fails to release its resources
     */
    @Override
    public void dispose() throws Exception {
        super.dispose();
        collector = null;
        triggerContext = null;
        if (functionsClosed) {
            return;
        }
        functionsClosed = true;
        if (windowAggregator != null) {
            windowAggregator.close();
        }
    }

    /**
     * Processes one input record: folds it into the accumulators of every state window it
     * is assigned to, then runs the trigger for every actual window (emitting a result when
     * the trigger fires) and registers that window's cleanup timer. A record that is
     * assigned to no window at all is counted as a dropped late record.
     *
     * @param streamRecord the incoming record
     * @throws Exception if state access, aggregation or the trigger fails
     */
    @Override
    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        final GenericRow inputRow = BaseRowUtil.toGenericRow(streamRecord.getValue(), inputFieldTypes);

        final long timestamp;
        if (windowAssigner.isEventTime()) {
            timestamp = inputRow.getLong(rowtimeIndex);
        } else {
            timestamp = internalTimerService.currentProcessingTime();
        }

        // Stays true only if neither loop below sees a window.
        boolean assignedToNoWindow = true;

        for (W stateWindow : windowFunction.assignStateNamespace(inputRow, timestamp)) {
            assignedToNoWindow = false;
            BaseRow accumulators = windowState.get(currentKey(), stateWindow);
            if (accumulators == null) {
                accumulators = windowAggregator.createAccumulators();
            }
            windowAggregator.setAccumulators(stateWindow, accumulators);
            if (!BaseRowUtil.isAccumulateMsg(inputRow)) {
                windowAggregator.retract(inputRow);
            } else {
                windowAggregator.accumulate(inputRow);
            }
            windowState.put(currentKey(), stateWindow, windowAggregator.getAccumulators());
        }

        for (W actualWindow : windowFunction.assignActualWindows(inputRow, timestamp)) {
            assignedToNoWindow = false;
            // The trigger context and the cleanup timer both act on triggerContext.window.
            triggerContext.window = actualWindow;
            final boolean fired = triggerContext.onElement(inputRow, timestamp);
            if (fired) {
                emitWindowResult(actualWindow);
            }
            registerCleanupTimer(actualWindow);
        }

        if (assignedToNoWindow) {
            numLateRecordsDropped.inc();
            lateRecordsDroppedRate.markEvent();
        }
    }

    /**
     * Handles an event-time timer: restores the timer's key and window, emits the window
     * result if the trigger fires and, for event-time assigners, lets the window function
     * clean the window up if needed.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if the trigger, emission or cleanup fails
     */
    @Override
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        triggerContext.window = internalTimer.getNamespace();

        final boolean fired = triggerContext.onEventTime(internalTimer.getTimestamp());
        if (fired) {
            emitWindowResult(triggerContext.window);
        }

        if (!windowAssigner.isEventTime()) {
            return;
        }
        windowFunction.cleanWindowIfNeeded(triggerContext.window, internalTimer.getTimestamp());
    }

    /**
     * Handles a processing-time timer: restores the timer's key and window, emits the
     * window result if the trigger fires and, for processing-time assigners, lets the window
     * function clean the window up if needed.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if the trigger, emission or cleanup fails
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        triggerContext.window = internalTimer.getNamespace();

        final boolean fired = triggerContext.onProcessingTime(internalTimer.getTimestamp());
        if (fired) {
            emitWindowResult(triggerContext.window);
        }

        if (!windowAssigner.isEventTime()) {
            windowFunction.cleanWindowIfNeeded(triggerContext.window, internalTimer.getTimestamp());
        }
    }

    /**
     * Emits the aggregation result of window {@code w} for the current key.
     *
     * <p>Without retraction the result is emitted as is. With retraction the result is
     * compared with the previously emitted one: nothing is emitted when they are equal;
     * otherwise the previous result (if any) is retracted, the new result is emitted as an
     * accumulate message and remembered as the new previous result.
     *
     * @param w the window whose result is emitted
     * @throws Exception if state access or result computation fails
     */
    private void emitWindowResult(W w) throws Exception {
        final BaseRow currentResult = windowFunction.getWindowAggregationResult(w);

        if (sendRetraction) {
            final BaseRow previousResult = previousState.get(currentKey(), w);
            if (previousResult != null) {
                if (equaliser.equals(currentResult, previousResult)) {
                    // Unchanged result: neither a retraction nor an update is needed.
                    return;
                }
                reuseOutput.replace((BaseRow) getCurrentKey(), previousResult);
                BaseRowUtil.setRetract(reuseOutput);
                collector.collect(reuseOutput);
            }
            reuseOutput.replace((BaseRow) getCurrentKey(), currentResult);
            BaseRowUtil.setAccumulate(reuseOutput);
            collector.collect(reuseOutput);
            previousState.put(currentKey(), w, currentResult);
        } else {
            reuseOutput.replace((BaseRow) getCurrentKey(), currentResult);
            collector.collect(reuseOutput);
        }
    }

    /** End-of-input hook; this operator performs no action here. */
    public void endInput() throws Exception {
    }

    /**
     * Registers the cleanup timer of window {@code w} through the trigger context, as an
     * event-time or processing-time timer depending on the assigner. Nothing is registered
     * when the cleanup time is {@code Long.MAX_VALUE}.
     *
     * <p>The timer is registered for the window currently set on the trigger context.
     *
     * @param w the window whose cleanup time is computed
     */
    private void registerCleanupTimer(W w) {
        final long time = cleanupTime(w);
        if (time != Long.MAX_VALUE) {
            if (windowAssigner.isEventTime()) {
                triggerContext.registerEventTimeTimer(time);
            } else {
                triggerContext.registerProcessingTimeTimer(time);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the time at which window {@code w} is cleaned up.
     *
     * <p>For processing time this is the window's max timestamp, floored at 0. For event
     * time the allowed lateness is added first; if that sum ends up below the max timestamp
     * (overflow), {@code Long.MAX_VALUE} is returned.
     *
     * @param w the window
     * @return the cleanup time, or {@code Long.MAX_VALUE} on overflow
     */
    public long cleanupTime(W w) {
        if (windowAssigner.isEventTime()) {
            final long maxTimestamp = w.maxTimestamp();
            final long withLateness = Math.max(0L, maxTimestamp + allowedLateness);
            return withLateness < maxTimestamp ? Long.MAX_VALUE : withLateness;
        }
        return Math.max(0L, w.maxTimestamp());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the operator's current key, typed as {@code K}.
     *
     * @return the value of {@code getCurrentKey()} cast to the key type
     */
    @SuppressWarnings("unchecked")
    public K currentKey() {
        final Object key = getCurrentKey();
        return (K) key;
    }

    /**
     * Returns the counter registered in {@link #open()} under the name
     * {@code "numLateRecordsDropped"}.
     *
     * @return the late-record counter; {@code null} before {@code open()} has run
     */
    protected Counter getNumLateRecordsDropped() {
        return numLateRecordsDropped;
    }

    /**
     * Returns the gauge registered in {@link #open()} under the name
     * {@code "watermarkLatency"}.
     *
     * @return the watermark latency gauge; {@code null} before {@code open()} has run
     */
    protected Gauge<Long> getWatermarkLatency() {
        return watermarkLatency;
    }

    /**
     * Reports that this operator requires state.
     *
     * @return always {@code true}
     */
    public boolean requireState() {
        final boolean stateful = true;
        return stateful;
    }
}
