package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.RunnableFuture;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint2.CheckpointBackend;
import org.apache.flink.runtime.checkpoint2.InitialOperatorPartitionSnapshot;
import org.apache.flink.runtime.checkpoint2.OperatorPartitionSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.NetworkResultPartition;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.state.KeyGroupRangeAssigner;
import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.OperatorConfig;
import org.apache.flink.streaming.api.graph.OperatorContext;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamTaskConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InputElementSelection;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.DynamicRebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain.class */
public class OperatorChain implements StreamStatusMaintainer {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    /** Operators created for this chain, keyed by node ID; assigned once by the constructor. */
    private final Map<Integer, StreamOperator<?>> allOperators;
    /** The chain's operators as ordered by {@code getTopologySortedOperators(...)} during construction. */
    private final Deque<StreamOperator<?>> allOperatorsTopologySorted;
    /** One record-writer output per outgoing edge, stored at the edge's index in the configured out-edge order. */
    private final RecordWriterOutput<?>[] streamOutputs;
    /** Per-node operator configurations, as returned by the task configuration's {@code getChainedTaskConfigs}. */
    private final Map<Integer, OperatorConfig> chainedConfigs;
    /** For each head node ID, the output collector that was handed to that head operator's {@code setup}. */
    private final Map<Integer, Output<StreamRecord>> chainEntryPoints = new HashMap();
    /** Head operators of the chain, keyed by node ID. */
    private final Map<Integer, StreamOperator> headOperators = new HashMap();
    /** Bookkeeping of unfinished input edges per operator node ID. */
    private final Map<Integer, OperatorInputEdgeState> operatorInputEdgeStates = new HashMap();
    /** Current stream status; starts out as {@link StreamStatus#ACTIVE}. */
    private StreamStatus streamStatus = StreamStatus.ACTIVE;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$BroadcastingOutputCollector.class */
    /**
     * An {@link Output} that fans every record and watermark out to all wrapped outputs.
     *
     * <p>Watermarks are only forwarded while the stream status reported by the provider is active.
     * A latency marker is forwarded to exactly one output: the only one if there is a single
     * output, otherwise one picked at random.
     *
     * @param <T> element type of the records being broadcast
     */
    public static class BroadcastingOutputCollector<T> implements Output<StreamRecord<T>> {
        protected final Output<StreamRecord<T>>[] outputs;
        private final Random random = new XORShiftRandom();
        private final StreamStatusProvider streamStatusProvider;

        /**
         * @param outputArr the downstream outputs to broadcast to
         * @param streamStatusProvider source of the current stream status, consulted for watermarks
         */
        public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputArr, StreamStatusProvider streamStatusProvider) {
            this.outputs = outputArr;
            this.streamStatusProvider = streamStatusProvider;
        }

        /** Forwards the watermark to every output, unless the stream is currently not active. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitWatermark(Watermark watermark) {
            if (!this.streamStatusProvider.getStreamStatus().isActive()) {
                return;
            }
            final Output<StreamRecord<T>>[] targets = this.outputs;
            for (int idx = 0; idx < targets.length; idx++) {
                targets[idx].emitWatermark(watermark);
            }
        }

        /** Forwards the marker to a single output; does nothing when there are no outputs. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            final int count = this.outputs.length;
            if (count <= 0) {
                return;
            }
            // The random generator is only consulted when there is an actual choice to make.
            final int target = (count == 1) ? 0 : this.random.nextInt(count);
            this.outputs[target].emitLatencyMarker(latencyMarker);
        }

        /** Hands the same record instance to every output, in array order. */
        @Override
        public void collect(StreamRecord<T> streamRecord) {
            final Output<StreamRecord<T>>[] targets = this.outputs;
            for (int idx = 0; idx < targets.length; idx++) {
                targets[idx].collect(streamRecord);
            }
        }

