/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.InputProcessor;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusSubMaintainer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

class OneInputProcessor
implements InputProcessor,
StatusWatermarkValve.ValveOutputHandler {
    private Counter numRecordsIn;
    private final OneInputStreamOperator operator;
    private final WatermarkGauge watermarkGauge = new WatermarkGauge();
    private final StatusWatermarkValve statusWatermarkValve;
    private final CheckpointLockDelegate checkpointLockDelegate;
    private final TaskMetricGroup taskMetricGroup;
    private final StreamStatusSubMaintainer streamStatusSubMaintainer;

    public OneInputProcessor(StreamStatusSubMaintainer streamStatusSubMaintainer, OneInputStreamOperator operator, Object checkpointLock, TaskMetricGroup taskMetricGroup, MinWatermarkGauge minAllInputWatermarkGauge, int channelCount) {
        this.streamStatusSubMaintainer = streamStatusSubMaintainer;
        this.checkpointLockDelegate = new CheckpointLockDelegate(Preconditions.checkNotNull((Object)checkpointLock));
        this.taskMetricGroup = (TaskMetricGroup)Preconditions.checkNotNull((Object)taskMetricGroup);
        this.statusWatermarkValve = new StatusWatermarkValve(channelCount, this);
        this.operator = (OneInputStreamOperator)Preconditions.checkNotNull((Object)operator);
        operator.getMetricGroup().gauge("currentInputWatermark", (Gauge)this.watermarkGauge);
        minAllInputWatermarkGauge.addWatermarkGauge(this.watermarkGauge);
        this.numRecordsIn = ((OperatorMetricGroup)operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
    }

    @Override
    public void processRecord(StreamRecord streamRecord, int channelIndex) throws Exception {
        this.checkpointLockDelegate.lockAndRun(() -> {
            this.numRecordsIn.inc();
            this.operator.setKeyContextElement1(streamRecord);
            this.operator.processElement(streamRecord);
        });
    }

    @Override
    public void processLatencyMarker(LatencyMarker latencyMarker, int channelIndex) throws Exception {
        this.checkpointLockDelegate.lockAndRun(() -> this.operator.processLatencyMarker(latencyMarker));
    }

    @Override
    public void processWatermark(Watermark watermark, int channelIndex) throws Exception {
        this.statusWatermarkValve.inputWatermark(watermark, channelIndex);
    }

    @Override
    public void processStreamStatus(StreamStatus streamStatus, int channelIndex) throws Exception {
        this.statusWatermarkValve.inputStreamStatus(streamStatus, channelIndex);
    }

    @Override
    public void endInput() throws Exception {
        this.operator.endInput();
    }

    @Override
    public void handleWatermark(Watermark watermark) {
        try {
            this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.operator.processWatermark(watermark);
        }
        catch (Exception e) {
            throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
        }
    }

    @Override
    public void handleStreamStatus(StreamStatus streamStatus) {
        this.streamStatusSubMaintainer.updateStreamStatus(streamStatus);
    }

    @Override
    public void release() {
        this.streamStatusSubMaintainer.release();
    }

    @VisibleForTesting
    WatermarkGauge getWatermarkGauge() {
        return this.watermarkGauge;
    }
}

