package org.apache.flink.table.runtime.window;

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;

/* loaded from: input_file:org/apache/flink/table/runtime/window/KeyedAlignedWindowOperator.class */
/**
 * Aligned window operator that uses the operator's current key (see {@link #getKey(BaseRow)})
 * as the key under which elements are added to the window runner.
 *
 * <p>In addition to the behavior inherited from {@link AbstractAlignedWindowOperator}, this class
 * registers two metrics in {@link #open()}: a counter and a rate meter that are updated whenever
 * an input record is not added to any window.
 */
public class KeyedAlignedWindowOperator extends AbstractAlignedWindowOperator {
    private static final long serialVersionUID = 1L;

    /** Counts records that were not added to any window; created in {@link #open()}. */
    private transient Counter numLateRecordsDropped;

    /** Rate of records that were not added to any window; created in {@link #open()}. */
    private transient Meter lateRecordsDroppedRate;

    /**
     * Creates the operator; all arguments are forwarded unchanged to the superclass constructor.
     *
     * @param alignedWindowAggregator aggregator passed to the superclass
     * @param windowAssigner assigner passed to the superclass
     * @param alignedWindowTrigger trigger passed to the superclass
     * @param i integer passed to the superclass (presumably the rowtime field index — confirm
     *     against {@link AbstractAlignedWindowOperator})
     */
    public KeyedAlignedWindowOperator(AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> alignedWindowAggregator, WindowAssigner<TimeWindow> windowAssigner, AlignedWindowTrigger alignedWindowTrigger, int i) {
        super(alignedWindowAggregator, windowAssigner, alignedWindowTrigger, i);
    }

    /**
     * Opens the superclass and then registers the {@code numLateRecordsDropped} counter and the
     * {@code lateRecordsDroppedRate} meter on the operator's metric group.
     *
     * @throws Exception if the superclass {@code open()} fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.numLateRecordsDropped = this.metrics.counter("numLateRecordsDropped");
        this.lateRecordsDroppedRate = this.metrics.meter("lateRecordsDroppedRate", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    }

    /**
     * Assigns the incoming row to windows based on the long value at {@code rowtimeIndex} and adds
     * it to every assigned window that is not late with respect to the current watermark.
     *
     * <p>If the row was added to no window at all (every assigned window was late, or the
     * assigner returned no windows), the dropped-record counter and meter are updated.
     *
     * @param streamRecord the record whose value is the row to process
     * @throws Exception if window assignment or adding the element fails
     */
    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow row = streamRecord.getValue();
        long rowtime = row.getLong(this.rowtimeIndex);
        boolean dropped = true;
        for (TimeWindow window : this.windowAssigner.assignWindows(row, rowtime)) {
            if (!isWindowLate(window, getCurrentWatermark())) {
                dropped = false;
                this.windowRunner.addElement(getKey(row), window, row);
            }
        }
        if (dropped) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    /**
     * Returns the operator's current key cast to {@link BaseRow}; the given row is not inspected.
     *
     * @param baseRow the input row (unused)
     * @return the current key of this operator
     */
    @Override
    protected BaseRow getKey(BaseRow baseRow) throws Exception {
        return (BaseRow) getCurrentKey();
    }

    /**
     * Snapshots the superclass state and then asks the window runner to snapshot itself.
     *
     * @param stateSnapshotContext context forwarded to the superclass
     * @throws Exception if either snapshot step fails
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.windowRunner.snapshot();
    }

    /**
     * Fires and expires windows in response to a new watermark.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>If {@code nextTriggerTime} is still {@code Long.MIN_VALUE} (presumably the
     *       "not yet initialized" marker set by the superclass — confirm), it is initialized from
     *       the runner's lowest window; when there is no such window the method returns without
     *       doing anything else.</li>
     *   <li>The windows returned by {@code windowRunner.ascendingWindows()} are visited in
     *       iteration order. Every late window is collected for expiry. Once the watermark has
     *       reached {@code nextTriggerTime}, windows at or after {@code nextTriggerWindow} are
     *       fired until the first one that does not need triggering; at that point the next
     *       trigger time/window are recomputed from the trigger and the scan stops.</li>
     *   <li>The collected late windows are passed to {@code expireWindows}.</li>
     * </ol>
     *
     * @param j the new watermark timestamp
     * @throws Exception if firing or expiring a window fails
     */
    @Override
    protected void advanceWatermark(long j) throws Exception {
        if (this.nextTriggerTime == Long.MIN_VALUE) {
            TimeWindow lowestWindow = this.windowRunner.lowestWindow();
            if (lowestWindow == null) {
                // Nothing buffered yet, so there is nothing to fire or expire.
                return;
            }
            this.nextTriggerTime = lowestWindow.maxTimestamp();
            this.nextTriggerWindow = lowestWindow;
            if (j < this.nextTriggerTime) {
                // The lowest window is not due yet; let the trigger decide the next firing point.
                this.nextTriggerTime = this.windowTrigger.nextTriggerTime(j);
                this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(j);
            }
        }

        // Expiry is deferred until after the scan so the runner is not modified while iterating.
        ArrayList<TimeWindow> windowsToExpire = new ArrayList<>();
        Iterator<TimeWindow> windows = this.windowRunner.ascendingWindows().iterator();
        while (windows.hasNext()) {
            TimeWindow window = windows.next();
            if (isWindowLate(window, j)) {
                windowsToExpire.add(window);
            }
            if (j >= this.nextTriggerTime && window.compareTo(this.nextTriggerWindow) >= 0) {
                if (needTriggerWindow(window, j)) {
                    this.windowRunner.fireWindow(window);
                } else {
                    // First window that is not ready: remember where to resume and stop scanning.
                    this.nextTriggerTime = this.windowTrigger.nextTriggerTime(j);
                    this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(j);
                    break;
                }
            }
        }
        expireWindows(windowsToExpire);
    }

    /**
     * Indicates that this operator requires state.
     *
     * @return always {@code true}
     */
    public boolean requireState() {
        return true;
    }

    /**
     * Returns the counter of dropped records.
     *
     * @return the counter registered in {@link #open()}, or {@code null} if {@code open()} has
     *     not been called yet
     */
    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }
}
