/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

@Internal
public class TwoInputStreamTask<IN1, IN2, OUT>
extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
    private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
    private volatile boolean running = true;
    private final WatermarkGauge input1WatermarkGauge = new WatermarkGauge();
    private final WatermarkGauge input2WatermarkGauge = new WatermarkGauge();
    private final MinWatermarkGauge minInputWatermarkGauge = new MinWatermarkGauge(this.input1WatermarkGauge, this.input2WatermarkGauge);

    public TwoInputStreamTask(Environment env) {
        super(env);
    }

    @Override
    public void init() throws Exception {
        StreamConfig configuration = this.getConfiguration();
        ClassLoader userClassLoader = this.getUserCodeClassLoader();
        TwoInputStreamOperator<IN1, IN2, OUT> headOperator = this.getHeadOperator();
        TypeSerializer inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
        TypeSerializer inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
        int numberOfInputs = configuration.getNumberOfInputs();
        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
        List<StreamEdge> inEdges = this.getStreamTaskConfig().getInStreamEdgesOfChain();
        block4: for (int i = 0; i < numberOfInputs; ++i) {
            int inputType = inEdges.get(i).getTypeNumber();
            InputGate reader = this.getEnvironment().getInputGate(i);
            switch (inputType) {
                case 1: {
                    inputList1.add(reader);
                    continue block4;
                }
                case 2: {
                    inputList2.add(reader);
                    continue block4;
                }
                default: {
                    throw new RuntimeException("Invalid input type number: " + inputType);
                }
            }
        }
        this.inputProcessor = new StreamTwoInputProcessor(inputList1, inputList2, inputDeserializer1, inputDeserializer2, configuration.isCheckpointingEnabled(), this, configuration.getCheckpointMode(), this.getCheckpointLock(), this.getEnvironment().getIOManager(), this.getEnvironment().getTaskManagerInfo().getConfiguration(), this.getStreamStatusMaintainer(), headOperator, this.getEnvironment().getMetricGroup().getIOMetricGroup(), this.input1WatermarkGauge, this.input2WatermarkGauge, this.getExecutionConfig().isObjectReuseEnabled(), this.getExecutionConfig().isTracingMetricsEnabled(), this.getExecutionConfig().getTracingMetricsInterval());
        headOperator.getMetricGroup().gauge("currentInputWatermark", (Gauge)this.minInputWatermarkGauge);
        headOperator.getMetricGroup().gauge("currentInput1Watermark", (Gauge)this.input1WatermarkGauge);
        headOperator.getMetricGroup().gauge("currentInput2Watermark", (Gauge)this.input2WatermarkGauge);
        this.getEnvironment().getMetricGroup().gauge("currentInputWatermark", this.minInputWatermarkGauge::getValue);
    }

    @Override
    protected void run() throws Exception {
        StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
        while (this.running && inputProcessor.processInput()) {
        }
        if (this.running) {
            new CheckpointLockDelegate(this.getCheckpointLock()).lockAndRun(() -> {
                TwoInputStreamOperator<IN1, IN2, OUT> headOperator = this.getHeadOperator();
                for (StreamOperator<?> operator : this.operatorChain.getAllOperatorsTopologySorted()) {
                    if (operator.getOperatorID().equals((Object)headOperator.getOperatorID())) continue;
                    Preconditions.checkState((boolean)(operator instanceof OneInputStreamOperator));
                    ((OneInputStreamOperator)operator).endInput();
                }
            });
        }
    }

    @Override
    protected void cleanup() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.cleanup();
        }
    }

    @Override
    protected void cancelTask() {
        this.running = false;
    }

    protected TwoInputStreamOperator<IN1, IN2, OUT> getHeadOperator() {
        Preconditions.checkState((this.operatorChain.getHeadOperators().length == 1 ? 1 : 0) != 0, (Object)("There should only one head operator, not " + this.operatorChain.getHeadOperators().length));
        return (TwoInputStreamOperator)this.operatorChain.getHeadOperators()[0];
    }
}

