/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.window;

import java.util.Collection;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.functions.ExecutionContext;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowTrigger;
import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public abstract class AbstractAlignedWindowOperator
extends AbstractStreamOperator<BaseRow>
implements OneInputStreamOperator<BaseRow, BaseRow> {
    private static final long serialVersionUID = 1L;
    protected static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    protected static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    protected static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    protected final AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner;
    protected final WindowAssigner<TimeWindow> windowAssigner;
    protected final AlignedWindowTrigger windowTrigger;
    protected final int rowtimeIndex;
    protected transient long nextTriggerTime;
    protected transient TimeWindow nextTriggerWindow;
    protected transient TimestampedCollector<BaseRow> collector;
    private transient boolean functionsClosed = false;
    private transient Gauge<Long> watermarkLatency;
    private transient long currentWatermark;

    public AbstractAlignedWindowOperator(AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> windowRunner, WindowAssigner<TimeWindow> windowAssigner, AlignedWindowTrigger windowTrigger, int rowtimeIndex) {
        this.windowRunner = windowRunner;
        this.windowTrigger = windowTrigger;
        if (!(windowAssigner instanceof SlidingWindowAssigner) && !(windowAssigner instanceof TumblingWindowAssigner)) {
            throw new IllegalArgumentException("Currently aligned window only support sliding and tumbling windows.");
        }
        Preconditions.checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
        this.windowAssigner = windowAssigner;
        this.rowtimeIndex = rowtimeIndex;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector(this.output);
        this.collector.eraseTimestamp();
        this.nextTriggerTime = Long.MIN_VALUE;
        this.nextTriggerWindow = null;
        TimeWindow.Serializer windowSerializer = new TimeWindow.Serializer();
        ExecutionContextImpl ctx = new ExecutionContextImpl(this, (RuntimeContext)this.getRuntimeContext(), windowSerializer);
        this.windowRunner.open(new WindowContextImpl(ctx, (Collector<BaseRow>)this.collector));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long watermark = this.currentWatermark;
            if (watermark < 0L) {
                return 0L;
            }
            return System.currentTimeMillis() - watermark;
        });
    }

    public void close() throws Exception {
        super.close();
        this.collector = null;
        this.functionsClosed = true;
        this.windowRunner.close();
    }

    public void dispose() throws Exception {
        super.dispose();
        this.collector = null;
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            this.windowRunner.close();
        }
    }

    public void processWatermark(Watermark mark) throws Exception {
        this.currentWatermark = mark.getTimestamp();
        this.advanceWatermark(this.currentWatermark);
        super.processWatermark(mark);
    }

    protected abstract void advanceWatermark(long var1) throws Exception;

    protected abstract BaseRow getKey(BaseRow var1) throws Exception;

    protected boolean isWindowLate(TimeWindow window, long watermark) {
        long cleanupTime = window.maxTimestamp();
        return cleanupTime <= watermark;
    }

    protected boolean needTriggerWindow(TimeWindow window, long watermark) {
        long cleanupTime = window.maxTimestamp();
        return cleanupTime <= watermark;
    }

    protected void expireWindows(Collection<TimeWindow> windowsToExpire) throws Exception {
        for (TimeWindow window : windowsToExpire) {
            this.windowRunner.expireWindow(window);
        }
    }

    public long getCurrentWatermark() {
        return this.currentWatermark;
    }

    public void endInput() throws Exception {
    }

    protected Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }

    private class WindowContextImpl
    implements AlignedWindowAggregator.Context {
        private final ExecutionContext executionContext;
        private final Collector<BaseRow> out;

        public WindowContextImpl(ExecutionContext executionContext, Collector<BaseRow> out) {
            this.executionContext = executionContext;
            this.out = out;
        }

        @Override
        public ExecutionContext getExecutionContext() throws Exception {
            return this.executionContext;
        }

        @Override
        public Collector<BaseRow> getCollector() throws Exception {
            return this.out;
        }

        @Override
        public OperatorStateBackend getOpStateStore() throws Exception {
            return AbstractAlignedWindowOperator.this.getOperatorStateBackend();
        }
    }
}

