package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamTaskConfig;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
import org.apache.flink.util.LockGetReleaseWrapper;
import org.apache.flink.util.Preconditions;

/**
 * A {@link StreamTask} whose head operator is a {@link TwoInputStreamOperator}.
 *
 * <p>{@link #init()} sorts the task's input gates into a "first" and a "second" group according
 * to the {@link StreamEdge.InputOrder} of the matching physical in-edge and hands both groups to a
 * {@link StreamTwoInputProcessor}. {@link #run()} then pumps that processor until it reports that
 * there is no more input or until the task is cancelled.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type produced by the head operator
 */
@Internal
public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

    /** Reads both inputs and forwards records to the head operator; created in {@link #init()}. */
    private StreamTwoInputProcessor<IN1, IN2> inputProcessor;

    /**
     * Cleared by {@link #cancelTask()} to stop the loop in {@link #run()}. Declared volatile so a
     * write from another thread becomes visible to the thread executing {@link #run()}.
     */
    private volatile boolean running = true;

    /**
     * Builds the {@link StreamTwoInputProcessor} for this task.
     *
     * <p>Requires the operator chain to have exactly one head operator. Each of the
     * {@code getInputsNum()} input gates is assigned to the first or second input depending on the
     * {@link StreamEdge.InputOrder} of the physical in-edge with the same index.
     *
     * @throws IllegalStateException presumably, via {@link Preconditions#checkState}, when the
     *     chain does not have exactly one head operator (confirm against Preconditions)
     * @throws RuntimeException if an edge reports an input order other than FIRST or SECOND
     * @throws Exception if any of the collaborating components fails during set-up
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        final StreamTaskConfig configuration = getConfiguration();
        final ClassLoader userCodeClassLoader = getUserCodeClassLoader();

        Preconditions.checkState(this.operatorChain.getHeadOperators().length == 1);

        // The chain exposes its head operators without this task's type parameters, so the
        // casts below cannot be checked by the compiler; the warning is confined to each local.
        @SuppressWarnings("unchecked")
        final TwoInputStreamOperator<IN1, IN2, OUT> headOperator =
                (TwoInputStreamOperator<IN1, IN2, OUT>) this.operatorChain.getHeadOperators()[0];
        @SuppressWarnings("unchecked")
        final TypeSerializer<IN1> serializerIn1 =
                (TypeSerializer<IN1>) headOperator.getOperatorContext().getTypeSerializerIn1();
        @SuppressWarnings("unchecked")
        final TypeSerializer<IN2> serializerIn2 =
                (TypeSerializer<IN2>) headOperator.getOperatorContext().getTypeSerializerIn2();

        final int inputsNum = configuration.getInputsNum();
        final List<InputGate> firstInputGates = new ArrayList<>();
        final List<InputGate> secondInputGates = new ArrayList<>();
        final List<StreamEdge> inPhysicalEdges = configuration.getInPhysicalEdges(userCodeClassLoader);

        // Edge i and input gate i are paired by index.
        for (int i = 0; i < inputsNum; i++) {
            final StreamEdge.InputOrder inputOrder = inPhysicalEdges.get(i).getInputOrder();
            final InputGate inputGate = getEnvironment().getInputGate(i);
            switch (inputOrder) {
                case FIRST:
                    firstInputGates.add(inputGate);
                    break;
                case SECOND:
                    secondInputGates.add(inputGate);
                    break;
                default:
                    throw new RuntimeException("Invalid InputOrder: " + inputOrder);
            }
        }

        this.inputProcessor = new StreamTwoInputProcessor<>(
                firstInputGates,
                secondInputGates,
                serializerIn1,
                serializerIn2,
                this,
                getCheckpointConfiguration().getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                headOperator,
                getExecutionConfig().isTracingMetricsEnabled(),
                getExecutionConfig().getTracingMetricsInterval(),
                getExecutionConfig().isInputElementReuseEnabled(),
                getEnvironment().getTaskManagerInfo().getConfiguration()
                        .getInteger(TaskManagerOptions.TASK_MAX_REQUESTS_IN_FLIGHT));
        this.inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
    }

    /**
     * Processes input until the processor reports that nothing is left or the task is cancelled.
     *
     * <p>If the loop ended without a cancellation, the operator chain is told to end all non-head
     * inputs while a {@link LockGetReleaseWrapper} around the checkpoint lock is open.
     *
     * @throws Exception if input processing, ending the inputs, or closing the wrapper fails
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void run() throws Exception {
        // Work on a local reference, as the original did, so the loop does not re-read the field.
        final StreamTwoInputProcessor<IN1, IN2> processor = this.inputProcessor;

        while (this.running && processor.processInput()) {
            // All work happens inside processInput().
        }

        if (!this.running) {
            // Cancelled: skip the end-of-input notification.
            return;
        }

        // NOTE(review): presumably the wrapper acquires the lock on construction and releases it
        // in close() - confirm against LockGetReleaseWrapper. try-with-resources closes it exactly
        // once and attaches a close() failure as a suppressed exception when the body also threw.
        try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(getCheckpointLock().getLock())) {
            this.operatorChain.endAllNonHeadInputs();
        }
    }

    /**
     * Releases the input processor's resources, if {@link #init()} got far enough to create it.
     *
     * @throws Exception if the processor's clean-up fails
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanup() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.cleanup();
        }
    }

    /** Requests termination of the processing loop in {@link #run()} by clearing the running flag. */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cancelTask() {
        this.running = false;
    }
}
