package org.apache.flink.table.runtime.window;

import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.functions.ExecutionContext;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/window/AbstractAlignedWindowOperator.class */
/**
 * Base class for one-input stream operators that evaluate aligned time windows.
 *
 * <p>The operator only accepts {@link SlidingWindowAssigner} and {@link TumblingWindowAssigner}
 * (enforced in the constructor). It owns the {@link AlignedWindowAggregator} lifecycle
 * ({@link #open()}, {@link #close()}, {@link #dispose()}), records the most recent watermark,
 * and registers a {@value #WATERMARK_LATENCY_METRIC_NAME} gauge. Subclasses supply the actual
 * watermark handling via {@link #advanceWatermark(long)} and key extraction via
 * {@link #getKey(BaseRow)}.
 */
public abstract class AbstractAlignedWindowOperator extends AbstractStreamOperator<BaseRow> implements OneInputStreamOperator<BaseRow, BaseRow> {
    private static final long serialVersionUID = 1;

    // Metric names. Only WATERMARK_LATENCY_METRIC_NAME is registered in this class; the two
    // "late records" names are not used here and are presumably registered by subclasses.
    protected static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    protected static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    protected static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";

    /** Aggregator that is opened/closed with this operator and asked to expire windows. */
    protected final AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner;

    /** Window assigner; guaranteed by the constructor to be sliding or tumbling. */
    protected final WindowAssigner<TimeWindow> windowAssigner;

    /** Trigger passed in at construction time; not used directly by this base class. */
    protected final AlignedWindowTrigger windowTrigger;

    /** Row-time field index; must be non-negative when the assigner is event-time. */
    protected final int rowtimeIndex;

    /** Reset to {@code Long.MIN_VALUE} in {@link #open()}; maintained by subclasses. */
    protected transient long nextTriggerTime;

    /** Reset to {@code null} in {@link #open()}; maintained by subclasses. */
    protected transient TimeWindow nextTriggerWindow;

    /** Collector wrapping the operator output, with the timestamp erased in {@link #open()}. */
    protected transient TimestampedCollector<BaseRow> collector;

    /** Ensures {@code windowRunner.close()} is not invoked again by {@link #dispose()}. */
    private transient boolean functionsClosed = false;

    private transient Gauge<Long> watermarkLatency;

    /** Timestamp of the last watermark seen; {@code 0} until the first watermark arrives. */
    private transient long currentWatermark;

    /**
     * Whether {@link #processWatermark(Watermark)} has run at least once since {@link #open()}.
     * Needed because {@link #currentWatermark} starts at {@code 0}, which is indistinguishable
     * from a real watermark and is not caught by the gauge's {@code < 0} guard.
     */
    private transient boolean watermarkReceived;

    /**
     * Context handed to the {@link AlignedWindowAggregator} on open. It exposes the execution
     * context and collector it was built with, plus the enclosing operator's state backend.
     */
    private class WindowContextImpl implements AlignedWindowAggregator.Context {
        private final ExecutionContext executionContext;
        private final Collector<BaseRow> out;

        public WindowContextImpl(ExecutionContext executionContext, Collector<BaseRow> collector) {
            this.executionContext = executionContext;
            this.out = collector;
        }

        @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator.Context
        public ExecutionContext getExecutionContext() throws Exception {
            return this.executionContext;
        }

        @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator.Context
        public Collector<BaseRow> getCollector() throws Exception {
            return this.out;
        }

        @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator.Context
        public OperatorStateBackend getOpStateStore() throws Exception {
            return AbstractAlignedWindowOperator.this.getOperatorStateBackend();
        }
    }

