package org.apache.flink.table.runtime;

import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.fault.tolerant.ProcessingTimeCallbackOperator;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/WatermarkAssignerOperator.class */
/**
 * Operator that forwards every incoming row unchanged and generates watermarks from a
 * {@code long} rowtime column of those rows.
 *
 * <p>The watermark candidate is always {@code (largest rowtime seen so far) - offset}; it is
 * emitted only when it is strictly greater than the last emitted watermark. Emission is
 * triggered from three places: a recurring processing-time timer whose period is the
 * execution config's auto watermark interval, {@link #processElement} when the largest
 * rowtime has run ahead of {@code currentWatermark + offset} by more than that interval,
 * and {@link #close()}.
 *
 * <p>Watermarks arriving from upstream are dropped, except one carrying
 * {@link Long#MAX_VALUE}, which is forwarded once.
 *
 * <p>When an idle timeout is configured, the stream status is switched to
 * {@code IDLE} from the timer callback once no record has been processed for longer than
 * the timeout, and back to {@code ACTIVE} as soon as a record (or the final watermark)
 * arrives.
 */
public class WatermarkAssignerOperator extends ProcessingTimeCallbackOperator<BaseRow> implements OneInputStreamOperator<BaseRow, BaseRow> {
    private static final long serialVersionUID = 1L;

    /** Value of {@link #idleTimeout} that switches idleness detection off. */
    private static final long NO_IDLE_TIMEOUT = -1L;

    /** Position of the rowtime column inside each {@link BaseRow}. */
    private final int rowtimeIndex;

    /** Amount subtracted from the largest rowtime to obtain the watermark. */
    private final long offset;

    /** Idle timeout in milliseconds, or {@link #NO_IDLE_TIMEOUT} when disabled. */
    private final long idleTimeout;

    /** Auto watermark interval read from the execution config in {@link #open()}. */
    private transient long watermarkInterval;

    /** Timestamp of the last watermark emitted by this operator. */
    private transient long currentWatermark;

    /** Largest rowtime value observed so far. */
    private transient long currentMaxTimestamp;

    /** Processing time at which the most recent record was handled (only tracked with an idle timeout). */
    private transient long lastRecordTime;

    /** Handle used to flip the stream status between ACTIVE and IDLE. */
    private transient StreamStatusMaintainer streamStatusMaintainer;

    /**
     * Creates the operator.
     *
     * @param rowtimeIndex index of the rowtime column in the incoming rows
     * @param offset value subtracted from the largest observed rowtime to form the watermark
     * @param idleTimeout idle timeout in milliseconds, or {@code -1} to disable idleness detection
     * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument}
     *     when {@code idleTimeout} is neither {@code -1} nor at least {@code 1}
     */
    public WatermarkAssignerOperator(int rowtimeIndex, long offset, long idleTimeout) {
        final boolean timeoutIsValid = idleTimeout == NO_IDLE_TIMEOUT || idleTimeout >= 1;
        Preconditions.checkArgument(timeoutIsValid, "The idle timeout cannot be smaller than 1 ms.");

        this.rowtimeIndex = rowtimeIndex;
        this.offset = offset;
        this.idleTimeout = idleTimeout;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Resets the watermark state to zero, reads the auto watermark interval and, when that
     * interval is positive, schedules the first processing-time timer.
     */
    @Override
    public void open() throws Exception {
        super.open();

        // Both the watermark and the maximum timestamp start from 0.
        currentWatermark = 0L;
        currentMaxTimestamp = 0L;

        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
        streamStatusMaintainer = getContainingTask().getStreamStatusMaintainer();

        if (watermarkInterval > 0) {
            registerNextTimer();
        }
    }

    /**
     * Forwards the record, folds its rowtime into the running maximum and emits a watermark
     * right away if the maximum is more than one watermark interval ahead of
     * {@code currentWatermark + offset}.
     *
     * @param element the incoming record; its rowtime column must not be null
     * @throws RuntimeException if the rowtime column of the row is null
     */
    public void processElement(StreamRecord<BaseRow> element) throws Exception {
        if (idleTimeout != NO_IDLE_TIMEOUT) {
            // A record arrived, so the stream is active again; remember when that happened.
            streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
        }

        final BaseRow row = element.getValue();
        if (row.isNullAt(rowtimeIndex)) {
            throw new RuntimeException("RowTime field should not be null, please convert it to a non-null long value.");
        }

        final long rowtime = row.getLong(rowtimeIndex);
        if (rowtime > currentMaxTimestamp) {
            currentMaxTimestamp = rowtime;
        }

        output.collect(element);

        final long lead = currentMaxTimestamp - (currentWatermark + offset);
        if (lead > watermarkInterval) {
            advanceWatermark();
        }
    }

    /**
     * Emits {@code currentMaxTimestamp - offset} as a watermark if it is strictly greater than
     * the last emitted one; otherwise does nothing.
     */
    private void advanceWatermark() {
        final long candidate = currentMaxTimestamp - offset;
        if (candidate <= currentWatermark) {
            return;
        }
        currentWatermark = candidate;
        output.emitWatermark(new Watermark(candidate));
    }

    /** Schedules the next timer one watermark interval after the current processing time. */
    private void registerNextTimer() {
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Timer callback: tries to advance the watermark, marks the stream idle when the idle
     * timeout has been exceeded, and re-arms the timer.
     *
     * @param timestamp the timer's timestamp (not used)
     */
    public void onProcessingTime(long timestamp) throws Exception {
        advanceWatermark();

        if (idleTimeout != NO_IDLE_TIMEOUT) {
            final long sinceLastRecord = getProcessingTimeService().getCurrentProcessingTime() - lastRecordTime;
            if (sinceLastRecord > idleTimeout) {
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }

        registerNextTimer();
    }

    /**
     * Ignores upstream watermarks, except that a watermark with timestamp
     * {@link Long#MAX_VALUE} is forwarded the first time it is seen.
     *
     * @param watermark the watermark received from upstream
     */
    public void processWatermark(Watermark watermark) throws Exception {
        final boolean isFinalWatermark = watermark.getTimestamp() == Long.MAX_VALUE;
        final boolean alreadyFinished = currentWatermark == Long.MAX_VALUE;

        if (isFinalWatermark && !alreadyFinished) {
            if (idleTimeout != NO_IDLE_TIMEOUT) {
                // Switch back to ACTIVE before emitting the final watermark.
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            }
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(watermark);
        }
    }

    /** Signals end of input by pushing {@code Watermark.MAX_WATERMARK} through {@link #processWatermark}. */
    public void endInput() throws Exception {
        processWatermark(Watermark.MAX_WATERMARK);
    }

    /** Closes the parent operator, then makes a last attempt to advance the watermark. */
    public void close() throws Exception {
        super.close();
        advanceWatermark();
    }
}
