package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointBarrierHandlerListener;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamTaskConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.LockAndCondition;
import org.apache.flink.util.LockGetReleaseWrapper;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/ArbitraryInputStreamTask.class */
public class ArbitraryInputStreamTask<OUT> extends StreamTask<OUT, StreamOperator<OUT>> {
    private static final Logger LOG = LoggerFactory.getLogger(ArbitraryInputStreamTask.class);
    private StreamArbitraryInputProcessor processor;
    private List<SourceInputGate> sourceInputGates = new ArrayList();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/ArbitraryInputStreamTask$SourceInputGate.class */
    class SourceInputGate<SRC_OUT, OP extends StreamSource<SRC_OUT, ?>> implements InputGate, Output<StreamRecord<SRC_OUT>> {
        private Thread sourceThread;
        private volatile InputGateListener inputGateListener;
        private LockAndCondition lock;
        private OneInputStreamOperator<SRC_OUT, ?> consumer;
        private final OP source;
        private volatile boolean isFinished = false;
        private volatile boolean running = false;
        private int priority = -1;
        private CompletableFuture<Throwable> result = new CompletableFuture<>();

        public SourceInputGate(LockAndCondition lockAndCondition, OP op, OneInputStreamOperator<SRC_OUT, ?> oneInputStreamOperator, StreamStatusMaintainer streamStatusMaintainer) {
            this.lock = lockAndCondition;
            this.consumer = oneInputStreamOperator;
            this.source = op;
            this.sourceThread = new Thread(() -> {
                Throwable th = null;
                try {
                    try {
                        ArbitraryInputStreamTask.LOG.info("Source input gate thread started");
                        op.run(lockAndCondition, streamStatusMaintainer, this);
                        ArbitraryInputStreamTask.LOG.info("Source input gate thread finished");
                        this.result.complete(null);
                        if (this.inputGateListener != null) {
                            this.inputGateListener.notifyInputGateNonEmpty(this);
                        }
                    } catch (Throwable th2) {
                        ArbitraryInputStreamTask.LOG.error("Source input gate is failed", th2);
                        th = th2;
                        this.result.complete(th);
                        if (this.inputGateListener != null) {
                            this.inputGateListener.notifyInputGateNonEmpty(this);
                        }
                    }
                } catch (Throwable th3) {
                    this.result.complete(th);
                    if (this.inputGateListener != null) {
                        this.inputGateListener.notifyInputGateNonEmpty(this);
                    }
                    throw th3;
                }
            });
            this.sourceThread.setName("source-" + op.getOperatorContext().getOperatorName() + " " + op.getContainingTask().getName());
            this.sourceThread.setDaemon(true);
            this.sourceThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
        }

        public int getNumberOfInputChannels() {
            return 1;
        }

        public boolean isFinished() {
            return this.isFinished;
        }

        public void requestPartitions() throws IOException, InterruptedException {
            if (this.running) {
                return;
            }
            ArbitraryInputStreamTask.LOG.info("Start source input gate thread");
            this.running = true;
            this.sourceThread.start();
        }

        public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
            requestPartitions();
            try {
                if (this.isFinished) {
                    return null;
                }
                if (this.result.get() != null) {
                    throw this.result.get();
                }
                this.isFinished = true;
                if (this.inputGateListener != null) {
                    this.inputGateListener.notifyInputGateFinished(this);
                }
                return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, 0, false);
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }

        public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        }

        public void registerListener(InputGateListener inputGateListener) {
            this.inputGateListener = inputGateListener;
        }

        public int getPageSize() {
            return 0;
        }

        public boolean isBlocked() {
            return false;
        }

        public void releaseBlockedChannels(long j) {
        }

        public void setCheckpointBarrierHandlerListener(CheckpointBarrierHandlerListener checkpointBarrierHandlerListener) {
        }

        public int getPriority() {
            return this.priority;
        }

        public void setPriority(int i) {
            this.priority = i;
        }

        public void collect(StreamRecord<SRC_OUT> streamRecord) {
            LockGetReleaseWrapper lockGetReleaseWrapper = new LockGetReleaseWrapper(this.lock.getLock());
            Throwable th = null;
            try {
                try {
                    this.consumer.processElement(streamRecord);
                    if (lockGetReleaseWrapper != null) {
                        if (0 == 0) {
                            lockGetReleaseWrapper.close();
                            return;
                        }
                        try {
                            lockGetReleaseWrapper.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } catch (Throwable th3) {
                if (lockGetReleaseWrapper != null) {
                    if (0 != 0) {
                        try {
                            lockGetReleaseWrapper.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        lockGetReleaseWrapper.close();
                    }
                }
                throw th3;
            }
        }

        public void close() {
        }

        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitWatermark(Watermark watermark) {
        }

        @Override // org.apache.flink.streaming.api.operators.Output
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            throw new UnsupportedOperationException("Not support output tag yet.");
        }

        @Override // org.apache.flink.streaming.api.operators.Output
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }

        public String toString() {
            return "SourceInputGate of " + this.source;
        }

        public void cancel() {
            this.source.cancel();
        }

        public void join() {
            while (true) {
                try {
                    this.sourceThread.join();
                    return;
                } catch (InterruptedException e) {
                    ArbitraryInputStreamTask.LOG.warn("Interrupted when waiting source input gate thread stopped", e);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/ArbitraryInputStreamTask$SourceOperatorProxy.class */
    public static class SourceOperatorProxy<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
        private Output<StreamRecord<IN>> collector;
        private int rawSourceOperatorId;
        private OperatorChain chain;

        public SourceOperatorProxy(OperatorChain operatorChain, StreamSource streamSource) {
            this.chain = operatorChain;
            this.collector = streamSource.getOutput();
            this.rawSourceOperatorId = streamSource.getOperatorContext().getNodeID();
        }

        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void processElement(StreamRecord<IN> streamRecord) throws Exception {
            this.collector.collect(streamRecord);
        }

        @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
        public void endInput() throws Exception {
            this.chain.chainedEndInput(this.rawSourceOperatorId, 0, StreamEdge.InputOrder.FIRST);
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        this.processor = new StreamArbitraryInputProcessor(getEnvironment().getIOManager(), getEnvironment().getMetricGroup(), getCheckpointLock(), getEnvironment().getTaskManagerInfo().getConfiguration().getInteger(TaskManagerOptions.TASK_MAX_REQUESTS_IN_FLIGHT));
        StreamTaskConfig configuration = getConfiguration();
        ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        int inputsNum = configuration.getInputsNum();
        List<StreamEdge> inPhysicalEdges = configuration.getInPhysicalEdges(userCodeClassLoader);
        Preconditions.checkState(inputsNum == inPhysicalEdges.size());
        for (StreamOperator streamOperator : this.operatorChain.getHeadOperators()) {
            if (streamOperator instanceof StreamSource) {
                SourceOperatorProxy sourceOperatorProxy = new SourceOperatorProxy(this.operatorChain, (StreamSource) streamOperator);
                SourceInputGate sourceInputGate = new SourceInputGate(getCheckpointLock(), (StreamSource) streamOperator, sourceOperatorProxy, null);
                sourceInputGate.setPriority(streamOperator.getOperatorContext().getPriority());
                this.processor.bindSourceInputGate(sourceInputGate, sourceOperatorProxy);
                this.sourceInputGates.add(sourceInputGate);
            }
        }
        for (int i = 0; i < inputsNum; i++) {
            StreamEdge streamEdge = inPhysicalEdges.get(i);
            SingleInputGate inputGate = getEnvironment().getInputGate(i);
            if (inputGate instanceof SingleInputGate) {
                inputGate.setPriority(streamEdge.getPriority());
            }
            StreamOperator headOperatorProxy = this.operatorChain.getHeadOperatorProxy(streamEdge.getTargetId());
            Preconditions.checkNotNull(headOperatorProxy);
            if (headOperatorProxy instanceof OneInputStreamOperator) {
                this.processor.bindInputGateToOneInputOperator(inputGate, (OneInputStreamOperator) headOperatorProxy, headOperatorProxy.getOperatorContext().getTypeSerializerIn1());
            } else {
                if (!(headOperatorProxy instanceof TwoInputStreamOperator)) {
                    throw new RuntimeException("Unsupported of " + headOperatorProxy + " yet");
                }
                if (streamEdge.getInputOrder() == StreamEdge.InputOrder.FIRST) {
                    this.processor.bindFirstInputGateToTwoInputOperator(inputGate, (TwoInputStreamOperator) headOperatorProxy, headOperatorProxy.getOperatorContext().getTypeSerializerIn1());
                } else {
                    this.processor.bindSecondInputGateToTwoInputOperator(inputGate, (TwoInputStreamOperator) headOperatorProxy, headOperatorProxy.getOperatorContext().getTypeSerializerIn2());
                }
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void run() throws Exception {
        this.processor.process();
        LOG.info(String.format("Task Finished, input count:%d, output count:%d", Long.valueOf(getEnvironment().getMetricGroup().getIOMetricGroup().getNumRecordsInCounter().getCount()), Long.valueOf(getEnvironment().getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter().getCount())));
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanup() throws Exception {
        if (this.processor != null) {
            this.processor.cleanup();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cancelTask() {
        if (this.sourceInputGates.isEmpty()) {
            for (SourceInputGate sourceInputGate : this.sourceInputGates) {
                LOG.info("Canceling source input gate {}", sourceInputGate);
                sourceInputGate.cancel();
            }
        }
        if (this.processor != null) {
            this.processor.stop();
        }
        if (this.sourceInputGates.isEmpty()) {
            for (SourceInputGate sourceInputGate2 : this.sourceInputGates) {
                LOG.info("Waiting source input gate stopped {}", sourceInputGate2);
                sourceInputGate2.join();
            }
        }
    }
}