    /**
     * Creates the operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}.
     *
     * @param alignedWindowAggregator aggregator driven by this operator
     * @param windowAssigner assigner; must be a {@link SlidingWindowAssigner} or a
     *     {@link TumblingWindowAssigner}
     * @param alignedWindowTrigger trigger stored in {@link #windowTrigger}
     * @param i row-time field index, stored in {@link #rowtimeIndex}; must be {@code >= 0} when
     *     {@code windowAssigner.isEventTime()} is true
     * @throws IllegalArgumentException if the assigner type is unsupported, or if the assigner
     *     is event-time and {@code i} is negative
     */
    public AbstractAlignedWindowOperator(AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> alignedWindowAggregator, WindowAssigner<TimeWindow> windowAssigner, AlignedWindowTrigger alignedWindowTrigger, int i) {
        this.windowRunner = alignedWindowAggregator;
        this.windowTrigger = alignedWindowTrigger;
        if (!(windowAssigner instanceof SlidingWindowAssigner) && !(windowAssigner instanceof TumblingWindowAssigner)) {
            throw new IllegalArgumentException("Currently aligned window only support sliding and tumbling windows.");
        }
        Preconditions.checkArgument(!windowAssigner.isEventTime() || i >= 0);
        this.windowAssigner = windowAssigner;
        this.rowtimeIndex = i;
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Initializes transient state, opens the window aggregator and registers the
     * {@value #WATERMARK_LATENCY_METRIC_NAME} gauge.
     *
     * <p>The gauge reports {@code System.currentTimeMillis() - currentWatermark}, or {@code 0}
     * when no watermark has been processed yet or the last watermark is negative.
     */
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        this.nextTriggerTime = Long.MIN_VALUE;
        this.nextTriggerWindow = null;
        this.watermarkReceived = false;
        this.windowRunner.open(new WindowContextImpl(new ExecutionContextImpl(this, getRuntimeContext(), new TimeWindow.Serializer()), this.collector));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long watermark = this.currentWatermark;
            // Before the first watermark the field still holds its default 0; without the
            // flag check the gauge would report the full wall-clock time since the epoch.
            if (!this.watermarkReceived || watermark < 0) {
                return 0L;
            }
            return Long.valueOf(System.currentTimeMillis() - watermark);
        });
    }

    /** Closes the operator and then the window aggregator, marking it as closed. */
    public void close() throws Exception {
        super.close();
        this.collector = null;
        this.functionsClosed = true;
        this.windowRunner.close();
    }

    /**
     * Disposes the operator and closes the window aggregator unless {@link #close()} has
     * already marked it as closed.
     */
    public void dispose() throws Exception {
        super.dispose();
        this.collector = null;
        if (this.functionsClosed) {
            return;
        }
        this.functionsClosed = true;
        this.windowRunner.close();
    }

    /**
     * Records the watermark timestamp, lets the subclass react via
     * {@link #advanceWatermark(long)}, then delegates to the superclass.
     */
    public void processWatermark(Watermark watermark) throws Exception {
        this.currentWatermark = watermark.getTimestamp();
        this.watermarkReceived = true;
        advanceWatermark(this.currentWatermark);
        super.processWatermark(watermark);
    }

    /**
     * Invoked from {@link #processWatermark(Watermark)} with the new watermark timestamp,
     * before the watermark is passed to the superclass.
     *
     * @param j timestamp of the watermark being processed
     */
    protected abstract void advanceWatermark(long j) throws Exception;

    /**
     * Returns the key for the given row; the extraction logic is defined by subclasses.
     *
     * @param baseRow input row
     */
    protected abstract BaseRow getKey(BaseRow baseRow) throws Exception;

    /**
     * Returns whether the window's maximum timestamp is at or before the given time.
     *
     * @param timeWindow window to test
     * @param j time to compare against (callers presumably pass the current watermark)
     */
    public boolean isWindowLate(TimeWindow timeWindow, long j) {
        return timeWindow.maxTimestamp() <= j;
    }

    /**
     * Returns whether the window's maximum timestamp is at or before the given time. The check
     * is currently identical to {@link #isWindowLate(TimeWindow, long)}.
     *
     * @param timeWindow window to test
     * @param j time to compare against (callers presumably pass the current watermark)
     */
    public boolean needTriggerWindow(TimeWindow timeWindow, long j) {
        return timeWindow.maxTimestamp() <= j;
    }

    /**
     * Asks the window aggregator to expire every window in the collection, in iteration order.
     *
     * @param collection windows to expire
     */
    public void expireWindows(Collection<TimeWindow> collection) throws Exception {
        for (TimeWindow window : collection) {
            this.windowRunner.expireWindow(window);
        }
    }

    /**
     * Returns the timestamp of the last processed watermark, or {@code 0} if none has been
     * processed yet.
     */
    public long getCurrentWatermark() {
        return this.currentWatermark;
    }

    /** No-op in this base class. */
    public void endInput() throws Exception {
    }

    /** Returns the gauge registered in {@link #open()}; {@code null} before that. */
    protected Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }
}
