package org.apache.flink.table.runtime.window;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/window/AlignedWindowOperator.class */
/**
 * Stream operator that assigns every incoming row to sliding or tumbling {@link TimeWindow}s,
 * hands the row to an {@link AlignedWindowAggregator}, and fires and expires windows whenever a
 * watermark arrives.
 *
 * <p>Rows whose assigned windows are all already late with respect to the last seen watermark
 * are counted in the {@code numLateRecordsDropped} / {@code lateRecordsDroppedRate} metrics and
 * are not forwarded to the aggregator.
 */
public class AlignedWindowOperator extends AbstractStreamOperator<BaseRow> implements OneInputStreamOperator<BaseRow, BaseRow> {
    private static final long serialVersionUID = 1;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";

    /** Aggregator that stores window contents and emits results when a window is fired. */
    private final AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner;

    /** Assigner mapping a row and its timestamp to windows (sliding or tumbling only). */
    private final WindowAssigner<TimeWindow> windowAssigner;

    /** Supplies the next trigger time and trigger window for a given watermark. */
    private final AlignedWindowTrigger windowTrigger;

    /** Index of the long field in the input row that is read as the row's timestamp. */
    private final int rowtimeIndex;

    /** Watermark at or above which windows are fired next; {@code Long.MIN_VALUE} while unset. */
    private transient long nextTriggerTime;

    /** Window from which the ascending scan for windows to fire starts. */
    private transient TimeWindow nextTriggerWindow;

    /** Collector wrapping the operator output; its timestamp is erased in {@link #open()}. */
    private transient TimestampedCollector<BaseRow> collector;

    /** Guards against closing {@link #windowRunner} twice across close() and dispose(). */
    private transient boolean functionsClosed = false;

    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;

    /** Timestamp of the last watermark processed; {@code Long.MIN_VALUE} until one arrives. */
    private transient long currentWatermark;

