package org.apache.flink.streaming.runtime.io;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.InputFetcher;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusSubMaintainer;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives a set of {@link InputFetcher}s (input gates and sources) from a single processing loop.
 *
 * <p>Fetchers are registered through the {@code bind*} methods. {@link #process()} then repeatedly
 * asks the {@link InputSelector} which inputs are currently selected, builds a reading queue of the
 * selected fetchers that have something to do, and drains them one at a time. The queue is rebuilt
 * whenever the selector reports a selection change via {@link #notifySelectionChanged()}.
 *
 * <p>{@code inputFetcherReadingQueue} doubles as the monitor guarding the queue, the
 * {@code enqueuedInputFetchers} set and the rebuild of {@code selectedInputFetchers};
 * {@link #notifyInputFetcherAvailable(InputFetcher)} uses it to wake the processing loop.
 */
@Internal
public class StreamArbitraryInputProcessor implements InputSelector.SelectionChangedListener, InputFetcher.InputFetcherAvailableListener {
    private static final Logger LOG = LoggerFactory.getLogger(StreamArbitraryInputProcessor.class);

    private final IOManager ioManager;
    private final Object checkpointLock;
    private final InputSelector inputSelector;
    private final TaskMetricGroup taskMetricGroup;
    private final MinWatermarkGauge minAllInputWatermarkGauge;

    /** May be null; it is required (checked) only by the input-gate {@code bind*} methods. */
    private final SelectedReadingBarrierHandler barrierHandler;

    /** Set by {@link #stop()}; read by the outer loop of {@link #process()}. */
    private volatile boolean isStopped = false;

    /** Fetchers waiting to be read, in FIFO order. Also used as the monitor for queue state. */
    private final ArrayDeque<InputFetcher> inputFetcherReadingQueue = new ArrayDeque<>();

    /** Mirror of the queue contents, used to avoid enqueuing the same fetcher twice. */
    private final Set<InputFetcher> enqueuedInputFetchers = new HashSet<>();

    /** All fetchers that have been bound and have not finished yet. */
    private final List<InputFetcher> inputFetchers = new ArrayList<>();

    /** Fetchers matching the selector's current selection. */
    private final Set<InputFetcher> selectedInputFetchers = new HashSet<>();

    /** Set by {@link #notifySelectionChanged()}; cleared when the reading queue is rebuilt. */
    private volatile boolean inputSelectionChanged = false;

    /**
     * Creates the processor, registers it as selection-changed listener on the selector and
     * registers the {@code currentInputWatermark} gauge on the task metric group.
     *
     * @param iOManager I/O manager handed to the input-gate fetchers; must not be null
     * @param obj checkpoint lock handed to the fetchers and input processors; must not be null
     * @param inputSelector source of the current input selection; must not be null
     * @param taskMetricGroup metric group used for the watermark gauge; must not be null
     * @param selectedReadingBarrierHandler barrier handler for input gates, or null
     */
    public StreamArbitraryInputProcessor(IOManager iOManager, Object obj, InputSelector inputSelector, TaskMetricGroup taskMetricGroup, @Nullable SelectedReadingBarrierHandler selectedReadingBarrierHandler) {
        this.ioManager = Preconditions.checkNotNull(iOManager);
        this.checkpointLock = Preconditions.checkNotNull(obj);
        this.inputSelector = Preconditions.checkNotNull(inputSelector);
        this.taskMetricGroup = Preconditions.checkNotNull(taskMetricGroup);
        this.barrierHandler = selectedReadingBarrierHandler;
        this.inputSelector.registerSelectionChangedListener(this);
        this.minAllInputWatermarkGauge = new MinWatermarkGauge(new WatermarkGauge[0]);
        // NOTE(review): `m86getValue` looks like a decompiler-renamed `getValue` - confirm against
        // MinWatermarkGauge before renaming it here.
        this.taskMetricGroup.gauge("currentInputWatermark", this.minAllInputWatermarkGauge::m86getValue);
    }

    /**
     * Runs the processing loop until every fetcher has finished or the processor is stopped.
     *
     * <p>Each outer iteration rebuilds the reading queue from the current selection. The inner
     * loop blocks until a selected fetcher is queued, drains it, and then either retires it
     * (finished) or re-queues it (more data available). A selection change breaks out of the
     * inner loop so that the queue is rebuilt. If the loop ends because of {@link #stop()}, the
     * remaining fetchers are cancelled.
     *
     * <p>NOTE(review): {@code isStopped} is only evaluated by the outer loop, and {@link #stop()}
     * does not notify the queue monitor, so a thread blocked in {@code wait()} relies on some
     * other wake-up (presumably thread interruption by the caller - confirm).
     *
     * @throws Exception if a fetcher fails, or if the thread is interrupted while waiting
     */
    public void process() throws Exception {
        LOG.info("Start processing with {} source(s)/input gate(s)", this.inputFetchers.size());
        for (InputFetcher fetcher : this.inputFetchers) {
            fetcher.setup();
        }

        while (!this.isStopped && !this.inputFetchers.isEmpty()) {
            constructInputFetcherReadingQueue();

            while (!this.inputSelectionChanged && !this.inputFetchers.isEmpty()) {
                Preconditions.checkState(!this.selectedInputFetchers.isEmpty());

                final InputFetcher current;
                synchronized (this.inputFetcherReadingQueue) {
                    // Guarded wait: re-check the condition after every wake-up.
                    while (this.inputFetcherReadingQueue.isEmpty()) {
                        this.inputFetcherReadingQueue.wait();
                    }
                    current = dequeueInputFetcher();
                }

                // Drain the fetcher until it has nothing more to process or the selection changes.
                while (!this.inputSelectionChanged && current.fetchAndProcess()) {
                    // intentionally empty: all work happens in fetchAndProcess()
                }

                if (current.isFinished()) {
                    LOG.info("Input fetcher {} {} is finished", current, current.getInputSelection());
                    current.cleanup();
                    this.inputFetchers.remove(current);
                    this.selectedInputFetchers.remove(current);
                } else if (!this.inputSelectionChanged && current.moreAvailable()) {
                    enqueueInputFetcher(current);
                }
            }

            if (this.inputSelectionChanged) {
                LOG.info("Input selection is changed, need to reconstruct reading queue");
            }
        }

        if (this.isStopped) {
            for (InputFetcher fetcher : this.inputFetchers) {
                fetcher.cancel();
            }
        }
        LOG.info("Finish processing all input fetchers");
    }

    /**
     * Requests the processing loop to stop by setting the stop flag.
     *
     * <p>NOTE(review): this does not wake a thread waiting on the reading queue; see
     * {@link #process()}.
     */
    public void stop() {
        this.isStopped = true;
    }

    /**
     * Cleans up every fetcher still registered and, if present, the barrier handler.
     *
     * @throws Exception if a fetcher or the barrier handler fails to clean up
     */
    public void cleanup() throws Exception {
        for (InputFetcher fetcher : this.inputFetchers) {
            fetcher.cleanup();
        }
        if (this.barrierHandler != null) {
            this.barrierHandler.cleanup();
        }
    }

    /**
     * Registers an input gate that feeds a one-input operator.
     *
     * <p>Requires a non-null barrier handler. The created fetcher notifies this processor when it
     * becomes available.
     */
    public <IN> void bindOneInputOperator(StreamEdge streamEdge, InputGate inputGate, int i, OneInputStreamOperator<IN, ?> oneInputStreamOperator, TypeSerializer<IN> typeSerializer, StreamStatusSubMaintainer streamStatusSubMaintainer, boolean z, Configuration configuration) {
        Preconditions.checkNotNull(this.barrierHandler);
        OneInputProcessor inputProcessor = new OneInputProcessor(streamStatusSubMaintainer, oneInputStreamOperator, this.checkpointLock, this.taskMetricGroup, this.minAllInputWatermarkGauge, inputGate.getNumberOfInputChannels());
        InputGateFetcher inputGateFetcher = new InputGateFetcher(InputSelector.EdgeInputSelection.create(streamEdge), inputGate, typeSerializer, this.barrierHandler, this.ioManager, inputProcessor, this.checkpointLock, i, z, configuration);
        inputGateFetcher.registerAvailableListener(this);
        this.inputFetchers.add(inputGateFetcher);
        LOG.info("Bind an one input operator {} to {}", oneInputStreamOperator, inputGate);
    }

    /**
     * Registers an input gate that feeds the first input of a two-input operator.
     *
     * <p>Requires a non-null barrier handler. The created fetcher notifies this processor when it
     * becomes available.
     */
    public <IN> void bindFirstOfTwoInputOperator(StreamEdge streamEdge, InputGate inputGate, int i, TwoInputStreamOperator<IN, ?, ?> twoInputStreamOperator, TypeSerializer<IN> typeSerializer, StreamStatusSubMaintainer streamStatusSubMaintainer, boolean z, Configuration configuration) {
        Preconditions.checkNotNull(this.barrierHandler);
        FirstOfTwoInputProcessor inputProcessor = new FirstOfTwoInputProcessor(streamStatusSubMaintainer, twoInputStreamOperator, this.checkpointLock, this.taskMetricGroup, this.minAllInputWatermarkGauge, inputGate.getNumberOfInputChannels());
        InputGateFetcher inputGateFetcher = new InputGateFetcher(InputSelector.EdgeInputSelection.create(streamEdge), inputGate, typeSerializer, this.barrierHandler, this.ioManager, inputProcessor, this.checkpointLock, i, z, configuration);
        inputGateFetcher.registerAvailableListener(this);
        this.inputFetchers.add(inputGateFetcher);
        LOG.info("Bind the edge {} of a first of two input operator {} to {}", streamEdge, twoInputStreamOperator, inputGate);
    }

    /**
     * Registers an input gate that feeds the second input of a two-input operator.
     *
     * <p>Requires a non-null barrier handler. The created fetcher notifies this processor when it
     * becomes available.
     */
    public <IN> void bindSecondOfTwoInputOperator(StreamEdge streamEdge, InputGate inputGate, int i, TwoInputStreamOperator<IN, ?, ?> twoInputStreamOperator, TypeSerializer<IN> typeSerializer, StreamStatusSubMaintainer streamStatusSubMaintainer, boolean z, Configuration configuration) {
        Preconditions.checkNotNull(this.barrierHandler);
        SecondOfTwoInputProcessor inputProcessor = new SecondOfTwoInputProcessor(streamStatusSubMaintainer, twoInputStreamOperator, this.checkpointLock, this.taskMetricGroup, this.minAllInputWatermarkGauge, inputGate.getNumberOfInputChannels());
        InputGateFetcher inputGateFetcher = new InputGateFetcher(InputSelector.EdgeInputSelection.create(streamEdge), inputGate, typeSerializer, this.barrierHandler, this.ioManager, inputProcessor, this.checkpointLock, i, z, configuration);
        inputGateFetcher.registerAvailableListener(this);
        this.inputFetchers.add(inputGateFetcher);
        LOG.info("Bind the edge {} of a second of two input operator {} to {}", streamEdge, twoInputStreamOperator, inputGate);
    }

    /**
     * Registers a source as an input. Unlike the input-gate bindings this does not need the
     * barrier handler.
     */
    public <IN> void bindSourceOperator(int i, StreamSourceV2 streamSourceV2, OneInputStreamOperator<IN, ?> oneInputStreamOperator, SourceFunction.SourceContext sourceContext, StreamStatusSubMaintainer streamStatusSubMaintainer) {
        SourceInputProcessor inputProcessor = new SourceInputProcessor(streamStatusSubMaintainer, oneInputStreamOperator, this.checkpointLock, this.taskMetricGroup, 1);
        SourceFetcher sourceFetcher = new SourceFetcher(InputSelector.SourceInputSelection.create(i), streamSourceV2, sourceContext, inputProcessor);
        sourceFetcher.registerAvailableListener(this);
        this.inputFetchers.add(sourceFetcher);
        LOG.info("Bind a source operator {}", i);
    }

    /**
     * Rebuilds the selected-fetcher set and the reading queue from the selector's current
     * selection, and clears the selection-changed flag.
     *
     * <p>A selected fetcher is queued right away when it has more data available or is already
     * finished (so that the processing loop can retire it). Selections without a registered
     * fetcher are only logged.
     */
    @VisibleForTesting
    void constructInputFetcherReadingQueue() {
        synchronized (this.inputFetcherReadingQueue) {
            this.inputSelectionChanged = false;
            this.selectedInputFetchers.clear();
            this.inputFetcherReadingQueue.clear();
            this.enqueuedInputFetchers.clear();

            List<InputSelector.InputSelection> nextSelectedInputs = this.inputSelector.getNextSelectedInputs();
            Preconditions.checkNotNull(nextSelectedInputs);
            Preconditions.checkState(!nextSelectedInputs.isEmpty());

            // Index the live fetchers by their selection. Without this the lookups below can
            // never match and no fetcher would ever be selected.
            HashMap<InputSelector.InputSelection, InputFetcher> fetcherBySelection = new HashMap<>();
            for (InputFetcher fetcher : this.inputFetchers) {
                fetcherBySelection.put(fetcher.getInputSelection(), fetcher);
            }

            for (InputSelector.InputSelection inputSelection : nextSelectedInputs) {
                InputFetcher fetcher = fetcherBySelection.get(inputSelection);
                if (fetcher == null) {
                    LOG.info("The selected edge {} is finished already", inputSelection);
                    continue;
                }
                this.selectedInputFetchers.add(fetcher);
                if (fetcher.moreAvailable() || fetcher.isFinished()) {
                    enqueueInputFetcher(fetcher);
                }
            }
            LOG.info("Select inputs {}", nextSelectedInputs);
        }
    }

    /** Appends the fetcher to the reading queue unless it is already queued. */
    @VisibleForTesting
    void enqueueInputFetcher(InputFetcher inputFetcher) {
        synchronized (this.inputFetcherReadingQueue) {
            // Set.add returns false for an element that is already present.
            if (this.enqueuedInputFetchers.add(inputFetcher)) {
                this.inputFetcherReadingQueue.add(inputFetcher);
            }
        }
    }

    /**
     * Removes and returns the head of the reading queue.
     *
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    @VisibleForTesting
    InputFetcher dequeueInputFetcher() {
        synchronized (this.inputFetcherReadingQueue) {
            InputFetcher head = this.inputFetcherReadingQueue.remove();
            this.enqueuedInputFetchers.remove(head);
            return head;
        }
    }

    /** Returns the live list of registered, unfinished fetchers (not a copy). */
    @VisibleForTesting
    List<InputFetcher> getInputFetchers() {
        return this.inputFetchers;
    }

    /** Marks the current selection as stale so that the processing loop rebuilds the queue. */
    @Override // org.apache.flink.streaming.runtime.tasks.InputSelector.SelectionChangedListener
    public void notifySelectionChanged() {
        this.inputSelectionChanged = true;
    }

    /** Returns the live reading queue (not a copy). */
    @VisibleForTesting
    ArrayDeque<InputFetcher> getInputFetcherReadingQueue() {
        return this.inputFetcherReadingQueue;
    }

    /** Returns the live set of currently queued fetchers (not a copy). */
    @VisibleForTesting
    Set<InputFetcher> getEnqueuedInputFetchers() {
        return this.enqueuedInputFetchers;
    }

    /**
     * Queues a fetcher that reported new availability, provided it is currently selected and not
     * queued already, and wakes the processing loop if the queue was empty before.
     */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher.InputFetcherAvailableListener
    public void notifyInputFetcherAvailable(InputFetcher inputFetcher) {
        synchronized (this.inputFetcherReadingQueue) {
            if (!this.selectedInputFetchers.contains(inputFetcher) || this.enqueuedInputFetchers.contains(inputFetcher)) {
                return;
            }
            boolean wasEmpty = this.inputFetcherReadingQueue.isEmpty();
            this.inputFetcherReadingQueue.add(inputFetcher);
            this.enqueuedInputFetchers.add(inputFetcher);
            if (wasEmpty) {
                // Only an empty queue can have a waiter blocked in process().
                this.inputFetcherReadingQueue.notifyAll();
            }
        }
    }
}