        /** Hands the same tagged record instance to every output, in array order. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            final Output<StreamRecord<T>>[] targets = this.outputs;
            for (int idx = 0; idx < targets.length; idx++) {
                targets[idx].collect(outputTag, streamRecord);
            }
        }

        /** Closes every wrapped output, in array order. */
        public void close() {
            final Output<StreamRecord<T>>[] targets = this.outputs;
            for (int idx = 0; idx < targets.length; idx++) {
                targets[idx].close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$ChainingOutput.class */
    /**
     * An {@link Output} that pushes elements directly into a chained operator instead of going
     * through a network writer.
     *
     * <p>An instance is either untagged ({@code outputTag == null}) and accepts only main-stream
     * records, or tagged and accepts only side-output records whose tag equals its own.
     *
     * @param <IN> type of the records accepted by the chained operator on this input
     * @param <OUT> output type of the chained operator
     */
    public static class ChainingOutput<IN, OUT> implements Output<StreamRecord<IN>> {
        protected final StreamOperator<OUT> operator;
        protected final Counter numRecordsIn;
        protected final StreamStatusProvider streamStatusProvider;
        protected final OutputTag<IN> outputTag;
        protected final StreamEdge.InputOrder inputOrder;
        protected final OutputAdapter<IN, OUT> outputAdapter;

        /**
         * @param streamOperator the chained operator that receives the elements
         * @param streamStatusProvider source of the current stream status, consulted for watermarks
         * @param outputTag tag of the side output this instance serves, or {@code null} for the main output
         * @param inputOrder which input of the operator this output feeds
         */
        public ChainingOutput(StreamOperator<OUT> streamOperator, StreamStatusProvider streamStatusProvider, OutputTag<IN> outputTag, StreamEdge.InputOrder inputOrder) {
            this.operator = streamOperator;
            this.numRecordsIn = streamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = outputTag;
            this.inputOrder = inputOrder;
            this.outputAdapter = new OutputAdapter<>(this.operator, this.inputOrder);
        }

        /** Forwards a main-stream record; silently drops it when this output serves a side output. */
        @Override
        public void collect(StreamRecord<IN> streamRecord) {
            if (this.outputTag != null) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /** Forwards a side-output record only when its tag equals this output's tag. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /** Counts the record as input of the chained operator and hands it to the operator. */
        protected <X> void pushToOperator(StreamRecord<X> streamRecord) {
            this.numRecordsIn.inc();
            this.outputAdapter.processElement(getStreamRecord(streamRecord));
        }

        /**
         * Converts the incoming record to the operator's input type.
         *
         * <p>Both {@code collect} variants only reach this point for records that passed the tag
         * filter, so the record is treated as an {@code IN} record; the cast itself is unchecked
         * and cannot be verified at runtime.
         *
         * @return the very same record instance, re-typed
         */
        @SuppressWarnings("unchecked")
        protected <X> StreamRecord<IN> getStreamRecord(StreamRecord<X> streamRecord) {
            return (StreamRecord<IN>) streamRecord;
        }

        /** Passes the watermark to the operator, unless the stream is currently not active. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitWatermark(Watermark watermark) {
            if (this.streamStatusProvider.getStreamStatus().isActive()) {
                this.outputAdapter.processWatermark(watermark);
            }
        }

        /** Passes the latency marker to the operator. */
        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            this.outputAdapter.processLatencyMarker(latencyMarker);
        }

        /**
         * Closes the chained operator.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception thrown by the operator
         */
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingBroadcastingOutputCollector.class */
    /**
     * A broadcasting collector that gives every output except the last one its own record
     * wrapper.
     *
     * <p>The wrappers are created with {@code streamRecord.copy(streamRecord.getValue())}, i.e. they
     * are built around the value returned by {@code getValue()}. The last output receives the
     * original record instance.
     *
     * @param <T> element type of the records being broadcast
     */
    public static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
        public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputArr, StreamStatusProvider streamStatusProvider) {
            super(outputArr, streamStatusProvider);
        }

        /** Sends a fresh wrapper to all outputs but the last, then the original record to the last. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector
        public void collect(StreamRecord<T> streamRecord) {
            final int lastIndex = this.outputs.length - 1;
            if (lastIndex < 0) {
                // No outputs at all: nothing to do.
                return;
            }
            int idx = 0;
            while (idx < lastIndex) {
                StreamRecord<T> wrapper = streamRecord.copy(streamRecord.getValue());
                this.outputs[idx].collect(wrapper);
                idx++;
            }
            this.outputs[lastIndex].collect(streamRecord);
        }

        /** Tagged variant of {@link #collect(StreamRecord)} with the same copy-all-but-last scheme. */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector, org.apache.flink.streaming.api.operators.Output
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            final int lastIndex = this.outputs.length - 1;
            if (lastIndex < 0) {
                // No outputs at all: nothing to do.
                return;
            }
            int idx = 0;
            while (idx < lastIndex) {
                StreamRecord<X> wrapper = streamRecord.copy(streamRecord.getValue());
                this.outputs[idx].collect(outputTag, wrapper);
                idx++;
            }
            this.outputs[lastIndex].collect(outputTag, streamRecord);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * A {@link ChainingOutput} that copies the record value with a {@link TypeSerializer} before
     * handing the record to the chained operator.
     *
     * @param <IN> type of the records accepted by the chained operator on this input
     * @param <OUT> output type of the chained operator
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingChainingOutput.class */
    public static final class CopyingChainingOutput<IN, OUT> extends ChainingOutput<IN, OUT> {
        private final TypeSerializer<IN> serializer;

        /**
         * @param streamOperator the chained operator that receives the elements
         * @param typeSerializer serializer used to copy each record value
         * @param outputTag tag of the side output this instance serves, or {@code null} for the main output
         * @param streamStatusProvider source of the current stream status
         * @param inputOrder which input of the operator this output feeds
         */
        public CopyingChainingOutput(StreamOperator<OUT> streamOperator, TypeSerializer<IN> typeSerializer, OutputTag<IN> outputTag, StreamStatusProvider streamStatusProvider, StreamEdge.InputOrder inputOrder) {
            super(streamOperator, streamStatusProvider, outputTag, inputOrder);
            this.serializer = typeSerializer;
        }

        /**
         * Returns a new record that wraps a serializer-made copy of the incoming record's value.
         *
         * <p>The incoming record is first re-typed to {@code IN} with an unchecked cast so that its
         * value can be passed to the {@code IN} serializer; the cast cannot be verified at runtime.
         */
        @Override // org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput
        @SuppressWarnings("unchecked")
        protected <X> StreamRecord<IN> getStreamRecord(StreamRecord<X> streamRecord) {
            StreamRecord<IN> typedRecord = (StreamRecord<IN>) streamRecord;
            return typedRecord.copy(this.serializer.copy(typedRecord.getValue()));
        }

        /** Exposes the copy serializer for tests. */
        @VisibleForTesting
        TypeSerializer<IN> getSerializer() {
            return this.serializer;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$OneInputStreamOperatorProxy.class */
    /**
     * Proxy around a {@link OneInputStreamOperator} that forwards element, watermark, latency
     * marker and pre-barrier calls to the wrapped operator, but routes {@link #endInput()} to the
     * enclosing chain's {@code chainedEndInput}.
     *
     * @param <IN> input type of the wrapped operator
     * @param <OUT> output type of the wrapped operator
     */
    private class OneInputStreamOperatorProxy<IN, OUT> extends StreamOperatorProxy<OUT> implements OneInputStreamOperator<IN, OUT> {
        private final OneInputStreamOperator<IN, OUT> operator;

        public OneInputStreamOperatorProxy(OneInputStreamOperator<IN, OUT> oneInputStreamOperator) {
            super(oneInputStreamOperator);
            this.operator = oneInputStreamOperator;
        }

        /**
         * Reports one finished edge on the first input of the wrapped operator's node to the chain.
         *
         * @throws NullPointerException if the chain tracks no input-edge state for that node
         */
        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void endInput() throws Exception {
            final int nodeId = this.operator.getOperatorContext().getNodeID();
            Preconditions.checkNotNull(OperatorChain.this.operatorInputEdgeStates.get(nodeId));
            OperatorChain.this.chainedEndInput(nodeId, 1, StreamEdge.InputOrder.FIRST);
        }

        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processElement(StreamRecord<IN> element) throws Exception {
            this.operator.processElement(element);
        }

        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processWatermark(Watermark mark) throws Exception {
            this.operator.processWatermark(mark);
        }

        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processLatencyMarker(LatencyMarker marker) throws Exception {
            this.operator.processLatencyMarker(marker);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            this.operator.prepareSnapshotPreBarrier(checkpointId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$OperatorInputEdgeState.class */
    /**
     * Tracks, per {@link StreamEdge.InputOrder}, how many input edges of an operator have not
     * finished yet. An input order disappears from the bookkeeping once its count reaches zero.
     */
    public static class OperatorInputEdgeState {
        private final Map<StreamEdge.InputOrder, Integer> unfinishedInputEdgeRefs = new HashMap<>();

        /** Starts with a zero count registered for the first input. */
        public OperatorInputEdgeState() {
            addInputEdges(0, StreamEdge.InputOrder.FIRST);
        }

        /** Starts with {@code i} unfinished edges registered for {@code inputOrder}. */
        public OperatorInputEdgeState(int i, StreamEdge.InputOrder inputOrder) {
            addInputEdges(i, inputOrder);
        }

        /**
         * Marks {@code i} edges of the given input as finished; a negative {@code i} finishes all
         * of them at once.
         *
         * @return {@code true} if the input has no unfinished edges left (it is then removed)
         * @throws NullPointerException if the input order is not tracked
         * @throws IllegalStateException if more edges are finished than were outstanding
         */
        public boolean finishInputEdges(int i, StreamEdge.InputOrder inputOrder) {
            final Integer outstanding = this.unfinishedInputEdgeRefs.get(inputOrder);
            Preconditions.checkNotNull(outstanding);

            final int remaining;
            if (i < 0) {
                remaining = 0;
            } else {
                remaining = outstanding - i;
            }

            if (remaining == 0) {
                this.unfinishedInputEdgeRefs.remove(inputOrder);
                return true;
            }
            // The new count is stored before validation, so a negative count stays recorded
            // when the state check below fails.
            this.unfinishedInputEdgeRefs.put(inputOrder, remaining);
            Preconditions.checkState(remaining > 0);
            return false;
        }

        /** @return {@code true} once no input order is tracked any more */
        public boolean allInputFinished() {
            return this.unfinishedInputEdgeRefs.isEmpty();
        }

        /** Adds {@code i} to the unfinished-edge count of {@code inputOrder}, starting from zero if untracked. */
        public void addInputEdges(int i, StreamEdge.InputOrder inputOrder) {
            this.unfinishedInputEdgeRefs.merge(inputOrder, i, Integer::sum);
        }

        /** @return the sum of the unfinished-edge counts over all tracked input orders */
        public int getInputEdgeCounts() {
            int total = 0;
            for (Integer pending : this.unfinishedInputEdgeRefs.values()) {
                total += pending;
            }
            return total;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$OutputAdapter.class */
    /**
     * Binds the element, watermark and latency-marker entry points of a chained operator once, at
     * construction time, so that callers do not have to distinguish one-input from two-input
     * operators or first from second input.
     *
     * <p>Any exception thrown by the operator is rethrown wrapped in an
     * {@link ExceptionInChainedOperatorException}.
     *
     * @param <IN> type of the records fed into the operator through this adapter
     * @param <OUT> output type of the operator
     */
    public static class OutputAdapter<IN, OUT> {
        private final Consumer<StreamRecord<IN>> collectorAdapter;
        private final Consumer<Watermark> watermarkAdapter;
        private final Consumer<LatencyMarker> latencyMarkerAdapter;

        /** An action on a single value that is allowed to throw checked exceptions. */
        @FunctionalInterface
        private interface ThrowingAction<V> {
            void run(V value) throws Exception;
        }

        /** Turns a throwing action into a {@link Consumer} that wraps failures for the chain. */
        private static <V> Consumer<V> guarded(ThrowingAction<V> action) {
            return value -> {
                try {
                    action.run(value);
                } catch (Exception e) {
                    throw new ExceptionInChainedOperatorException(e);
                }
            };
        }

        /**
         * @param streamOperator the operator to drive; must be a one-input or two-input operator
         * @param inputOrder for two-input operators, selects the first input when it is
         *     {@code FIRST} and the second input otherwise; ignored for one-input operators
         * @throws StreamTaskException if the operator is neither a one-input nor a two-input operator
         */
        @SuppressWarnings("unchecked")
        public OutputAdapter(StreamOperator<OUT> streamOperator, StreamEdge.InputOrder inputOrder) {
            if (streamOperator instanceof OneInputStreamOperator) {
                final OneInputStreamOperator<IN, OUT> single = (OneInputStreamOperator<IN, OUT>) streamOperator;
                this.collectorAdapter = guarded(element -> {
                    streamOperator.setKeyContextElement1(element);
                    single.processElement(element);
                });
                this.watermarkAdapter = guarded(single::processWatermark);
                this.latencyMarkerAdapter = guarded(single::processLatencyMarker);
            } else if (streamOperator instanceof TwoInputStreamOperator) {
                if (inputOrder == StreamEdge.InputOrder.FIRST) {
                    final TwoInputStreamOperator<IN, ?, OUT> dual = (TwoInputStreamOperator<IN, ?, OUT>) streamOperator;
                    this.collectorAdapter = guarded(element -> {
                        streamOperator.setKeyContextElement1(element);
                        dual.processElement1(element);
                    });
                    this.watermarkAdapter = guarded(dual::processWatermark1);
                    this.latencyMarkerAdapter = guarded(dual::processLatencyMarker1);
                } else {
                    final TwoInputStreamOperator<?, IN, OUT> dual = (TwoInputStreamOperator<?, IN, OUT>) streamOperator;
                    this.collectorAdapter = guarded(element -> {
                        streamOperator.setKeyContextElement2(element);
                        dual.processElement2(element);
                    });
                    this.watermarkAdapter = guarded(dual::processWatermark2);
                    this.latencyMarkerAdapter = guarded(dual::processLatencyMarker2);
                }
            } else {
                throw new StreamTaskException("Unknown stream operator " + streamOperator);
            }
        }

        /** Sets the key context for the record and lets the operator process it. */
        public void processElement(StreamRecord<IN> streamRecord) {
            this.collectorAdapter.accept(streamRecord);
        }

        /** Lets the operator process the watermark on the bound input. */
        public void processWatermark(Watermark watermark) {
            this.watermarkAdapter.accept(watermark);
        }

        /** Lets the operator process the latency marker on the bound input. */
        public void processLatencyMarker(LatencyMarker latencyMarker) {
            this.latencyMarkerAdapter.accept(latencyMarker);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$StreamOperatorProxy.class */
    private static abstract class StreamOperatorProxy<OUT> implements StreamOperator<OUT> {
        private final StreamOperator<OUT> operator;

        public StreamOperatorProxy(StreamOperator<OUT> streamOperator) {
            this.operator = streamOperator;
        }

        public RunnableFuture<OperatorPartitionSnapshot> snapshot(long j, CheckpointOptions checkpointOptions, CheckpointBackend checkpointBackend) throws Exception {
            return this.operator.snapshot(j, checkpointOptions, checkpointBackend);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void setup(StreamTask<?, ?> streamTask, OperatorContext operatorContext, Output<StreamRecord<OUT>> output) {
            this.operator.setup(streamTask, operatorContext, output);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void restore(InitialOperatorPartitionSnapshot initialOperatorPartitionSnapshot) throws Exception {
            this.operator.restore(initialOperatorPartitionSnapshot);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void open() throws Exception {
            this.operator.open();
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void close() throws Exception {
            this.operator.close();
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void dispose() throws Exception {
            this.operator.dispose();
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public OperatorSnapshotResult snapshotState(long j, long j2, CheckpointOptions checkpointOptions) throws Exception {
            return this.operator.snapshotState(j, j2, checkpointOptions);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void initializeState(OperatorSubtaskState operatorSubtaskState) throws Exception {
            this.operator.initializeState(operatorSubtaskState);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void notifyOfCompletedCheckpoint(long j) throws Exception {
            this.operator.notifyOfCompletedCheckpoint(j);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void notifyOfSubsumedCheckpoint(long j) throws Exception {
            this.operator.notifyOfSubsumedCheckpoint(j);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void notifyOfAbortedCheckpoint(long j) throws Exception {
            this.operator.notifyOfAbortedCheckpoint(j);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void setKeyContextElement1(StreamRecord<?> streamRecord) throws Exception {
            this.operator.setKeyContextElement1(streamRecord);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void setKeyContextElement2(StreamRecord<?> streamRecord) throws Exception {
            this.operator.setKeyContextElement2(streamRecord);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public ChainingStrategy getChainingStrategy() {
            return this.operator.getChainingStrategy();
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void setChainingStrategy(ChainingStrategy chainingStrategy) {
            this.operator.setChainingStrategy(chainingStrategy);
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public MetricGroup getMetricGroup() {
            return this.operator.getMetricGroup();
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public OperatorContext getOperatorContext() {
            return this.operator.getOperatorContext();
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public OperatorID getOperatorID() {
            return this.operator.getOperatorID();
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public boolean requireState() {
            return this.operator.requireState();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$TwoInputStreamOperatorProxy.class */
    private class TwoInputStreamOperatorProxy<IN1, IN2, OUT> extends StreamOperatorProxy<OUT> implements TwoInputStreamOperator<IN1, IN2, OUT> {
        private final TwoInputStreamOperator<IN1, IN2, OUT> operator;

        public TwoInputStreamOperatorProxy(TwoInputStreamOperator<IN1, IN2, OUT> twoInputStreamOperator) {
            super(twoInputStreamOperator);
            this.operator = twoInputStreamOperator;
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public InputElementSelection firstInputSelection() {
            return this.operator.firstInputSelection();
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public InputElementSelection processElement1(StreamRecord<IN1> streamRecord) throws Exception {
            return this.operator.processElement1(streamRecord);
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public InputElementSelection processElement2(StreamRecord<IN2> streamRecord) throws Exception {
            return this.operator.processElement2(streamRecord);
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processWatermark1(Watermark watermark) throws Exception {
            this.operator.processWatermark1(watermark);
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processWatermark2(Watermark watermark) throws Exception {
            this.operator.processWatermark2(watermark);
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
            this.operator.processLatencyMarker1(latencyMarker);
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
            this.operator.processLatencyMarker2(latencyMarker);
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public InputElementSelection endInput1() throws Exception {
            Preconditions.checkNotNull((OperatorInputEdgeState) OperatorChain.this.operatorInputEdgeStates.get(Integer.valueOf(this.operator.getOperatorContext().getNodeID())));
            OperatorChain.this.chainedEndInput(this.operator.getOperatorContext().getNodeID(), 1, StreamEdge.InputOrder.FIRST);
            return InputElementSelection.ANY;
        }

        @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
        public InputElementSelection endInput2() throws Exception {
            Preconditions.checkNotNull((OperatorInputEdgeState) OperatorChain.this.operatorInputEdgeStates.get(Integer.valueOf(this.operator.getOperatorContext().getNodeID())));
            OperatorChain.this.chainedEndInput(this.operator.getOperatorContext().getNodeID(), 1, StreamEdge.InputOrder.SECOND);
            return InputElementSelection.ANY;
        }

        @Override // org.apache.flink.streaming.api.operators.StreamOperator
        public void prepareSnapshotPreBarrier(long j) throws Exception {
            this.operator.prepareSnapshotPreBarrier(j);
        }
    }

    /**
     * Builds the operator chain of the given task.
     *
     * <p>Construction happens in three steps: one {@link RecordWriterOutput} is created per
     * outgoing edge, then the head operators (and, through {@code createOutputCollector}, whatever
     * that helper wires behind them) are created and set up, and finally every physical input edge
     * is registered with the input-edge state of its target node.
     *
     * <p>If any of these steps fails, every stream output created so far is closed and its buffers
     * are cleared before the failure propagates, so that a half-built chain does not leak network
     * resources.
     *
     * @param streamTask the task that owns this chain; supplies the configuration, the user-code
     *     class loader, the environment and the task name
     * @throws StreamTaskException if the number of configured key-group assigners differs from
     *     the number of outgoing edges
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public OperatorChain(StreamTask streamTask) {
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        StreamTaskConfig configuration = streamTask.getConfiguration();
        this.chainedConfigs = configuration.getChainedTaskConfigs(userCodeClassLoader);
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassLoader);
        // NOTE(review): the two maps below are handed to createOutputCollector, whose signature is
        // declared elsewhere in this class - confirm the element types match.
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputsByEdge = new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput[outEdgesInOrder.size()];
        List keyGroupAssigners = new TaskConfig(configuration.getConfiguration()).getOutputKeyGroupAssignerInOrder(userCodeClassLoader);
        if (keyGroupAssigners != null && keyGroupAssigners.size() != outEdgesInOrder.size()) {
            throw new StreamTaskException(String.format("KeyGroupAssinger num %d not equal to out put edge num %d", Integer.valueOf(keyGroupAssigners.size()), Integer.valueOf(outEdgesInOrder.size())));
        }

        boolean success = false;
        try {
            // Step 1: one network output per outgoing edge.
            for (int i = 0; i < outEdgesInOrder.size(); i++) {
                StreamEdge streamEdge = outEdgesInOrder.get(i);
                if (keyGroupAssigners != null && streamEdge.getPartitioner() instanceof KeyGroupStreamPartitioner) {
                    Object assigner = keyGroupAssigners.get(i);
                    if (assigner != null) {
                        ((KeyGroupStreamPartitioner) streamEdge.getPartitioner()).setKeyGroupRangeAssigner((KeyGroupRangeAssigner) assigner);
                        LOG.info("Set KeyGroupRangeAssigner for partitioner {} and the assigner is {}", i, assigner);
                    } else {
                        LOG.warn("No keyGroupRangeAssigner for KeyGroupStreamPartitioner {}.", i);
                    }
                }
                RecordWriterOutput<?> streamOutput = createStreamOutput(streamTask, streamEdge, configuration, this.chainedConfigs.get(streamEdge.getSourceId()), i, streamTask.getEnvironment(), streamTask.getName());
                this.streamOutputs[i] = streamOutput;
                streamOutputsByEdge.put(streamEdge, streamOutput);
            }

            // Step 2: create and set up the head operators.
            Map<Integer, StreamOperator<?>> operatorsByNodeId = new HashMap<>(this.chainedConfigs.size());
            if (!this.chainedConfigs.isEmpty()) {
                for (int headNodeId : configuration.getHeadNodeIDs()) {
                    if (operatorsByNodeId.containsKey(headNodeId)) {
                        // Already created earlier in this loop; just register it as a head.
                        this.headOperators.put(headNodeId, operatorsByNodeId.get(headNodeId));
                        continue;
                    }
                    OperatorConfig headConfig = this.chainedConfigs.get(headNodeId);
                    // Raw on purpose: chainEntryPoints is declared with a raw StreamRecord element type.
                    Output entryPoint = createOutputCollector(streamTask, headConfig, this.chainedConfigs, userCodeClassLoader, streamOutputsByEdge, operatorsByNodeId);
                    StreamOperator headOperator = headConfig.getStreamOperator(userCodeClassLoader);
                    if (headOperator != null) {
                        headOperator.setup(streamTask, headConfig, entryPoint);
                        this.headOperators.put(headNodeId, headOperator);
                        operatorsByNodeId.put(headNodeId, headOperator);
                        this.chainEntryPoints.put(headNodeId, entryPoint);
                        this.operatorInputEdgeStates.put(headNodeId, new OperatorInputEdgeState());
                    }
                }
            }
            this.allOperators = operatorsByNodeId;
            this.allOperatorsTopologySorted = getTopologySortedOperators(this.operatorInputEdgeStates, this.allOperators, this.chainedConfigs);

            // Step 3: count the physical input edges per target node and input order.
            for (StreamEdge inEdge : configuration.getInPhysicalEdges(userCodeClassLoader)) {
                OperatorInputEdgeState edgeState = this.operatorInputEdgeStates.get(inEdge.getTargetId());
                if (edgeState != null) {
                    edgeState.addInputEdges(1, inEdge.getInputOrder());
                }
            }

            success = true;
        } finally {
            if (!success) {
                // Construction failed somewhere above: release every output created so far.
                for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
                    if (streamOutput != null) {
                        streamOutput.close();
                        streamOutput.clearBuffers();
                    }
                }
            }
        }
    }

    /**
     * Returns the stream status currently stored on this object.
     *
     * @return the value of the {@code streamStatus} field
     */
    @Override // org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider
    public StreamStatus getStreamStatus() {
        return streamStatus;
    }

    /**
     * Stores a new stream status and emits it on every stream output.
     *
     * <p>When the given status equals the one already stored, nothing is stored and nothing
     * is emitted.
     *
     * @param streamStatus the status to switch to
     */
    @Override // org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer
    public void toggleStreamStatus(StreamStatus streamStatus) {
        if (!streamStatus.equals(this.streamStatus)) {
            this.streamStatus = streamStatus;
            final RecordWriterOutput<?>[] outputs = this.streamOutputs;
            for (int idx = 0; idx < outputs.length; idx++) {
                outputs[idx].emitStreamStatus(streamStatus);
            }
        }
    }

    /**
     * Creates a {@link CheckpointBarrier} from the given arguments and broadcasts it on every
     * stream output.
     *
     * @param j first constructor argument of the barrier (presumably the checkpoint id; confirm
     *     against {@code CheckpointBarrier})
     * @param j2 second constructor argument of the barrier (presumably the checkpoint timestamp;
     *     confirm against {@code CheckpointBarrier})
     * @param checkpointOptions options passed through to the barrier
     * @throws IOException if an output reports an I/O failure, or if the calling thread is
     *     interrupted while broadcasting; in the latter case the thread's interrupt flag is
     *     restored and the {@link InterruptedException} is attached as the cause
     */
    public void broadcastCheckpointBarrier(long j, long j2, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier checkpointBarrier = new CheckpointBarrier(j, j2, checkpointOptions);
            for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
                recordWriterOutput.broadcastEvent(checkpointBarrier);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; restore it so code further
            // up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new IOException("Interrupted while broadcasting checkpoint barrier", e);
        }
    }

    /**
     * Creates a {@link CancelCheckpointMarker} for the given value and broadcasts it on every
     * stream output.
     *
     * @param j constructor argument of the marker (presumably the id of the checkpoint being
     *     cancelled; confirm against {@code CancelCheckpointMarker})
     * @throws IOException if an output reports an I/O failure, or if the calling thread is
     *     interrupted while broadcasting; in the latter case the thread's interrupt flag is
     *     restored and the {@link InterruptedException} is attached as the cause
     */
    public void broadcastCheckpointCancelMarker(long j) throws IOException {
        try {
            CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(j);
            for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
                recordWriterOutput.broadcastEvent(cancelCheckpointMarker);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; restore it so code further
            // up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new IOException("Interrupted while broadcasting checkpoint cancellation", e);
        }
    }

    /**
     * Calls {@code prepareSnapshotPreBarrier(j)} on every operator, in the iteration order of
     * {@link #getAllOperatorsTopologySorted()}.
     *
     * @param j value forwarded unchanged to each operator
     * @throws Exception whatever an operator throws; iteration stops at the first failure
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        for (StreamOperator<?> operator : getAllOperatorsTopologySorted()) {
            operator.prepareSnapshotPreBarrier(j);
        }
    }

    /**
     * Returns the array of stream outputs held by this object.
     *
     * <p>The internal array itself is returned, not a copy.
     *
     * @return the {@code streamOutputs} field
     */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return streamOutputs;
    }

    /**
     * Returns the deque of operators stored in {@code allOperatorsTopologySorted}.
     *
     * <p>The internal deque itself is returned, not a copy.
     *
     * @return the {@code allOperatorsTopologySorted} field
     */
    public Deque<StreamOperator<?>> getAllOperatorsTopologySorted() {
        return allOperatorsTopologySorted;
    }

    /**
     * Copies the values of the {@code chainEntryPoints} map into a new array.
     *
     * @return a fresh array holding every registered chain entry point output
     */
    public Output<StreamRecord>[] getChainEntryPoints() {
        // Zero-length template: toArray allocates an array of the right size itself.
        Output[] emptyTemplate = new Output[0];
        return (Output[]) this.chainEntryPoints.values().toArray(emptyTemplate);
    }

    /**
     * Flushes every output returned by {@link #getStreamOutputs()}, in array order.
     *
     * @throws IOException if flushing an output fails; later outputs are then not flushed
     */
    public void flushOutputs() throws IOException {
        final RecordWriterOutput<?>[] outputs = getStreamOutputs();
        for (int idx = 0; idx < outputs.length; idx++) {
            outputs[idx].flush();
        }
    }

    /**
     * Closes every stream output and then clears the buffers of every stream output.
     *
     * <p>The buffer-clearing pass runs in a {@code finally} block, so it is executed even when
     * closing one of the outputs throws.
     */
    public void releaseOutputs() {
        try {
            final RecordWriterOutput<?>[] toClose = this.streamOutputs;
            for (int idx = 0; idx < toClose.length; idx++) {
                toClose[idx].close();
            }
        } finally {
            // Runs regardless of whether the close pass completed.
            final RecordWriterOutput<?>[] toClear = this.streamOutputs;
            for (int idx = 0; idx < toClear.length; idx++) {
                toClear[idx].clearBuffers();
            }
        }
    }

    /**
     * Copies the values of the {@code headOperators} map into a new array.
     *
     * @return a fresh array holding every registered head operator
     */
    public StreamOperator[] getHeadOperators() {
        // Zero-length template: toArray allocates an array of the right size itself.
        StreamOperator[] emptyTemplate = new StreamOperator[0];
        return (StreamOperator[]) this.headOperators.values().toArray(emptyTemplate);
    }

    /**
     * Looks up the head operator registered under the given key.
     *
     * @param i key into the {@code headOperators} map
     * @return the operator stored under {@code i}, or {@code null} when the map has no such entry
     */
    public StreamOperator getHeadOperator(int i) {
        final Integer key = Integer.valueOf(i);
        return this.headOperators.get(key);
    }

    /**
     * Looks up the head operator registered under the given key and wraps it in a proxy when it
     * is a one-input or two-input operator.
     *
     * <p>The one-input check is performed first. An operator that is neither kind is returned
     * unwrapped; a missing entry yields {@code null}.
     *
     * @param i key into the {@code headOperators} map
     * @return a {@code OneInputStreamOperatorProxy}, a {@code TwoInputStreamOperatorProxy}, the
     *     operator itself, or {@code null}
     */
    public StreamOperator getHeadOperatorProxy(int i) {
        StreamOperator headOperator = this.headOperators.get(Integer.valueOf(i));
        if (headOperator instanceof OneInputStreamOperator) {
            return new OneInputStreamOperatorProxy((OneInputStreamOperator) headOperator);
        }
        if (headOperator instanceof TwoInputStreamOperator) {
            return new TwoInputStreamOperatorProxy((TwoInputStreamOperator) headOperator);
        }
        return headOperator;
    }

    /**
     * Returns the number of entries in {@code allOperators}.
     *
     * @return the size of {@code allOperators}, or {@code 0} when that field is {@code null}
     */
    public int getChainLength() {
        return this.allOperators == null ? 0 : this.allOperators.size();
    }

    /**
     * Signals end of input to every operator in {@code allOperatorsTopologySorted}, in iteration
     * order.
     *
     * <p>A one-input operator receives {@code endInput()}; a two-input operator receives
     * {@code endInput1()} followed by {@code endInput2()}. Operators of neither kind are skipped.
     *
     * @throws Exception whatever an operator throws; iteration stops at the first failure
     */
    public void endAllInputs() throws Exception {
        for (StreamOperator<?> operator : this.allOperatorsTopologySorted) {
            if (operator instanceof OneInputStreamOperator) {
                OneInputStreamOperator oneInput = (OneInputStreamOperator) operator;
                oneInput.endInput();
            } else if (operator instanceof TwoInputStreamOperator) {
                TwoInputStreamOperator twoInput = (TwoInputStreamOperator) operator;
                twoInput.endInput1();
                twoInput.endInput2();
            }
        }
    }

    /**
     * Signals end of input to every operator in {@code allOperatorsTopologySorted} whose node id
     * is not a key of {@code headOperators}.
     *
     * <p>A one-input operator receives {@code endInput()}; a two-input operator receives
     * {@code endInput1()} followed by {@code endInput2()}. Operators of neither kind are skipped.
     *
     * @throws Exception whatever an operator throws; iteration stops at the first failure
     */
    public void endAllNonHeadInputs() throws Exception {
        for (StreamOperator<?> operator : this.allOperatorsTopologySorted) {
            boolean isHead = this.headOperators.containsKey(Integer.valueOf(operator.getOperatorContext().getNodeID()));
            if (isHead) {
                continue;
            }
            if (operator instanceof OneInputStreamOperator) {
                OneInputStreamOperator oneInput = (OneInputStreamOperator) operator;
                oneInput.endInput();
            } else if (operator instanceof TwoInputStreamOperator) {
                TwoInputStreamOperator twoInput = (TwoInputStreamOperator) operator;
                twoInput.endInput1();
                twoInput.endInput2();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records finished input edges for the operator with node id {@code i} and, when
     * {@code finishInputEdges} reports {@code true}, ends the corresponding input of that
     * operator.
     *
     * <p>A one-input operator receives {@code endInput()}. A two-input operator receives
     * {@code endInput1()} when {@code inputOrder} is {@code FIRST} and {@code endInput2()}
     * otherwise. If the edge state then reports that all inputs are finished, the call recurses
     * into the target of every chained output of the operator, with an edge count of one.
     *
     * @param i node id of the operator whose input is ending
     * @param i2 value passed to {@code finishInputEdges} (presumably the number of edges that
     *     finished; confirm against {@code OperatorInputEdgeState})
     * @param inputOrder which input side the finished edges belong to
     * @throws Exception whatever an operator's end-of-input callback throws
     */
    public void chainedEndInput(int i, int i2, StreamEdge.InputOrder inputOrder) throws Exception {
        final Integer nodeKey = Integer.valueOf(i);
        OperatorInputEdgeState edgeState = this.operatorInputEdgeStates.get(nodeKey);
        Preconditions.checkNotNull(edgeState);
        if (!edgeState.finishInputEdges(i2, inputOrder)) {
            return;
        }

        StreamOperator<?> operator = this.allOperators.get(nodeKey);
        LOG.info("Ending " + i + " " + operator.getOperatorContext().getOperatorName() + " " + inputOrder);
        if (operator instanceof OneInputStreamOperator) {
            ((OneInputStreamOperator) operator).endInput();
        } else if (operator instanceof TwoInputStreamOperator) {
            TwoInputStreamOperator twoInput = (TwoInputStreamOperator) operator;
            if (inputOrder == StreamEdge.InputOrder.FIRST) {
                twoInput.endInput1();
            } else {
                twoInput.endInput2();
            }
        }

        if (!edgeState.allInputFinished()) {
            return;
        }

        // Every input of this operator is done: pass the signal on along its chained outputs.
        OperatorConfig config = this.chainedConfigs.get(nodeKey);
        Preconditions.checkNotNull(config);
        for (StreamEdge chainedEdge : config.getChainedOutputs()) {
            chainedEndInput(chainedEdge.getTargetId(), 1, chainedEdge.getInputOrder());
        }
    }

    /**
     * Builds the output that an operator described by {@code operatorConfig} writes to.
     *
     * <p>One (output, edge) pair is collected per non-chained output, looked up in {@code map2},
     * followed by one pair per chained output, created through {@code createChainedOperator}.
     * The result is then chosen as follows:
     * <ul>
     *   <li>output selectors present: a {@code CopyingDirectedOutput} when object reuse is
     *       enabled, otherwise a {@code DirectedOutput};</li>
     *   <li>no selectors and exactly one pair: that pair's output, returned directly;</li>
     *   <li>otherwise: a {@code CopyingBroadcastingOutputCollector} when object reuse is
     *       enabled, otherwise a {@code BroadcastingOutputCollector}, over all collected
     *       outputs (which may be none).</li>
     * </ul>
     *
     * @param streamTask task the operators belong to
     * @param operatorConfig configuration of the operator whose output is being built
     * @param map operator configurations by node id, used to resolve chained targets
     * @param classLoader class loader passed on when creating chained operators
     * @param map2 already created network outputs by edge
     * @param map3 operators created so far by node id; passed on to chained-operator creation
     * @return the output for the operator
     */
    @VisibleForTesting
    <T> Output<StreamRecord<T>> createOutputCollector(StreamTask<?, ?> streamTask, OperatorConfig operatorConfig, Map<Integer, OperatorConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, Map<Integer, StreamOperator<?>> map3) {
        ArrayList outputsWithEdges = new ArrayList(4);

        // Network outputs first ...
        for (StreamEdge networkEdge : operatorConfig.getNonChainedOutputs()) {
            outputsWithEdges.add(new Tuple2(map2.get(networkEdge), networkEdge));
        }
        // ... then chained outputs, each of which may create further operators downstream.
        for (StreamEdge chainedEdge : operatorConfig.getChainedOutputs()) {
            OperatorConfig targetConfig = map.get(Integer.valueOf(chainedEdge.getTargetId()));
            outputsWithEdges.add(new Tuple2(createChainedOperator(streamTask, targetConfig, map, classLoader, map2, map3, chainedEdge), chainedEdge));
        }

        List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors();
        if (selectors != null && !selectors.isEmpty()) {
            if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
                return new CopyingDirectedOutput(selectors, outputsWithEdges);
            }
            return new DirectedOutput(selectors, outputsWithEdges);
        }

        final int outputCount = outputsWithEdges.size();
        if (outputCount == 1) {
            return (Output) ((Tuple2) outputsWithEdges.get(0)).f0;
        }

        Output[] allOutputs = new Output[outputCount];
        int slot = 0;
        for (Object pair : outputsWithEdges) {
            allOutputs[slot++] = (Output) ((Tuple2) pair).f0;
        }
        if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
            return new CopyingBroadcastingOutputCollector(allOutputs, this);
        }
        return new BroadcastingOutputCollector(allOutputs, this);
    }

    /**
     * Returns the output through which records reach the chained operator described by
     * {@code operatorConfig}, creating and setting up that operator on first use.
     *
     * <p>When {@code map3} has no operator for the config's node id, the operator's own output
     * collector is built first, the operator is instantiated and set up, and both the operator
     * and a fresh {@code OperatorInputEdgeState} are registered under that node id. In every
     * case one input edge is added to the node's edge state for {@code streamEdge}.
     *
     * <p>With object reuse enabled a {@code ChainingOutput} is returned. Otherwise a
     * {@code CopyingChainingOutput} is returned, using the first or second input serializer
     * according to the edge's input order.
     *
     * @param streamTask task the operator belongs to
     * @param operatorConfig configuration of the chained operator
     * @param map operator configurations by node id
     * @param classLoader class loader used to instantiate the operator
     * @param map2 already created network outputs by edge
     * @param map3 operators created so far by node id; updated when a new operator is created
     * @param streamEdge the chained edge leading into the operator
     * @return the output feeding the chained operator
     * @throws UnsupportedOperationException if object reuse is disabled and the edge's input
     *     order is neither {@code FIRST} nor {@code SECOND}
     */
    @VisibleForTesting
    <IN, OP_OUT> Output<StreamRecord<IN>> createChainedOperator(StreamTask<?, ?> streamTask, OperatorConfig operatorConfig, Map<Integer, OperatorConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, Map<Integer, StreamOperator<?>> map3, StreamEdge streamEdge) {
        StreamOperator<?> chainedOperator = map3.get(Integer.valueOf(operatorConfig.getNodeID()));
        if (chainedOperator == null) {
            // First edge into this node: build its downstream output, then the operator itself.
            Output<StreamRecord<?>> createOutputCollector = createOutputCollector(streamTask, operatorConfig, map, classLoader, map2, map3);
            chainedOperator = operatorConfig.getStreamOperator(classLoader);
            chainedOperator.setup(streamTask, operatorConfig, createOutputCollector);
            map3.put(Integer.valueOf(operatorConfig.getNodeID()), chainedOperator);
            this.operatorInputEdgeStates.put(Integer.valueOf(operatorConfig.getNodeID()), new OperatorInputEdgeState());
        }
        this.operatorInputEdgeStates.get(Integer.valueOf(operatorConfig.getNodeID())).addInputEdges(1, streamEdge.getInputOrder());

        if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
            return new ChainingOutput(chainedOperator, this, streamEdge.getOutputTag(), streamEdge.getInputOrder());
        }

        // Copying path: the serializer depends on which input of the operator the edge feeds.
        final TypeSerializer inputSerializer;
        if (streamEdge.getInputOrder() == StreamEdge.InputOrder.FIRST) {
            inputSerializer = operatorConfig.getTypeSerializerIn1();
        } else if (streamEdge.getInputOrder() == StreamEdge.InputOrder.SECOND) {
            inputSerializer = operatorConfig.getTypeSerializerIn2();
        } else {
            throw new UnsupportedOperationException("Unknown input order " + streamEdge.getInputOrder() + " of input edge " + streamEdge);
        }
        return new CopyingChainingOutput(chainedOperator, inputSerializer, streamEdge.getOutputTag(), this, streamEdge.getInputOrder());
    }

    /**
     * Builds the {@link RecordWriterOutput} for one outgoing edge of the task.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>choose the side-output serializer when the edge has an output tag, otherwise the
     *       regular output serializer;</li>
     *   <li>if the writer exposes a partition, initialise it with a
     *       {@code StreamElementSerializer} wrapping that serializer;</li>
     *   <li>if the edge's partitioner is a {@code ConfigurableStreamPartitioner} and the writer
     *       reports a positive number of target key groups, configure it with that number;</li>
     *   <li>if the partitioner is a {@code RebalancePartitioner}, dynamic rebalancing is enabled
     *       in the task manager configuration and the partition is a
     *       {@code NetworkResultPartition}, replace it with a {@code DynamicRebalancePartitioner}
     *       that is also registered as the partition's backlog change listener;</li>
     *   <li>wrap writer and partitioner in a {@code StreamRecordWriter} with the configured
     *       buffer timeout and metric settings.</li>
     * </ol>
     *
     * <p>The dynamic partitioner's threshold is
     * {@code buffersPerChannel + ceil(extraBuffersPerGate / numSubpartitions) + 1}. The bucket
     * interval is raised to one when non-positive and reduced to
     * {@code max(1, threshold / 2)} when larger than the threshold.
     *
     * @param streamTask task owning the output; passed to the partition's {@code init}
     * @param streamEdge outgoing edge supplying the output tag and the partitioner
     * @param streamTaskConfig task configuration supplying the buffer timeout
     * @param operatorConfig configuration of the edge's source operator, supplying serializers
     * @param i index of the output; used to look up the writer in {@code environment}
     * @param environment runtime environment supplying writer, configuration and metrics
     * @param str task name, used for logging only
     * @return the newly created output
     */
    private <T> RecordWriterOutput<T> createStreamOutput(StreamTask<?, ?> streamTask, StreamEdge streamEdge, StreamTaskConfig streamTaskConfig, OperatorConfig operatorConfig, int i, Environment environment, String str) {
        OutputTag outputTag = streamEdge.getOutputTag();
        TypeSerializer<?> typeSerializerSideOut = streamEdge.getOutputTag() != null ? operatorConfig.getTypeSerializerSideOut(streamEdge.getOutputTag()) : operatorConfig.getTypeSerializerOut();
        ResultPartitionWriter writer = environment.getWriter(i);
        NetworkResultPartition partition = writer.getPartition();
        if (partition != null) {
            partition.init(streamTask, new StreamElementSerializer(typeSerializerSideOut), i);
        }
        ChannelSelector partitioner = streamEdge.getPartitioner();
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{partitioner, Integer.valueOf(i), str});
        if (partitioner instanceof ConfigurableStreamPartitioner) {
            int numTargetKeyGroups = writer.getNumTargetKeyGroups();
            if (numTargetKeyGroups > 0) {
                ((ConfigurableStreamPartitioner) partitioner).configure(numTargetKeyGroups);
            }
        }
        if (partitioner instanceof RebalancePartitioner) {
            // NOTE(review): this message is logged before the enabled/partition check below, so it
            // also appears when the partitioner is in fact left unchanged.
            LOG.info("Replace the static rebalance partitioner with the dynamic one for outputIndex {}", Integer.valueOf(i));
            Configuration configuration = environment.getTaskManagerInfo().getConfiguration();
            if (configuration.getBoolean(TaskManagerOptions.TASK_DYNAMIC_REBALANCE_ENABLED) && (partition instanceof NetworkResultPartition)) {
                int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
                int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
                int numSubpartitions = partition.getNumberOfSubpartitions();
                // Ceiling division in integer arithmetic. The previous form, Math.ceil(a / b) with
                // two ints, truncated the quotient before the ceil was applied and therefore
                // rounded down. floorDiv still throws ArithmeticException for a zero divisor.
                int extraBuffersPerSubpartition = -Math.floorDiv(-extraBuffersPerGate, numSubpartitions);
                int threshold = buffersPerChannel + extraBuffersPerSubpartition + 1;

                int interval = configuration.getInteger(TaskManagerOptions.TASK_DYNAMIC_REBALANCE_BUCKET_INTERVAL);
                if (interval <= 0) {
                    LOG.warn("The interval of the dynamic rebalance paritioner should be at least one, adjust it to one automatically.");
                    interval = 1;
                }
                if (interval > threshold) {
                    LOG.warn("The interval of the dynamic rebalance paritioner should not be larger than the threshold, adjust it to the half of the threshold automatically.");
                    interval = Math.max(1, threshold / 2);
                }
                LOG.info("replace the partitioner to dynamic partitioner, threshold = {}, interval = {}, number of partitions = {}", new Object[]{Integer.valueOf(threshold), Integer.valueOf(interval), Integer.valueOf(numSubpartitions)});
                DynamicRebalancePartitioner dynamicRebalancePartitioner = new DynamicRebalancePartitioner(threshold, interval, numSubpartitions);
                partition.setPartitionBacklogChangeListener(dynamicRebalancePartitioner);
                partitioner = dynamicRebalancePartitioner;
            }
        }
        StreamRecordWriter streamRecordWriter = new StreamRecordWriter(writer, partitioner, streamTaskConfig.getBufferTimeout());
        streamRecordWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup(), environment.getExecutionConfig().isTracingMetricsEnabled(), environment.getExecutionConfig().getTracingMetricsInterval());
        return new RecordWriterOutput<>(streamRecordWriter, outputTag, this);
    }

    /**
     * Orders the operators so that each one appears after all operators that feed it through
     * chained outputs.
     *
     * <p>Every node starts with the input edge count taken from its
     * {@code OperatorInputEdgeState}. Nodes with a count of zero are emitted first; emitting a
     * node decrements the count of each target of its chained outputs, and a target is queued
     * once its count reaches zero. Nodes whose count never reaches zero do not appear in the
     * result.
     *
     * @param map input edge state by node id, supplying the initial edge counts
     * @param map2 operator by node id, supplying the elements of the result
     * @param map3 operator configuration by node id, supplying the chained outputs
     * @return the operators in the order in which their counts reached zero
     */
    static Deque<StreamOperator<?>> getTopologySortedOperators(Map<Integer, OperatorInputEdgeState> map, Map<Integer, StreamOperator<?>> map2, Map<Integer, OperatorConfig> map3) {
        Deque<Integer> ready = new ArrayDeque<>();
        Map<Integer, Integer> pendingInputs = new HashMap<>();
        for (Map.Entry<Integer, OperatorInputEdgeState> entry : map.entrySet()) {
            Integer nodeId = entry.getKey();
            int inputEdges = entry.getValue().getInputEdgeCounts();
            pendingInputs.put(nodeId, inputEdges);
            if (inputEdges == 0) {
                ready.add(nodeId);
            }
        }

        Deque<StreamOperator<?>> sorted = new ArrayDeque<>();
        while (!ready.isEmpty()) {
            Integer current = ready.poll();
            sorted.add(map2.get(current));
            for (StreamEdge chainedEdge : map3.get(current).getChainedOutputs()) {
                Integer target = Integer.valueOf(chainedEdge.getTargetId());
                int stillPending = pendingInputs.get(target) - 1;
                pendingInputs.put(target, stillPending);
                if (stillPending == 0) {
                    ready.add(target);
                }
            }
        }
        return sorted;
    }
}
