/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.FirstOfTwoInputProcessor;
import org.apache.flink.streaming.runtime.io.InputFetcher;
import org.apache.flink.streaming.runtime.io.InputGateFetcher;
import org.apache.flink.streaming.runtime.io.OneInputProcessor;
import org.apache.flink.streaming.runtime.io.SecondOfTwoInputProcessor;
import org.apache.flink.streaming.runtime.io.SelectedReadingBarrierHandler;
import org.apache.flink.streaming.runtime.io.SourceFetcher;
import org.apache.flink.streaming.runtime.io.SourceInputProcessor;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusSubMaintainer;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives reading from an arbitrary number of {@link InputFetcher}s (input gates and sources
 * bound through the {@code bind*} methods), restricted at any time to the inputs currently
 * chosen by the {@link InputSelector}.
 *
 * <p>Fetchers that are ready to be read are kept in a FIFO reading queue. The queue object is
 * also the monitor used for {@code synchronized}/{@code wait}/{@code notifyAll}; fetchers
 * announce new data through {@link #notifyInputFetcherAvailable(InputFetcher)}.
 */
@Internal
public class StreamArbitraryInputProcessor
implements InputSelector.SelectionChangedListener,
InputFetcher.InputFetcherAvailableListener {
    private static final Logger LOG = LoggerFactory.getLogger(StreamArbitraryInputProcessor.class);
    // Set by stop(); checked by process() to leave its outer loop and cancel the remaining fetchers.
    private volatile boolean isStopped = false;
    // Handed to every InputGateFetcher created by the bind*InputOperator methods.
    private IOManager ioManager;
    // Passed through to the fetchers and input processors created by the bind* methods.
    private final Object checkpointLock;
    // Source of the currently selected inputs; this processor registers itself as its change listener.
    private final InputSelector inputSelector;
    // Receives the "currentInputWatermark" gauge and is passed to the input processors.
    private final TaskMetricGroup taskMetricGroup;
    // Backs the "currentInputWatermark" gauge; passed to the input processors of gate-based inputs.
    private final MinWatermarkGauge minAllInputWatermarkGauge;
    // May be null (see constructor); the input-gate bind methods require it to be non-null.
    private final SelectedReadingBarrierHandler barrierHandler;
    // FIFO of fetchers ready to be read; also serves as the monitor for wait/notifyAll.
    private final ArrayDeque<InputFetcher> inputFetcherReadingQueue = new ArrayDeque();
    // Mirrors the membership of the reading queue so a fetcher is never enqueued twice.
    private final Set<InputFetcher> enqueuedInputFetchers = new HashSet<InputFetcher>();
    // All bound fetchers that have not finished yet.
    private final List<InputFetcher> inputFetchers = new ArrayList<InputFetcher>();
    // Fetchers matching the most recent selection returned by the input selector.
    private final Set<InputFetcher> selectedInputFetchers = new HashSet<InputFetcher>();
    // Raised by notifySelectionChanged(); cleared when the reading queue is rebuilt.
    private volatile boolean inputSelectionChanged = false;

    /**
     * Creates a processor with no inputs bound yet.
     *
     * <p>Registers this instance as the selection-changed listener of {@code inputSelector} and
     * publishes the minimum-input-watermark gauge under the name {@code currentInputWatermark}.
     *
     * @param ioManager I/O manager handed to the input-gate fetchers; must not be null
     * @param checkpointLock lock object handed to fetchers and input processors; must not be null
     * @param inputSelector decides which inputs are read; must not be null
     * @param taskMetricGroup metric group for the watermark gauge; must not be null
     * @param barrierHandler barrier handler for gate-based inputs; may be null, in which case
     *     the input-gate {@code bind*} methods fail their null check
     */
    public StreamArbitraryInputProcessor(
            IOManager ioManager,
            Object checkpointLock,
            InputSelector inputSelector,
            TaskMetricGroup taskMetricGroup,
            @Nullable SelectedReadingBarrierHandler barrierHandler) {
        this.ioManager = Preconditions.checkNotNull(ioManager);
        this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
        this.inputSelector = Preconditions.checkNotNull(inputSelector);
        this.taskMetricGroup = Preconditions.checkNotNull(taskMetricGroup);
        this.barrierHandler = barrierHandler;

        this.inputSelector.registerSelectionChangedListener(this);

        this.minAllInputWatermarkGauge = new MinWatermarkGauge(new WatermarkGauge[0]);
        this.taskMetricGroup.gauge("currentInputWatermark", this.minAllInputWatermarkGauge::getValue);
    }

    /**
     * Runs the read loop until every bound input has finished or {@link #stop()} is called.
     *
     * <p>Each round rebuilds the reading queue from the current input selection, then repeatedly
     * takes the next ready fetcher and drains it via {@code fetchAndProcess()}. A finished
     * fetcher is cleaned up and dropped; a fetcher that still reports data is put back at the end
     * of the queue. When the selection changes, the queue is rebuilt. If the loop ends because of
     * a stop request, all remaining fetchers are cancelled.
     *
     * <p>The stop flag is checked in every loop, including the blocking wait for a ready fetcher,
     * so a stop request takes effect without having to wait for a selection change or for all
     * inputs to finish.
     *
     * @throws Exception if setting up, reading from, or cleaning up a fetcher fails, or if the
     *     thread is interrupted while waiting for input
     */
    public void process() throws Exception {
        LOG.info("Start processing with {} source(s)/input gate(s)", (Object)this.inputFetchers.size());
        for (InputFetcher inputFetcher : this.inputFetchers) {
            inputFetcher.setup();
        }
        while (!this.isStopped && !this.inputFetchers.isEmpty()) {
            this.constructInputFetcherReadingQueue();
            while (!this.isStopped && !this.inputSelectionChanged && !this.inputFetchers.isEmpty()) {
                InputFetcher inputFetcher;
                Preconditions.checkState(!this.selectedInputFetchers.isEmpty());
                synchronized (this.inputFetcherReadingQueue) {
                    // Re-check the stop flag after every wake-up so a stop request is not missed
                    // while no input is available.
                    while (!this.isStopped && this.inputFetcherReadingQueue.isEmpty()) {
                        this.inputFetcherReadingQueue.wait();
                    }
                    if (this.isStopped) {
                        break;
                    }
                    inputFetcher = this.dequeueInputFetcher();
                }
                // Drain the fetcher; all the work happens in the loop condition.
                while (!this.isStopped && !this.inputSelectionChanged && inputFetcher.fetchAndProcess()) {
                }
                if (inputFetcher.isFinished()) {
                    LOG.info("Input fetcher {} {} is finished", (Object)inputFetcher, (Object)inputFetcher.getInputSelection());
                    inputFetcher.cleanup();
                    this.inputFetchers.remove(inputFetcher);
                    // The selected set is read under the queue monitor by the availability
                    // callback, so it must be modified under the same monitor.
                    synchronized (this.inputFetcherReadingQueue) {
                        this.selectedInputFetchers.remove(inputFetcher);
                    }
                    continue;
                }
                if (this.inputSelectionChanged || !inputFetcher.moreAvailable()) {
                    continue;
                }
                this.enqueueInputFetcher(inputFetcher);
            }
            if (!this.inputSelectionChanged) {
                continue;
            }
            LOG.info("Input selection is changed, need to reconstruct reading queue");
        }
        if (this.isStopped) {
            for (InputFetcher inputFetcher : this.inputFetchers) {
                inputFetcher.cancel();
            }
        }
        LOG.info("Finish processing all input fetchers");
    }

    /**
     * Requests {@link #process()} to terminate.
     *
     * <p>Sets the stop flag and wakes the processing thread in case it is blocked waiting for a
     * fetcher to become available.
     */
    public void stop() {
        this.isStopped = true;
        synchronized (this.inputFetcherReadingQueue) {
            this.inputFetcherReadingQueue.notifyAll();
        }
    }

    /**
     * Cleans up every fetcher that is still registered and, if present, the barrier handler.
     *
     * <p>Cleanup is attempted for all of them even when one fails, so a single failure does not
     * leave the remaining resources unreleased. The first failure is rethrown afterwards with
     * any later failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure raised by a fetcher or by the barrier handler
     */
    public void cleanup() throws Exception {
        Exception failure = null;
        for (InputFetcher inputFetcher : this.inputFetchers) {
            try {
                inputFetcher.cleanup();
            } catch (Exception e) {
                failure = firstOrSuppressed(failure, e);
            }
        }
        if (this.barrierHandler != null) {
            try {
                this.barrierHandler.cleanup();
            } catch (Exception e) {
                failure = firstOrSuppressed(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} suppressed, or {@code next} if there is no first failure yet. */
    private static Exception firstOrSuppressed(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        // addSuppressed rejects self-suppression, so skip an identical instance.
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * Binds an input gate to a one-input operator.
     *
     * <p>Builds an {@link InputGateFetcher} for the edge whose records are handled by a
     * {@link OneInputProcessor} wrapping {@code operator}, subscribes this processor to the
     * fetcher's availability notifications and adds the fetcher to the set of inputs. Fails the
     * null check if this processor was created without a barrier handler.
     *
     * @param <IN> element type read from the gate
     */
    public <IN> void bindOneInputOperator(
            StreamEdge streamEdge,
            InputGate inputGate,
            int maxChannelIndex,
            OneInputStreamOperator<IN, ?> operator,
            TypeSerializer<IN> typeSerializer,
            StreamStatusSubMaintainer streamStatusSubMaintainer,
            boolean isObjectReuse,
            Configuration taskManagerConfig) {
        Preconditions.checkNotNull(this.barrierHandler);
        InputGateFetcher<IN> fetcher = new InputGateFetcher<IN>(
                InputSelector.EdgeInputSelection.create(streamEdge),
                inputGate,
                typeSerializer,
                this.barrierHandler,
                this.ioManager,
                new OneInputProcessor(
                        streamStatusSubMaintainer,
                        operator,
                        this.checkpointLock,
                        this.taskMetricGroup,
                        this.minAllInputWatermarkGauge,
                        inputGate.getNumberOfInputChannels()),
                this.checkpointLock,
                maxChannelIndex,
                isObjectReuse,
                taskManagerConfig);
        fetcher.registerAvailableListener(this);
        this.inputFetchers.add(fetcher);
        LOG.info("Bind an one input operator {} to {}", operator, inputGate);
    }

    /**
     * Binds an input gate to the first input of a two-input operator.
     *
     * <p>Builds an {@link InputGateFetcher} for the edge whose records are handled by a
     * {@link FirstOfTwoInputProcessor} wrapping {@code operator}, subscribes this processor to
     * the fetcher's availability notifications and adds the fetcher to the set of inputs. Fails
     * the null check if this processor was created without a barrier handler.
     *
     * @param <IN> element type read from the gate
     */
    public <IN> void bindFirstOfTwoInputOperator(
            StreamEdge streamEdge,
            InputGate inputGate,
            int maxChannelIndex,
            TwoInputStreamOperator<IN, ?, ?> operator,
            TypeSerializer<IN> typeSerializer,
            StreamStatusSubMaintainer streamStatusSubMaintainer,
            boolean isObjectReuse,
            Configuration taskManagerConfig) {
        Preconditions.checkNotNull(this.barrierHandler);
        InputGateFetcher<IN> fetcher = new InputGateFetcher<IN>(
                InputSelector.EdgeInputSelection.create(streamEdge),
                inputGate,
                typeSerializer,
                this.barrierHandler,
                this.ioManager,
                new FirstOfTwoInputProcessor(
                        streamStatusSubMaintainer,
                        operator,
                        this.checkpointLock,
                        this.taskMetricGroup,
                        this.minAllInputWatermarkGauge,
                        inputGate.getNumberOfInputChannels()),
                this.checkpointLock,
                maxChannelIndex,
                isObjectReuse,
                taskManagerConfig);
        fetcher.registerAvailableListener(this);
        this.inputFetchers.add(fetcher);
        LOG.info("Bind the edge {} of a first of two input operator {} to {}", streamEdge, operator, inputGate);
    }

    /**
     * Binds an input gate to the second input of a two-input operator.
     *
     * <p>Builds an {@link InputGateFetcher} for the edge whose records are handled by a
     * {@link SecondOfTwoInputProcessor} wrapping {@code operator}, subscribes this processor to
     * the fetcher's availability notifications and adds the fetcher to the set of inputs. Fails
     * the null check if this processor was created without a barrier handler.
     *
     * @param <IN> element type read from the gate
     */
    public <IN> void bindSecondOfTwoInputOperator(
            StreamEdge streamEdge,
            InputGate inputGate,
            int maxChannelIndex,
            TwoInputStreamOperator<IN, ?, ?> operator,
            TypeSerializer<IN> typeSerializer,
            StreamStatusSubMaintainer streamStatusSubMaintainer,
            boolean isObjectReuse,
            Configuration taskManagerConfig) {
        Preconditions.checkNotNull(this.barrierHandler);
        InputGateFetcher<IN> fetcher = new InputGateFetcher<IN>(
                InputSelector.EdgeInputSelection.create(streamEdge),
                inputGate,
                typeSerializer,
                this.barrierHandler,
                this.ioManager,
                new SecondOfTwoInputProcessor(
                        streamStatusSubMaintainer,
                        operator,
                        this.checkpointLock,
                        this.taskMetricGroup,
                        this.minAllInputWatermarkGauge,
                        inputGate.getNumberOfInputChannels()),
                this.checkpointLock,
                maxChannelIndex,
                isObjectReuse,
                taskManagerConfig);
        fetcher.registerAvailableListener(this);
        this.inputFetchers.add(fetcher);
        LOG.info("Bind the edge {} of a second of two input operator {} to {}", streamEdge, operator, inputGate);
    }

    /**
     * Binds a source operator as an input.
     *
     * <p>Builds a {@link SourceFetcher} identified by {@code operatorId} whose records are handled
     * by a {@link SourceInputProcessor} wrapping {@code operatorProxy}, subscribes this processor
     * to the fetcher's availability notifications and adds the fetcher to the set of inputs.
     * Unlike the input-gate variants, no barrier handler is required.
     *
     * @param <IN> element type accepted by {@code operatorProxy}
     */
    public <IN> void bindSourceOperator(
            int operatorId,
            StreamSourceV2 operator,
            OneInputStreamOperator<IN, ?> operatorProxy,
            SourceFunction.SourceContext context,
            StreamStatusSubMaintainer streamStatusSubMaintainer) {
        SourceFetcher fetcher = new SourceFetcher(
                InputSelector.SourceInputSelection.create(operatorId),
                operator,
                context,
                new SourceInputProcessor(
                        streamStatusSubMaintainer,
                        operatorProxy,
                        this.checkpointLock,
                        this.taskMetricGroup,
                        1));
        fetcher.registerAvailableListener(this);
        this.inputFetchers.add(fetcher);
        LOG.info("Bind a source operator {}", Integer.valueOf(operatorId));
    }

    /**
     * Rebuilds the reading queue from the input selector's next selection.
     *
     * <p>While holding the queue monitor: clears the selection-changed flag, the selected set,
     * the queue and the enqueued set; asks the selector for the next selected inputs (which must
     * be non-null and non-empty); then, for every selection that still has a registered fetcher,
     * marks that fetcher as selected and enqueues it if it reports more data or is already
     * finished. Selections without a registered fetcher are only logged.
     */
    @VisibleForTesting
    void constructInputFetcherReadingQueue() {
        synchronized (this.inputFetcherReadingQueue) {
            this.inputSelectionChanged = false;
            this.selectedInputFetchers.clear();
            this.inputFetcherReadingQueue.clear();
            this.enqueuedInputFetchers.clear();

            final List<InputSelector.InputSelection> selections = this.inputSelector.getNextSelectedInputs();
            Preconditions.checkNotNull(selections);
            Preconditions.checkState(!selections.isEmpty());

            // Index the still-registered fetchers by the selection they serve.
            final HashMap<Object, InputFetcher> fetcherBySelection = new HashMap<>();
            for (InputFetcher candidate : this.inputFetchers) {
                fetcherBySelection.put(candidate.getInputSelection(), candidate);
            }

            for (InputSelector.InputSelection selection : selections) {
                final InputFetcher chosen = fetcherBySelection.get(selection);
                if (chosen == null) {
                    LOG.info("The selected edge {} is finished already", selection);
                    continue;
                }
                this.selectedInputFetchers.add(chosen);
                // Finished fetchers are enqueued as well so the read loop can retire them.
                if (chosen.moreAvailable() || chosen.isFinished()) {
                    this.enqueueInputFetcher(chosen);
                }
            }
            LOG.info("Select inputs {}", selections);
        }
    }

    /**
     * Appends the fetcher to the reading queue unless it is already enqueued.
     *
     * <p>Runs under the queue monitor and does not notify waiting threads.
     *
     * @param inputFetcher fetcher to append
     */
    @VisibleForTesting
    void enqueueInputFetcher(InputFetcher inputFetcher) {
        synchronized (this.inputFetcherReadingQueue) {
            if (this.enqueuedInputFetchers.contains(inputFetcher)) {
                return;
            }
            this.inputFetcherReadingQueue.add(inputFetcher);
            this.enqueuedInputFetchers.add(inputFetcher);
        }
    }

    /**
     * Removes and returns the fetcher at the head of the reading queue, also dropping it from
     * the enqueued set. Runs under the queue monitor.
     *
     * @return the fetcher that was at the head of the queue
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    @VisibleForTesting
    InputFetcher dequeueInputFetcher() {
        synchronized (this.inputFetcherReadingQueue) {
            final InputFetcher head = this.inputFetcherReadingQueue.removeFirst();
            this.enqueuedInputFetchers.remove(head);
            return head;
        }
    }

    /** Exposes the internal list of registered fetchers (the live list, not a copy). */
    @VisibleForTesting
    List<InputFetcher> getInputFetchers() {
        return inputFetchers;
    }

    /**
     * Records that the input selection has changed, so the read loop rebuilds its reading queue
     * the next time it checks the flag.
     */
    @Override
    public void notifySelectionChanged() {
        inputSelectionChanged = true;
    }

    /** Exposes the internal reading queue (the live queue, not a copy). */
    @VisibleForTesting
    ArrayDeque<InputFetcher> getInputFetcherReadingQueue() {
        return inputFetcherReadingQueue;
    }

    /** Exposes the internal set of currently enqueued fetchers (the live set, not a copy). */
    @VisibleForTesting
    Set<InputFetcher> getEnqueuedInputFetchers() {
        return enqueuedInputFetchers;
    }

    /**
     * Callback invoked when a fetcher reports that it is available.
     *
     * <p>Under the queue monitor, the fetcher is appended to the reading queue only if it belongs
     * to the current selection and is not already enqueued. Waiting threads are notified only
     * when the queue was empty before the append.
     *
     * @param inputFetcher the fetcher that became available
     */
    @Override
    public void notifyInputFetcherAvailable(InputFetcher inputFetcher) {
        synchronized (this.inputFetcherReadingQueue) {
            if (!this.selectedInputFetchers.contains(inputFetcher)) {
                return;
            }
            if (this.enqueuedInputFetchers.contains(inputFetcher)) {
                return;
            }
            final boolean queueWasEmpty = this.inputFetcherReadingQueue.isEmpty();
            this.inputFetcherReadingQueue.add(inputFetcher);
            this.enqueuedInputFetchers.add(inputFetcher);
            if (queueWasEmpty) {
                this.inputFetcherReadingQueue.notifyAll();
            }
        }
    }
}