    /**
     * Creates the operator.
     *
     * @param alignedWindowAggregator aggregator that accumulates rows per window and fires them
     * @param windowAssigner window assigner; must be a {@link SlidingWindowAssigner} or a
     *     {@link TumblingWindowAssigner}
     * @param alignedWindowTrigger trigger providing the next trigger time and window
     * @param rowtimeIndex index of the timestamp field in the input row; must be non-negative
     *     when the assigner is event-time based
     * @throws IllegalArgumentException if the assigner is neither sliding nor tumbling, or if an
     *     event-time assigner is combined with a negative {@code rowtimeIndex}
     */
    public AlignedWindowOperator(
            AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> alignedWindowAggregator,
            WindowAssigner<TimeWindow> windowAssigner,
            AlignedWindowTrigger alignedWindowTrigger,
            int rowtimeIndex) {
        boolean supportedAssigner =
                windowAssigner instanceof SlidingWindowAssigner
                        || windowAssigner instanceof TumblingWindowAssigner;
        if (!supportedAssigner) {
            throw new IllegalArgumentException("Currently aligned window only support sliding and tumbling windows.");
        }
        Preconditions.checkArgument(
                !windowAssigner.isEventTime() || rowtimeIndex >= 0,
                "An event-time window assigner requires a non-negative rowtime index.");
        this.windowRunner = alignedWindowAggregator;
        this.windowTrigger = alignedWindowTrigger;
        this.windowAssigner = windowAssigner;
        this.rowtimeIndex = rowtimeIndex;
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Initialises the transient runtime state, opens the aggregator and registers the metrics.
     *
     * @throws Exception if the superclass or the aggregator fails to open
     */
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        this.nextTriggerTime = Long.MIN_VALUE;
        this.nextTriggerWindow = null;
        // No watermark has been seen yet. Leaving the field at its default of 0 would make the
        // latency gauge report the full epoch offset and would classify windows ending at or
        // before timestamp 0 as late before any watermark has actually arrived.
        this.currentWatermark = Long.MIN_VALUE;
        this.windowRunner.open(new ExecutionContextImpl(this, getRuntimeContext(), new TimeWindow.Serializer()));

        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long lastWatermark = this.currentWatermark;
            // A negative watermark is reported as zero latency rather than a huge bogus value.
            if (lastWatermark < 0) {
                return 0L;
            }
            return Long.valueOf(System.currentTimeMillis() - lastWatermark);
        });
    }

    /**
     * Closes the operator and the aggregator, and marks the aggregator as closed so that
     * {@link #dispose()} does not close it again.
     *
     * @throws Exception if the superclass or the aggregator fails to close
     */
    public void close() throws Exception {
        super.close();
        this.collector = null;
        this.functionsClosed = true;
        this.windowRunner.close();
    }

    /**
     * Disposes the operator; closes the aggregator only if {@link #close()} has not done so.
     *
     * @throws Exception if the superclass or the aggregator fails to release its resources
     */
    public void dispose() throws Exception {
        super.dispose();
        this.collector = null;
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            this.windowRunner.close();
        }
    }

    /**
     * Adds the row to every assigned window that is not late; if no window accepted the row it
     * is counted as a dropped late record.
     *
     * @param streamRecord record carrying the input row
     * @throws Exception if the aggregator fails to add the row
     */
    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow row = streamRecord.getValue();
        // NOTE(review): the constructor permits a negative rowtimeIndex for non-event-time
        // assigners, yet the timestamp is always read from the row here - confirm that such a
        // configuration is never actually used.
        long timestamp = row.getLong(this.rowtimeIndex);

        boolean droppedAsLate = true;
        for (TimeWindow window : this.windowAssigner.assignWindows(row, timestamp)) {
            if (!isWindowLate(window, this.currentWatermark)) {
                droppedAsLate = false;
                this.windowRunner.addElement(currentKey(), window, row);
            }
        }
        if (droppedAsLate) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    /**
     * Records the watermark, fires and expires windows for it, then lets the superclass process
     * the watermark.
     *
     * @param watermark the incoming watermark
     * @throws Exception if firing or expiring a window fails
     */
    public void processWatermark(Watermark watermark) throws Exception {
        this.currentWatermark = watermark.getTimestamp();
        advanceWatermark(this.currentWatermark);
        super.processWatermark(watermark);
    }

    /**
     * Fires every window that is due at the given watermark and then expires all late windows.
     *
     * @param watermarkTimestamp timestamp of the watermark being processed
     * @throws Exception if the aggregator fails while firing or expiring a window
     */
    private void advanceWatermark(long watermarkTimestamp) throws Exception {
        if (this.nextTriggerTime == Long.MIN_VALUE) {
            // First trigger computation: start from the lowest window held by the aggregator.
            TimeWindow lowestWindow = this.windowRunner.lowestWindow();
            if (lowestWindow == null) {
                // The aggregator holds no window, so there is nothing to fire or expire.
                return;
            }
            this.nextTriggerTime = lowestWindow.maxTimestamp();
            this.nextTriggerWindow = lowestWindow;
            if (watermarkTimestamp < this.nextTriggerTime) {
                this.nextTriggerTime = this.windowTrigger.nextTriggerTime(watermarkTimestamp);
                this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(watermarkTimestamp);
            }
        }

        if (watermarkTimestamp >= this.nextTriggerTime) {
            Iterator<TimeWindow> candidates = this.windowRunner.ascendingWindows(this.nextTriggerWindow).iterator();
            while (candidates.hasNext()) {
                TimeWindow candidate = candidates.next();
                if (!needTriggerWindow(candidate, watermarkTimestamp)) {
                    // First window that is not due yet: remember where to resume and stop.
                    this.nextTriggerTime = this.windowTrigger.nextTriggerTime(watermarkTimestamp);
                    this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(watermarkTimestamp);
                    break;
                }
                this.windowRunner.fireWindow(candidate, this.collector);
            }
        }

        // Collect the late windows first and expire them afterwards, so the aggregator is not
        // asked to expire a window while its ascending view is still being iterated.
        Collection<TimeWindow> lateWindows = new ArrayList<>();
        for (TimeWindow window : this.windowRunner.ascendingWindows()) {
            if (!isWindowLate(window, watermarkTimestamp)) {
                break;
            }
            lateWindows.add(window);
        }
        expireWindows(lateWindows);
    }

    /** Returns whether the window's max timestamp is at or before the given watermark. */
    private boolean isWindowLate(TimeWindow timeWindow, long watermarkTimestamp) {
        return timeWindow.maxTimestamp() <= watermarkTimestamp;
    }

    /** Returns whether the window is due to fire, i.e. its max timestamp is at or before the watermark. */
    private boolean needTriggerWindow(TimeWindow timeWindow, long watermarkTimestamp) {
        return timeWindow.maxTimestamp() <= watermarkTimestamp;
    }

    /** Asks the aggregator to expire each of the given windows, in iteration order. */
    private void expireWindows(Collection<TimeWindow> collection) throws Exception {
        for (TimeWindow window : collection) {
            this.windowRunner.expireWindow(window);
        }
    }

    /**
     * Snapshots the superclass state and then the aggregator.
     *
     * @param stateSnapshotContext context of the snapshot being taken
     * @throws Exception if either snapshot fails
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.windowRunner.snapshot();
    }

    /** Intentionally a no-op: nothing is done when the input ends. */
    public void endInput() throws Exception {
    }

    /** Returns the current key of this operator, cast to {@link BaseRow}. */
    private BaseRow currentKey() {
        return (BaseRow) getCurrentKey();
    }

    /** Returns the counter of records dropped because all of their windows were late. */
    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }

    /** Returns the gauge reporting wall-clock time minus the last watermark, in milliseconds. */
    protected Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }

    /** Always returns {@code true}. */
    public boolean requireState() {
        return true;
    }
}
