package org.apache.flink.table.runtime.window;

import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;

/* loaded from: input_file:org/apache/flink/table/runtime/window/LocalAlignedWindowOperator.class */
public class LocalAlignedWindowOperator extends AbstractAlignedWindowOperator {
    private static final long serialVersionUID = 1;
    private final KeySelector<BaseRow, BaseRow> keySelector;
    private final boolean finishBundleBeforeSnapshot;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;

    public LocalAlignedWindowOperator(AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> alignedWindowAggregator, WindowAssigner<TimeWindow> windowAssigner, AlignedWindowTrigger alignedWindowTrigger, int i, boolean z, KeySelector<BaseRow, BaseRow> keySelector) {
        super(alignedWindowAggregator, windowAssigner, alignedWindowTrigger, i);
        this.finishBundleBeforeSnapshot = z;
        this.keySelector = keySelector;
    }

    @Override // org.apache.flink.table.runtime.window.AbstractAlignedWindowOperator
    public void open() throws Exception {
        super.open();
        this.numLateRecordsDropped = this.metrics.counter("numLateRecordsDropped");
        this.lateRecordsDroppedRate = this.metrics.meter("lateRecordsDroppedRate", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    }

    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow baseRow = (BaseRow) streamRecord.getValue();
        long j = baseRow.getLong(this.rowtimeIndex);
        boolean z = true;
        for (TimeWindow timeWindow : this.windowAssigner instanceof SlidingWindowAssigner ? Collections.singletonList(((SlidingWindowAssigner) this.windowAssigner).assignPane((Object) baseRow, j)) : this.windowAssigner.assignWindows(baseRow, j)) {
            if (!isWindowLate(timeWindow, getCurrentWatermark())) {
                z = false;
                this.windowRunner.addElement(getKey(baseRow), timeWindow, baseRow);
            }
        }
        if (z) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    @Override // org.apache.flink.table.runtime.window.AbstractAlignedWindowOperator
    protected void advanceWatermark(long j) throws Exception {
        finishBundle();
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        if (this.finishBundleBeforeSnapshot) {
            finishBundle();
        }
    }

    private void finishBundle() throws Exception {
        Iterator<TimeWindow> it = this.windowRunner.windows().iterator();
        while (it.hasNext()) {
            this.windowRunner.fireWindow(it.next());
        }
        this.windowRunner.expireAllWindows();
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        if (this.finishBundleBeforeSnapshot) {
            return;
        }
        this.windowRunner.snapshot();
    }

    @Override // org.apache.flink.table.runtime.window.AbstractAlignedWindowOperator
    protected BaseRow getKey(BaseRow baseRow) throws Exception {
        return this.keySelector.getKey(baseRow);
    }

    public boolean requireState() {
        return !this.finishBundleBeforeSnapshot;
    }
}
