/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.window;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.window.AbstractAlignedWindowOperator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;

public class KeyedAlignedWindowOperator
extends AbstractAlignedWindowOperator {
    private static final long serialVersionUID = 1L;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;

    public KeyedAlignedWindowOperator(AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner, WindowAssigner<TimeWindow> windowAssigner, AlignedWindowTrigger windowTrigger, int rowtimeIndex) {
        super(windowRunner, windowAssigner, windowTrigger, rowtimeIndex);
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.numLateRecordsDropped = this.metrics.counter("numLateRecordsDropped");
        this.lateRecordsDroppedRate = this.metrics.meter("lateRecordsDroppedRate", (Meter)new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    }

    public void processElement(StreamRecord<BaseRow> record) throws Exception {
        BaseRow inputRow = (BaseRow)record.getValue();
        long timestamp = inputRow.getLong(this.rowtimeIndex);
        Collection windows = this.windowAssigner.assignWindows(inputRow, timestamp);
        boolean isElementDropped = true;
        for (TimeWindow window : windows) {
            if (this.isWindowLate(window, this.getCurrentWatermark())) continue;
            isElementDropped = false;
            this.windowRunner.addElement(this.getKey(inputRow), window, inputRow);
        }
        if (isElementDropped) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    @Override
    protected BaseRow getKey(BaseRow input) throws Exception {
        return (BaseRow)this.getCurrentKey();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.windowRunner.snapshot();
    }

    @Override
    protected void advanceWatermark(long watermark) throws Exception {
        if (this.nextTriggerTime == Long.MIN_VALUE) {
            TimeWindow lowestWindow = this.windowRunner.lowestWindow();
            if (lowestWindow == null) {
                return;
            }
            this.nextTriggerTime = lowestWindow.maxTimestamp();
            this.nextTriggerWindow = lowestWindow;
            if (watermark < this.nextTriggerTime) {
                this.nextTriggerTime = this.windowTrigger.nextTriggerTime(watermark);
                this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(watermark);
            }
        }
        ArrayList<TimeWindow> windowsToExpire = new ArrayList<TimeWindow>();
        for (TimeWindow window : this.windowRunner.ascendingWindows()) {
            if (this.isWindowLate(window, watermark)) {
                windowsToExpire.add(window);
            }
            if (watermark < this.nextTriggerTime || window.compareTo(this.nextTriggerWindow) < 0) continue;
            if (this.needTriggerWindow(window, watermark)) {
                this.windowRunner.fireWindow(window);
                continue;
            }
            this.nextTriggerTime = this.windowTrigger.nextTriggerTime(watermark);
            this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(watermark);
            break;
        }
        this.expireWindows(windowsToExpire);
    }

    public boolean requireState() {
        return true;
    }

    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }
}

