package org.apache.flink.table.runtime.bundle;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.table.dataformat.BaseRow;

/* loaded from: input_file:org/apache/flink/table/runtime/bundle/MiniBatchAssignerOperator.class */
public class MiniBatchAssignerOperator extends AbstractStreamOperator<BaseRow> implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
    private static final long serialVersionUID = 1;
    private final long intervalMs;
    private transient long currentWatermark;

    public MiniBatchAssignerOperator(long j) {
        this.intervalMs = j;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        super.open();
        this.currentWatermark = 0L;
        getProcessingTimeService().registerTimer(getProcessingTimeService().getCurrentProcessingTime() + this.intervalMs, this);
        getRuntimeContext().getMetricGroup().gauge("currentBatch", (String) () -> {
            return Long.valueOf(this.currentWatermark);
        });
    }

    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        long currentProcessingTime = getProcessingTimeService().getCurrentProcessingTime();
        long j = currentProcessingTime - (currentProcessingTime % this.intervalMs);
        if (j > this.currentWatermark) {
            this.currentWatermark = j;
            this.output.emitWatermark(new Watermark(j));
        }
        this.output.collect(streamRecord);
    }

    public void onProcessingTime(long j) throws Exception {
        long currentProcessingTime = getProcessingTimeService().getCurrentProcessingTime();
        long j2 = currentProcessingTime - (currentProcessingTime % this.intervalMs);
        if (j2 > this.currentWatermark) {
            this.currentWatermark = j2;
            this.output.emitWatermark(new Watermark(j2));
        }
        getProcessingTimeService().registerTimer(j2 + this.intervalMs, this);
    }

    public void processWatermark(Watermark watermark) throws Exception {
        if (watermark.getTimestamp() != Long.MAX_VALUE || this.currentWatermark == Long.MAX_VALUE) {
            return;
        }
        this.currentWatermark = Long.MAX_VALUE;
        this.output.emitWatermark(watermark);
    }

    public void endInput() throws Exception {
    }

    public void close() throws Exception {
        super.close();
    }
}
