/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManagerUtility;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.SumAndCount;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.SelectedReadingBarrierHandler;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.ReusingRecordValueDeserializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamTwoInputProcessor<IN1, IN2> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
    /** One record deserializer per input channel, looked up by the channel index that accompanies each buffer. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializerOfChannels;
    /** Per input: the deserializer whose buffer is currently being read, or {@code null} when that input has no buffer in progress. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>>[] currentRecordDeserializerOfInputs;
    /** Per input: the delegate that receives the next deserialized {@link StreamElement}. */
    private final DeserializationDelegate<StreamElement>[] deserializationDelegateOfInputs;
    /** The two sub input gates obtained from the barrier handler (index 0 = first input, 1 = second input). */
    private final InputGate[] inputs;
    /** Source of all buffers and events consumed by {@code processInput()}. */
    private final SelectedReadingBarrierHandler barrierHandler;
    /** Monitor held while calling into the operator. */
    private final Object lock;
    /** Last stream status forwarded by the valve of the first input. */
    private StreamStatus firstStatus;
    /** Last stream status forwarded by the valve of the second input. */
    private StreamStatus secondStatus;
    /** Per input: valve that receives watermarks and stream statuses read from that input. */
    private final StatusWatermarkValve[] statusWatermarkValveOfInputs;
    /** Per input: number of channels of the corresponding input gate. */
    private final int[] numChannelsOfInputs;
    /** Per input: channel indices from which an {@link EndOfPartitionEvent} has been received. */
    private final BitSet[] eopChannelsOfInputs;
    /** Per input: channel index of the most recently received buffer; {@code -1} until a buffer arrives. */
    private int[] currentChannelOfInputs;
    /** Maintainer whose status is toggled by the valve output handlers. */
    private final StreamStatusMaintainer streamStatusMaintainer;
    /** Operator that consumes records, watermarks and latency markers of both inputs. */
    private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
    /** Which input to read next; initialised from the operator and updated with the value returned by each processElement call. */
    private TwoInputSelection inputSelection;
    /** Per input: {@code true} once the operator's endInput callback has been invoked for it. */
    private boolean[] isEndInputs;
    /** Index of the input that was read or received a buffer most recently. */
    private int lastReadingInputIndex = 0;
    /** Per input: gauge updated with the timestamp of each watermark forwarded by the valve. */
    private final WatermarkGauge[] watermarkGaugeOfInputs;
    /** Counter of data records handed to the operator; resolved lazily in {@code processInput()}. */
    private Counter numRecordsIn;
    /** Counter of all fully deserialized elements; resolved lazily in {@code processInput()}. */
    private Counter numRecordsReceived;
    /** Set once the barrier handler returned {@code null}; afterwards {@code processInput()} returns {@code false} immediately. */
    private boolean isFinished;
    /** Whether the sampled {@code taskLatency} / {@code waitInput} metrics are recorded. */
    private boolean enableTracingMetrics;
    /** Sampling interval: a measurement is taken whenever {@code tracingInputCount} is a multiple of this value. */
    private int tracingMetricsInterval;
    /** Number of fully deserialized elements seen so far; drives the sampling. */
    private long tracingInputCount;
    /** Sampled time spent processing one element, in nanoseconds; created lazily. */
    private SumAndCount taskLatency;
    /** Sampled time between the end of the previous sampled element and the start of the current one, in nanoseconds; created lazily. */
    private SumAndCount waitInput;
    /** {@code System.nanoTime()} taken after the last sampled element was processed; {@code -1} until the first sample. */
    private long lastProcessedTime = -1L;

    /**
     * Creates the processor and wires it to the barrier handler built from the two gate collections.
     *
     * @param inputGates1 gates of the first input, passed to the barrier handler factory
     * @param inputGates2 gates of the second input, passed to the barrier handler factory
     * @param inputSerializer1 serializer for records of the first input
     * @param inputSerializer2 serializer for records of the second input
     * @param isCheckpointingEnabled passed to the barrier handler factory
     * @param checkpointedTask passed to the barrier handler factory
     * @param checkpointMode passed to the barrier handler factory
     * @param lock monitor held while calling the operator; must not be {@code null}
     * @param ioManager passed to the barrier handler factory; also supplies the spilling directories for the record deserializers
     * @param taskManagerConfig passed to the barrier handler factory and to the {@link SerializerManagerUtility}
     * @param streamStatusMaintainer maintainer toggled by the valve output handlers; must not be {@code null}
     * @param streamOperator operator that consumes both inputs; must not be {@code null}
     * @param metrics group on which the {@code checkpointAlignmentTime} gauge is registered
     * @param input1WatermarkGauge gauge updated with watermarks of the first input
     * @param input2WatermarkGauge gauge updated with watermarks of the second input
     * @param objectReuse selects reusing ({@code true}) or non-reusing ({@code false}) deserialization delegates
     * @param enableTracingMetrics whether sampled latency metrics are recorded
     * @param tracingMetricsInterval sampling interval, in number of elements, for the tracing metrics
     * @throws IOException declared by the constructor; presumably raised while creating the barrier handler or deserializers
     */
    public StreamTwoInputProcessor(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2, TypeSerializer<IN1> inputSerializer1, TypeSerializer<IN2> inputSerializer2, boolean isCheckpointingEnabled, TwoInputStreamTask<IN1, IN2, ?> checkpointedTask, CheckpointingMode checkpointMode, Object lock, IOManager ioManager, Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> streamOperator, TaskIOMetricGroup metrics, WatermarkGauge input1WatermarkGauge, WatermarkGauge input2WatermarkGauge, boolean objectReuse, boolean enableTracingMetrics, int tracingMetricsInterval) throws IOException {
        int i;
        this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(isCheckpointingEnabled, checkpointedTask, checkpointMode, ioManager, taskManagerConfig, inputGates1, inputGates2);
        // The rest of the class indexes every per-input array with 0 or 1, so exactly two sub gates are required.
        Preconditions.checkState((this.barrierHandler.getSubInputGateCount() == 2 ? 1 : 0) != 0);
        int numberOfInputs = this.barrierHandler.getSubInputGateCount();
        this.inputs = new InputGate[numberOfInputs];
        for (int i2 = 0; i2 < numberOfInputs; ++i2) {
            this.inputs[i2] = this.barrierHandler.getSubInputGate(i2);
        }
        this.lock = Preconditions.checkNotNull((Object)lock);
        StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<IN1>(inputSerializer1);
        StreamElementSerializer<IN2> ser2 = new StreamElementSerializer<IN2>(inputSerializer2);
        // One delegate per input; the objectReuse flag picks the reusing or the non-reusing implementation.
        this.deserializationDelegateOfInputs = new DeserializationDelegate[]{objectReuse ? new ReusingRecordValueDeserializationDelegate<IN1>(ser1) : new NonReusingDeserializationDelegate(ser1), objectReuse ? new ReusingRecordValueDeserializationDelegate<IN2>(ser2) : new NonReusingDeserializationDelegate(ser2)};
        // NOTE(review): this local is not read anywhere else in the constructor.
        int numberOfAllInputChannels = this.barrierHandler.getNumberOfInputChannels();
        SerializerManagerUtility serializerManagerUtility = new SerializerManagerUtility(taskManagerConfig);
        this.recordDeserializerOfChannels = serializerManagerUtility.createRecordDeserializers(this.barrierHandler.getAllInputChannels(), ioManager.getSpillingDirectoriesPaths());
        this.numChannelsOfInputs = new int[numberOfInputs];
        for (i = 0; i < numberOfInputs; ++i) {
            this.numChannelsOfInputs[i] = this.inputs[i].getNumberOfInputChannels();
        }
        this.eopChannelsOfInputs = new BitSet[numberOfInputs];
        for (i = 0; i < numberOfInputs; ++i) {
            this.eopChannelsOfInputs[i] = new BitSet(this.numChannelsOfInputs[i]);
        }
        this.firstStatus = StreamStatus.ACTIVE;
        this.secondStatus = StreamStatus.ACTIVE;
        this.streamStatusMaintainer = (StreamStatusMaintainer)Preconditions.checkNotNull((Object)streamStatusMaintainer);
        this.streamOperator = (TwoInputStreamOperator)Preconditions.checkNotNull(streamOperator);
        // Each input gets its own valve, sized by that input's channel count and forwarding to a dedicated handler.
        this.statusWatermarkValveOfInputs = new StatusWatermarkValve[]{new StatusWatermarkValve(this.numChannelsOfInputs[0], new ForwardingValveOutputHandler1(streamOperator, lock)), new StatusWatermarkValve(this.numChannelsOfInputs[1], new ForwardingValveOutputHandler2(streamOperator, lock))};
        this.watermarkGaugeOfInputs = new WatermarkGauge[]{input1WatermarkGauge, input2WatermarkGauge};
        metrics.gauge("checkpointAlignmentTime", this.barrierHandler::getAlignmentDurationNanos);
        this.currentRecordDeserializerOfInputs = new RecordDeserializer[numberOfInputs];
        // -1 marks "no buffer received yet" for each input.
        this.currentChannelOfInputs = new int[]{-1, -1};
        // The operator decides which input is read first.
        this.inputSelection = streamOperator.firstInputSelection();
        this.isEndInputs = new boolean[]{false, false};
        this.enableTracingMetrics = enableTracingMetrics;
        this.tracingMetricsInterval = tracingMetricsInterval;
        this.tracingInputCount = 0L;
    }

    /**
     * Reads from the two inputs until one data record has been passed to the operator, or until
     * the barrier handler has no more buffers or events to deliver.
     *
     * <p>Watermarks, stream statuses and latency markers are handled along the way without
     * returning. When all channels of an input have delivered an {@link EndOfPartitionEvent},
     * the operator's matching endInput callback is invoked and the input selection is reset to
     * {@code ANY}.
     *
     * @return {@code true} after a data record was processed; {@code false} once the processor
     *     is finished (also on every later call)
     * @throws IOException if an event other than {@link EndOfPartitionEvent} is received, or if
     *     the current selection names an input that has already ended
     * @throws IllegalStateException if the barrier handler is not empty after it stopped delivering input
     * @throws Exception anything propagated from the operator, the valves or the deserializers
     */
    public boolean processInput() throws Exception {
        if (this.isFinished) {
            return false;
        }
        // Counters are resolved lazily; if the lookup fails a stand-alone SimpleCounter is used instead.
        if (this.numRecordsIn == null) {
            try {
                this.numRecordsIn = ((OperatorMetricGroup)this.streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            }
            catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
                this.numRecordsIn = new SimpleCounter();
            }
        }
        if (this.numRecordsReceived == null) {
            try {
                this.numRecordsReceived = ((OperatorMetricGroup)this.streamOperator.getMetricGroup()).parent().getIOMetricGroup().getNumRecordsReceived();
            }
            catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
                this.numRecordsReceived = new SimpleCounter();
            }
        }
        // Unlike the counters above, the tracing metrics are created without a fallback: failures propagate.
        if (this.enableTracingMetrics) {
            if (this.taskLatency == null) {
                this.taskLatency = new SumAndCount("taskLatency", (MetricGroup)((OperatorMetricGroup)this.streamOperator.getMetricGroup()).parent());
            }
            if (this.waitInput == null) {
                this.waitInput = new SumAndCount("waitInput", (MetricGroup)((OperatorMetricGroup)this.streamOperator.getMetricGroup()).parent());
            }
        }
        while (true) {
            BufferOrEvent bufferOrEvent;
            int flag;
            // With ANY selection, flag has bit 0 set if input 0 has a buffer in progress and bit 1 set for input 1:
            // both set -> keep reading the input used last; exactly one set -> that input; none set -> input 0.
            // Otherwise the index follows directly from the FIRST/SECOND selection.
            int readingIndex = this.inputSelection == TwoInputSelection.ANY ? ((flag = (this.currentRecordDeserializerOfInputs[0] == null ? 0 : 1) | (this.currentRecordDeserializerOfInputs[1] == null ? 0 : 2)) == 3 ? this.lastReadingInputIndex : (flag > 0 ? (flag == 1 ? 0 : 1) : 0)) : this.getInputIndexFromInputSelection(this.inputSelection);
            if (this.currentRecordDeserializerOfInputs[readingIndex] != null) {
                this.lastReadingInputIndex = readingIndex;
                RecordDeserializer.DeserializationResult result = this.currentRecordDeserializerOfInputs[readingIndex].getNextRecord(this.deserializationDelegateOfInputs[readingIndex]);
                // Once the buffer is fully consumed it is recycled and the input is marked as having no buffer in progress.
                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializerOfInputs[readingIndex].getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializerOfInputs[readingIndex] = null;
                }
                if (result.isFullRecord()) {
                    boolean recordProcessed;
                    this.numRecordsReceived.inc();
                    ++this.tracingInputCount;
                    // Only every tracingMetricsInterval-th element is timed.
                    // NOTE(review): a tracingMetricsInterval of 0 would raise ArithmeticException here when tracing is enabled.
                    if (this.enableTracingMetrics && this.tracingInputCount % (long)this.tracingMetricsInterval == 0L) {
                        long start = System.nanoTime();
                        // NOTE(review): lastProcessedTime is only written in this branch and starts at -1,
                        // so the first sample records start + 1 rather than a real wait time - confirm this is intended.
                        this.waitInput.update(start - this.lastProcessedTime);
                        recordProcessed = this.processRecordOrMark(readingIndex);
                        this.lastProcessedTime = System.nanoTime();
                        this.taskLatency.update(this.lastProcessedTime - start);
                    } else {
                        recordProcessed = this.processRecordOrMark(readingIndex);
                    }
                    // Non-record elements (watermark, stream status, latency marker) do not end the call.
                    if (!recordProcessed) continue;
                    return true;
                }
            }
            // No complete element available for the chosen input: fetch the next buffer or event.
            if (this.inputSelection == TwoInputSelection.ANY) {
                bufferOrEvent = this.barrierHandler.getNextNonBlocked();
            } else {
                if (this.isEndInputs[readingIndex]) {
                    throw new IOException("Unexpected reading selection: " + (Object)((Object)this.inputSelection) + ", because the input has finished.");
                }
                bufferOrEvent = this.barrierHandler.getNextNonBlocked(this.inputs[readingIndex]);
            }
            // null from the barrier handler ends the loop and marks the processor as finished.
            if (bufferOrEvent == null) break;
            if (bufferOrEvent.isBuffer()) {
                int channelIndex = bufferOrEvent.getChannelIndex();
                int inputIndex = this.getInputIndexFromChannel(channelIndex);
                // Hand the buffer to the channel's deserializer and make it the active deserializer of its input.
                this.currentRecordDeserializerOfInputs[inputIndex] = this.recordDeserializerOfChannels[channelIndex];
                this.currentRecordDeserializerOfInputs[inputIndex].setNextBuffer(bufferOrEvent.getBuffer());
                this.currentChannelOfInputs[inputIndex] = channelIndex;
                this.lastReadingInputIndex = inputIndex;
                continue;
            }
            AbstractEvent event = bufferOrEvent.getEvent();
            if (event.getClass() != EndOfPartitionEvent.class) {
                throw new IOException("Unexpected event: " + event);
            }
            int channelIndex = bufferOrEvent.getChannelIndex();
            int inputIndex = this.getInputIndexFromChannel(channelIndex);
            this.eopChannelsOfInputs[inputIndex].set(channelIndex);
            // The input has ended only when every one of its channels has reported end of partition.
            if (this.eopChannelsOfInputs[inputIndex].cardinality() != this.numChannelsOfInputs[inputIndex]) continue;
            Object object = this.lock;
            synchronized (object) {
                if (inputIndex == 0) {
                    this.streamOperator.endInput1();
                } else {
                    this.streamOperator.endInput2();
                }
                this.isEndInputs[inputIndex] = true;
                // After an input ends, reading falls back to ANY regardless of the previous selection.
                this.inputSelection = TwoInputSelection.ANY;
            }
        }
        this.isFinished = true;
        if (!this.barrierHandler.isEmpty()) {
            throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
        }
        return false;
    }

    /**
     * Hands the element most recently deserialized for the given input to its consumer.
     *
     * <p>Watermarks and stream statuses are passed to the input's {@link StatusWatermarkValve}
     * together with the input-local channel index. Latency markers and data records are passed
     * to the operator while holding {@code lock}. For data records the records-in counter is
     * incremented and the operator's return value becomes the new input selection.
     *
     * <p>A record is dispatched to exactly one side of the operator: index 0 goes to
     * {@code processElement1}, index 1 to {@code processElement2}.
     *
     * @param readingIndex index of the input the element was read from (0 = first, 1 = second)
     * @return {@code true} if the element was a data record, {@code false} for a watermark,
     *     stream status or latency marker
     * @throws Exception anything thrown by the valve or by the operator
     */
    private boolean processRecordOrMark(int readingIndex) throws Exception {
        StreamElement recordOrWatermark = (StreamElement)this.deserializationDelegateOfInputs[readingIndex].getInstance();
        // Channel indices are global across both inputs; the valve of the second input expects
        // indices relative to its own channels, so the channel count of the first input is subtracted.
        int channelOffset = readingIndex == 0 ? 0 : this.numChannelsOfInputs[0];
        if (recordOrWatermark.isWatermark()) {
            this.statusWatermarkValveOfInputs[readingIndex].inputWatermark(recordOrWatermark.asWatermark(), this.currentChannelOfInputs[readingIndex] - channelOffset);
            return false;
        }
        if (recordOrWatermark.isStreamStatus()) {
            this.statusWatermarkValveOfInputs[readingIndex].inputStreamStatus(recordOrWatermark.asStreamStatus(), this.currentChannelOfInputs[readingIndex] - channelOffset);
            return false;
        }
        if (recordOrWatermark.isLatencyMarker()) {
            synchronized (this.lock) {
                if (readingIndex == 0) {
                    this.streamOperator.processLatencyMarker1(recordOrWatermark.asLatencyMarker());
                } else {
                    this.streamOperator.processLatencyMarker2(recordOrWatermark.asLatencyMarker());
                }
            }
            return false;
        }
        if (readingIndex == 0) {
            StreamRecord<IN1> record = recordOrWatermark.asRecord();
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement1(record);
                this.inputSelection = this.streamOperator.processElement1(record);
            }
        } else {
            // The else is essential: without it a first-input record would also be fed to
            // processElement2, be counted twice and have its input selection overwritten.
            StreamRecord<IN2> record = recordOrWatermark.asRecord();
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement2(record);
                this.inputSelection = this.streamOperator.processElement2(record);
            }
        }
        return true;
    }

    /**
     * Maps a channel index to the input it belongs to.
     *
     * @param channelIndex channel index as reported with a buffer or event
     * @return 0 if the index is below the channel count of the first input, otherwise 1
     */
    private int getInputIndexFromChannel(int channelIndex) {
        int firstInputChannelCount = this.numChannelsOfInputs[0];
        if (channelIndex < firstInputChannelCount) {
            return 0;
        }
        return 1;
    }

    /**
     * Translates an input selection into an input index.
     *
     * @param inputSelection selection to translate
     * @return 0 for {@code FIRST}, 1 for {@code SECOND}, and -1 for anything else
     */
    private int getInputIndexFromInputSelection(TwoInputSelection inputSelection) {
        if (inputSelection == TwoInputSelection.FIRST) {
            return 0;
        }
        if (inputSelection == TwoInputSelection.SECOND) {
            return 1;
        }
        return -1;
    }

    /**
     * Releases what the processor still holds: for every channel deserializer, recycles its
     * current buffer if there is one that has not been recycled yet and clears the
     * deserializer; afterwards cleans up the barrier handler.
     *
     * @throws IOException declared by the method; presumably raised by the barrier handler cleanup
     */
    public void cleanup() throws IOException {
        RecordDeserializer<DeserializationDelegate<StreamElement>>[] deserializers = this.recordDeserializerOfChannels;
        for (int idx = 0; idx < deserializers.length; ++idx) {
            RecordDeserializer<DeserializationDelegate<StreamElement>> current = deserializers[idx];
            Buffer pending = current.getCurrentBuffer();
            boolean needsRecycling = pending != null && !pending.isRecycled();
            if (needsRecycling) {
                pending.recycleBuffer();
            }
            current.clear();
        }
        this.barrierHandler.cleanup();
    }

    /**
     * Output handler of the second input's valve: forwards watermarks to the operator and
     * keeps the task's stream status consistent with the status of both inputs.
     */
    private class ForwardingValveOutputHandler2
    implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler2(TwoInputStreamOperator<IN1, IN2, ?> operator, Object lock) {
            this.operator = Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull(lock);
        }

        /**
         * Records the watermark's timestamp in the gauge of the second input and passes the
         * watermark to the operator, both while holding the lock.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    WatermarkGauge gauge = StreamTwoInputProcessor.this.watermarkGaugeOfInputs[1];
                    gauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark2(watermark);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        /**
         * Stores the new status of the second input and toggles the maintained status if
         * needed: to ACTIVE as soon as this input is active, to IDLE only when the first input
         * is idle as well.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.secondStatus = streamStatus;
                    StreamStatusMaintainer maintainer = StreamTwoInputProcessor.this.streamStatusMaintainer;
                    if (streamStatus.equals(maintainer.getStreamStatus())) {
                        // Maintained status already matches; nothing to toggle.
                        return;
                    }
                    if (streamStatus.isActive()) {
                        maintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        return;
                    }
                    if (StreamTwoInputProcessor.this.firstStatus.isIdle()) {
                        maintainer.toggleStreamStatus(StreamStatus.IDLE);
                    }
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    /**
     * Output handler of the first input's valve: forwards watermarks to the operator and
     * keeps the task's stream status consistent with the status of both inputs.
     */
    private class ForwardingValveOutputHandler1
    implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler1(TwoInputStreamOperator<IN1, IN2, ?> operator, Object lock) {
            this.operator = Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull(lock);
        }

        /**
         * Records the watermark's timestamp in the gauge of the first input and passes the
         * watermark to the operator, both while holding the lock.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    WatermarkGauge gauge = StreamTwoInputProcessor.this.watermarkGaugeOfInputs[0];
                    gauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark1(watermark);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        /**
         * Stores the new status of the first input and toggles the maintained status if
         * needed: to ACTIVE as soon as this input is active, to IDLE only when the second input
         * is idle as well.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamTwoInputProcessor.this.firstStatus = streamStatus;
                    StreamStatusMaintainer maintainer = StreamTwoInputProcessor.this.streamStatusMaintainer;
                    if (streamStatus.equals(maintainer.getStreamStatus())) {
                        // Maintained status already matches; nothing to toggle.
                        return;
                    }
                    if (streamStatus.isActive()) {
                        maintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        return;
                    }
                    if (StreamTwoInputProcessor.this.secondStatus.isIdle()) {
                        maintainer.toggleStreamStatus(StreamStatus.IDLE);
                    }
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }
}

