/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.InputProcessor;
import org.apache.flink.streaming.runtime.io.TwoInputWatermarkProcessor;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusSubMaintainer;
import org.apache.flink.util.Preconditions;

class FirstOfTwoInputProcessor
implements InputProcessor,
StatusWatermarkValve.ValveOutputHandler {
    private Counter numRecordsIn;
    private final TwoInputStreamOperator operator;
    private final StatusWatermarkValve statusWatermarkValve;
    private final Object checkpointLock;
    private final TaskMetricGroup taskMetricGroup;
    private final StreamStatusSubMaintainer streamStatusSubMaintainer;
    private final TwoInputWatermarkProcessor watermarkProcessor;

    public FirstOfTwoInputProcessor(StreamStatusSubMaintainer streamStatusSubMaintainer, TwoInputStreamOperator operator, Object checkpointLock, TaskMetricGroup taskMetricGroup, MinWatermarkGauge minAllInputWatermarkGauge, int channelCount) {
        this.streamStatusSubMaintainer = (StreamStatusSubMaintainer)Preconditions.checkNotNull((Object)streamStatusSubMaintainer);
        this.checkpointLock = Preconditions.checkNotNull((Object)checkpointLock);
        this.taskMetricGroup = (TaskMetricGroup)Preconditions.checkNotNull((Object)taskMetricGroup);
        this.statusWatermarkValve = new StatusWatermarkValve(channelCount, this);
        this.operator = (TwoInputStreamOperator)Preconditions.checkNotNull((Object)operator);
        this.watermarkProcessor = new TwoInputWatermarkProcessor(operator, minAllInputWatermarkGauge);
        this.numRecordsIn = ((OperatorMetricGroup)operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processRecord(StreamRecord streamRecord, int channelIndex) throws Exception {
        Object object = this.checkpointLock;
        synchronized (object) {
            this.numRecordsIn.inc();
            this.operator.setKeyContextElement1(streamRecord);
            this.operator.processElement1(streamRecord);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processLatencyMarker(LatencyMarker latencyMarker, int channelIndex) throws Exception {
        Object object = this.checkpointLock;
        synchronized (object) {
            this.operator.processLatencyMarker1(latencyMarker);
        }
    }

    @Override
    public void processWatermark(Watermark watermark, int channelIndex) throws Exception {
        this.statusWatermarkValve.inputWatermark(watermark, channelIndex);
    }

    @Override
    public void processStreamStatus(StreamStatus streamStatus, int channelIndex) throws Exception {
        this.statusWatermarkValve.inputStreamStatus(streamStatus, channelIndex);
    }

    @Override
    public void endInput() throws Exception {
        this.operator.endInput1();
    }

    @Override
    public void handleWatermark(Watermark watermark) {
        try {
            this.watermarkProcessor.getInput1WatermarkGauge().setCurrentWatermark(watermark.getTimestamp());
            this.operator.processWatermark1(watermark);
        }
        catch (Exception e) {
            throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
        }
    }

    @Override
    public void handleStreamStatus(StreamStatus streamStatus) {
        this.streamStatusSubMaintainer.updateStreamStatus(streamStatus);
    }

    @Override
    public void release() {
        this.streamStatusSubMaintainer.release();
    }

    @VisibleForTesting
    TwoInputWatermarkProcessor getWatermarkProcessor() {
        return this.watermarkProcessor;
    }
}

