/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.window;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * Stream operator that aggregates rows into aligned (sliding or tumbling) time windows.
 *
 * <p>Each incoming row is assigned to its windows by the {@link WindowAssigner} and handed to
 * the {@link AlignedWindowAggregator}. Windows are fired and then expired when a watermark
 * reaches their {@code maxTimestamp()}. Rows whose assigned windows are all already behind the
 * current watermark are not aggregated; they are only counted in the late-record metrics.
 */
public class AlignedWindowOperator
        extends AbstractStreamOperator<BaseRow>
        implements OneInputStreamOperator<BaseRow, BaseRow> {
    private static final long serialVersionUID = 1L;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";

    /** Holds the per-key, per-window aggregation and emits results when a window is fired. */
    private final AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner;
    /** Maps a row and its timestamp to the windows it belongs to. */
    private final WindowAssigner<TimeWindow> windowAssigner;
    /** Computes the next trigger time and window for a given watermark. */
    private final AlignedWindowTrigger windowTrigger;
    /** Index of the long-typed timestamp field read from every input row. */
    private final int rowtimeIndex;

    /** Time at which the next window fires; {@code Long.MIN_VALUE} means "not determined yet". */
    private transient long nextTriggerTime;
    /** Window from which the next firing pass starts iterating. */
    private transient TimeWindow nextTriggerWindow;
    private transient TimestampedCollector<BaseRow> collector;
    /** Guards against closing {@link #windowRunner} twice (once in close, once in dispose). */
    private transient boolean functionsClosed = false;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;
    /** Timestamp of the last watermark seen; {@code Long.MIN_VALUE} until the first one arrives. */
    private transient long currentWatermark;

    /**
     * Creates the operator.
     *
     * @param windowRunner aggregator that stores window contents and emits results
     * @param windowAssigner assigner; must be a {@link SlidingWindowAssigner} or a
     *     {@link TumblingWindowAssigner}
     * @param windowTrigger trigger used to compute the next firing time and window
     * @param rowtimeIndex index of the timestamp field; must be non-negative when the assigner
     *     is event-time
     * @throws IllegalArgumentException if the assigner type is unsupported, or if an event-time
     *     assigner is combined with a negative {@code rowtimeIndex}
     */
    public AlignedWindowOperator(AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner, WindowAssigner<TimeWindow> windowAssigner, AlignedWindowTrigger windowTrigger, int rowtimeIndex) {
        this.windowRunner = windowRunner;
        this.windowTrigger = windowTrigger;
        if (!(windowAssigner instanceof SlidingWindowAssigner) && !(windowAssigner instanceof TumblingWindowAssigner)) {
            throw new IllegalArgumentException("Currently aligned window only support sliding and tumbling windows.");
        }
        Preconditions.checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
        this.windowAssigner = windowAssigner;
        this.rowtimeIndex = rowtimeIndex;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Initializes the collector, resets the trigger and watermark bookkeeping, opens the window
     * aggregator and registers the late-record and watermark-latency metrics.
     *
     * @throws Exception if the superclass or the window aggregator fails to open
     */
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        this.nextTriggerTime = Long.MIN_VALUE;
        this.nextTriggerWindow = null;
        // Transient fields come back as 0 after deserialization. Starting from 0 would make the
        // "watermark < 0" guard in the latency gauge ineffective and would treat windows ending
        // at or before epoch 0 as late before any watermark has actually been received.
        this.currentWatermark = Long.MIN_VALUE;
        TimeWindow.Serializer windowSerializer = new TimeWindow.Serializer();
        ExecutionContextImpl ctx = new ExecutionContextImpl(this, (RuntimeContext)this.getRuntimeContext(), windowSerializer);
        this.windowRunner.open(ctx);
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, (Meter)new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long watermark = this.currentWatermark;
            // No meaningful watermark yet: report zero latency instead of a bogus difference.
            if (watermark < 0L) {
                return 0L;
            }
            return System.currentTimeMillis() - watermark;
        });
    }

    /**
     * Closes the operator and the window aggregator.
     *
     * @throws Exception if the superclass or the window aggregator fails to close
     */
    public void close() throws Exception {
        super.close();
        this.collector = null;
        // Mark as closed before delegating so that dispose() does not close the runner again.
        this.functionsClosed = true;
        this.windowRunner.close();
    }

    /**
     * Releases resources; closes the window aggregator only if {@link #close()} has not
     * already done so.
     *
     * @throws Exception if the superclass or the window aggregator fails to release resources
     */
    public void dispose() throws Exception {
        super.dispose();
        this.collector = null;
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            this.windowRunner.close();
        }
    }

    /**
     * Adds the row to every assigned window that is not yet late with respect to the current
     * watermark. If no window accepted the row (including the case where the assigner returned
     * no windows), the late-record counter and meter are incremented.
     *
     * @param record the incoming record
     * @throws Exception if window assignment or aggregation fails
     */
    public void processElement(StreamRecord<BaseRow> record) throws Exception {
        BaseRow inputRow = record.getValue();
        // NOTE(review): the constructor permits a negative rowtimeIndex for non-event-time
        // assigners, but the timestamp is read from the row unconditionally here - confirm
        // that such assigners are never used with this operator.
        long timestamp = inputRow.getLong(this.rowtimeIndex);
        Collection<TimeWindow> windows = this.windowAssigner.assignWindows(inputRow, timestamp);
        boolean isElementDropped = true;
        for (TimeWindow window : windows) {
            if (this.isWindowLate(window, this.currentWatermark)) {
                continue;
            }
            isElementDropped = false;
            this.windowRunner.addElement(this.currentKey(), window, inputRow);
        }
        if (isElementDropped) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    /**
     * Records the watermark, fires and expires the windows it completes, then lets the
     * superclass process the watermark.
     *
     * @param mark the received watermark
     * @throws Exception if firing or expiring a window fails
     */
    public void processWatermark(Watermark mark) throws Exception {
        this.currentWatermark = mark.getTimestamp();
        this.advanceWatermark(this.currentWatermark);
        super.processWatermark(mark);
    }

    /**
     * Fires every window whose {@code maxTimestamp()} is at or before {@code watermark},
     * schedules the next trigger, and expires all windows that have become late.
     *
     * @param watermark timestamp of the watermark being processed
     * @throws Exception if the window aggregator fails
     */
    private void advanceWatermark(long watermark) throws Exception {
        if (this.nextTriggerTime == Long.MIN_VALUE) {
            // First pass (or nothing scheduled yet): bootstrap from the lowest stored window.
            TimeWindow lowestWindow = this.windowRunner.lowestWindow();
            if (lowestWindow == null) {
                // Nothing buffered, hence nothing to fire or expire.
                return;
            }
            this.nextTriggerTime = lowestWindow.maxTimestamp();
            this.nextTriggerWindow = lowestWindow;
            if (watermark < this.nextTriggerTime) {
                this.nextTriggerTime = this.windowTrigger.nextTriggerTime(watermark);
                this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(watermark);
            }
        }
        if (watermark >= this.nextTriggerTime) {
            for (TimeWindow window : this.windowRunner.ascendingWindows(this.nextTriggerWindow)) {
                if (this.needTriggerWindow(window, watermark)) {
                    this.windowRunner.fireWindow(window, this.collector);
                    continue;
                }
                // First window that is not complete yet: schedule the next trigger and stop.
                this.nextTriggerTime = this.windowTrigger.nextTriggerTime(watermark);
                this.nextTriggerWindow = this.windowTrigger.nextTriggerWindow(watermark);
                break;
            }
        }
        // Collect first and expire afterwards, so windows are not removed while iterating.
        Collection<TimeWindow> windowsToExpire = new ArrayList<>();
        for (TimeWindow window : this.windowRunner.ascendingWindows()) {
            if (!this.isWindowLate(window, watermark)) {
                break;
            }
            windowsToExpire.add(window);
        }
        this.expireWindows(windowsToExpire);
    }

    /**
     * Returns whether the window's {@code maxTimestamp()} is at or before the watermark,
     * i.e. whether the window no longer accepts rows and can be expired.
     */
    private boolean isWindowLate(TimeWindow window, long watermark) {
        long cleanupTime = window.maxTimestamp();
        return cleanupTime <= watermark;
    }

    /**
     * Returns whether the window's {@code maxTimestamp()} is at or before the watermark,
     * i.e. whether the window should be fired.
     */
    private boolean needTriggerWindow(TimeWindow window, long watermark) {
        long cleanupTime = window.maxTimestamp();
        return cleanupTime <= watermark;
    }

    /** Asks the window aggregator to expire each of the given windows, in iteration order. */
    private void expireWindows(Collection<TimeWindow> windowsToExpire) throws Exception {
        for (TimeWindow window : windowsToExpire) {
            this.windowRunner.expireWindow(window);
        }
    }

    /**
     * Snapshots the superclass state and then the window aggregator.
     *
     * @param context the snapshot context passed on to the superclass
     * @throws Exception if either snapshot fails
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.windowRunner.snapshot();
    }

    /** Intentionally a no-op: this operator takes no action when the input ends. */
    public void endInput() throws Exception {
    }

    /** Returns the operator's current key, cast to {@link BaseRow}. */
    private BaseRow currentKey() {
        return (BaseRow)this.getCurrentKey();
    }

    /** Returns the counter of late records that were dropped; {@code null} before {@link #open()}. */
    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }

    /** Returns the watermark latency gauge; {@code null} before {@link #open()}. */
    protected Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }

    /** Always returns {@code true}. */
    public boolean requireState() {
        return true;
    }
}

