/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.window;

import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.window.AbstractAlignedWindowOperator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;

public class LocalAlignedWindowOperator
extends AbstractAlignedWindowOperator {
    private static final long serialVersionUID = 1L;
    private final KeySelector<BaseRow, BaseRow> keySelector;
    private final boolean finishBundleBeforeSnapshot;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;

    public LocalAlignedWindowOperator(AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner, WindowAssigner<TimeWindow> windowAssigner, AlignedWindowTrigger windowTrigger, int rowtimeIndex, boolean finishBundleBeforeSnapshot, KeySelector<BaseRow, BaseRow> keySelector) {
        super(windowRunner, windowAssigner, windowTrigger, rowtimeIndex);
        this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
        this.keySelector = keySelector;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.numLateRecordsDropped = this.metrics.counter("numLateRecordsDropped");
        this.lateRecordsDroppedRate = this.metrics.meter("lateRecordsDroppedRate", (Meter)new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    }

    public void processElement(StreamRecord<BaseRow> record) throws Exception {
        Collection windows;
        BaseRow inputRow = (BaseRow)record.getValue();
        long timestamp = inputRow.getLong(this.rowtimeIndex);
        if (this.windowAssigner instanceof SlidingWindowAssigner) {
            TimeWindow pane = ((SlidingWindowAssigner)this.windowAssigner).assignPane(inputRow, timestamp);
            windows = Collections.singletonList(pane);
        } else {
            windows = this.windowAssigner.assignWindows(inputRow, timestamp);
        }
        boolean isElementDropped = true;
        for (TimeWindow window : windows) {
            if (this.isWindowLate(window, this.getCurrentWatermark())) continue;
            isElementDropped = false;
            this.windowRunner.addElement(this.getKey(inputRow), window, inputRow);
        }
        if (isElementDropped) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    @Override
    protected void advanceWatermark(long watermark) throws Exception {
        this.finishBundle();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (this.finishBundleBeforeSnapshot) {
            this.finishBundle();
        }
    }

    private void finishBundle() throws Exception {
        for (TimeWindow window : this.windowRunner.windows()) {
            this.windowRunner.fireWindow(window);
        }
        this.windowRunner.expireAllWindows();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        if (!this.finishBundleBeforeSnapshot) {
            this.windowRunner.snapshot();
        }
    }

    @Override
    protected BaseRow getKey(BaseRow input) throws Exception {
        return this.keySelector.getKey(input);
    }

    public boolean requireState() {
        return !this.finishBundleBeforeSnapshot;
    }
}

