/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SequencedCollection;
import java.util.Timer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.PartitionConsumptionFailedEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.BlockingShuffleType;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionRequestManager;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleInputGate
implements InputGate {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);
    private final Object requestLock = new Object();
    private final String owningTaskName;
    private final JobID jobId;
    private final IntermediateDataSetID consumedResultId;
    private final ResultPartitionType consumedPartitionType;
    private final int consumedSubpartitionIndex;
    private final int numberOfInputChannels;
    private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
    @GuardedBy(value="requestLock")
    private final Map<IntermediateResultPartitionID, InputChannelDeploymentDescriptor> updatedInputChannelDescriptors;
    @GuardedBy(value="requestLock")
    private final Map<IntermediateResultPartitionID, ScheduledFuture> connectionLostChannelTimers;
    private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque();
    private final BitSet enqueuedInputChannelsWithData;
    private final BitSet channelsWithEndOfPartitionEvents;
    private final TaskActions taskActions;
    private final TaskIOMetricGroup metrics;
    private BufferPool bufferPool;
    private NetworkBufferPool networkBufferPool;
    private final ScheduledExecutorService scheduledExecutorService;
    private final NetworkEnvironment networkEnvironment;
    private final boolean isCreditBased;
    private boolean hasReceivedAllEndOfPartitionEvents;
    private boolean requestedPartitionsFlag;
    private volatile boolean isReleased;
    private final List<InputGateListener> inputGateListeners = new ArrayList<InputGateListener>();
    private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>();
    private int numberOfUninitializedChannels;
    private int networkBuffersPerChannel;
    private Timer retriggerLocalRequestTimer;
    private final PartitionRequestManager partitionRequestManager;
    private final boolean isPartitionRequestRestricted;
    private final long failedInputChannelUpdatingTimeout;

    public SingleInputGate(String owningTaskName, JobID jobId, IntermediateDataSetID consumedResultId, ResultPartitionType consumedPartitionType, int consumedSubpartitionIndex, int numberOfInputChannels, TaskActions taskActions, TaskIOMetricGroup metrics, PartitionRequestManager partitionRequestManager, ScheduledExecutorService scheduledExecutorService, @Nullable NetworkEnvironment networkEnvironment, boolean isPartitionRequestRestricted, long failedInputChannelUpdatingTimeout) {
        this.owningTaskName = (String)Preconditions.checkNotNull((Object)owningTaskName);
        this.jobId = (JobID)Preconditions.checkNotNull((Object)jobId);
        this.consumedResultId = (IntermediateDataSetID)((Object)Preconditions.checkNotNull((Object)((Object)consumedResultId)));
        this.consumedPartitionType = (ResultPartitionType)((Object)Preconditions.checkNotNull((Object)((Object)consumedPartitionType)));
        Preconditions.checkArgument((consumedSubpartitionIndex >= 0 ? 1 : 0) != 0);
        this.consumedSubpartitionIndex = consumedSubpartitionIndex;
        Preconditions.checkArgument((numberOfInputChannels > 0 ? 1 : 0) != 0);
        this.numberOfInputChannels = numberOfInputChannels;
        this.inputChannels = new HashMap<IntermediateResultPartitionID, InputChannel>(numberOfInputChannels);
        this.updatedInputChannelDescriptors = new HashMap<IntermediateResultPartitionID, InputChannelDeploymentDescriptor>();
        this.connectionLostChannelTimers = new HashMap<IntermediateResultPartitionID, ScheduledFuture>();
        this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
        this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);
        this.taskActions = (TaskActions)Preconditions.checkNotNull((Object)taskActions);
        this.metrics = (TaskIOMetricGroup)Preconditions.checkNotNull((Object)metrics);
        this.scheduledExecutorService = (ScheduledExecutorService)Preconditions.checkNotNull((Object)scheduledExecutorService);
        this.networkEnvironment = networkEnvironment;
        this.partitionRequestManager = (PartitionRequestManager)Preconditions.checkNotNull((Object)partitionRequestManager);
        this.isPartitionRequestRestricted = isPartitionRequestRestricted;
        this.isCreditBased = networkEnvironment == null ? true : networkEnvironment.isCreditBased();
        this.failedInputChannelUpdatingTimeout = failedInputChannelUpdatingTimeout;
        if (isPartitionRequestRestricted) {
            Preconditions.checkArgument((boolean)this.isCreditBased, (Object)"Yarn external shuffle service must use credit based back pressure.");
        }
        partitionRequestManager.registerSingleInputGate(this);
    }

    @Override
    public int getNumberOfInputChannels() {
        return this.numberOfInputChannels;
    }

    @Override
    public InputChannel[] getAllInputChannels() {
        InputChannel[] channelArr = new InputChannel[this.numberOfInputChannels];
        Iterator<InputChannel> iterator = this.inputChannels.values().iterator();
        while (iterator.hasNext()) {
            InputChannel channel;
            channelArr[channel.channelIndex] = channel = iterator.next();
        }
        return channelArr;
    }

    public IntermediateDataSetID getConsumedResultId() {
        return this.consumedResultId;
    }

    public ResultPartitionType getConsumedPartitionType() {
        return this.consumedPartitionType;
    }

    int getConsumedSubpartitionIndex() {
        return this.consumedSubpartitionIndex;
    }

    public boolean isPartitionRequestRestricted() {
        return this.isPartitionRequestRestricted;
    }

    BufferProvider getBufferProvider() {
        return this.bufferPool;
    }

    public BufferPool getBufferPool() {
        return this.bufferPool;
    }

    @Override
    public int getPageSize() {
        if (this.bufferPool != null) {
            return this.bufferPool.getMemorySegmentSize();
        }
        throw new IllegalStateException("Input gate has not been initialized with buffers.");
    }

    @Override
    public int getSubInputGateCount() {
        return 0;
    }

    @Override
    public InputGate getSubInputGate(int index) {
        return null;
    }

    public int getNumberOfQueuedBuffers() {
        for (int retry2 = 0; retry2 < 3; ++retry2) {
            try {
                int totalBuffers = 0;
                for (InputChannel channel : this.inputChannels.values()) {
                    if (!(channel instanceof RemoteInputChannel)) continue;
                    totalBuffers += ((RemoteInputChannel)channel).getNumberOfQueuedBuffers();
                }
                return totalBuffers;
            }
            catch (Exception exception) {
                continue;
            }
        }
        return 0;
    }

    public void setBufferPool(BufferPool bufferPool) {
        Preconditions.checkState((this.bufferPool == null ? 1 : 0) != 0, (Object)"Bug in input gate setup logic: buffer pool hasalready been set for this input gate.");
        this.bufferPool = (BufferPool)Preconditions.checkNotNull((Object)bufferPool);
    }

    public void assignExclusiveSegments(InputChannel inputChannel) throws IOException {
        Preconditions.checkNotNull((Object)inputChannel);
        Preconditions.checkArgument((boolean)(inputChannel instanceof RemoteInputChannel), (Object)"InputChannel should be remote input channel only.");
        Preconditions.checkState((this.networkBufferPool != null ? 1 : 0) != 0, (Object)"Bug in input gate setup logic: global buffer pool has not been set for this input gate.");
        if (this.isCreditBased) {
            ((RemoteInputChannel)inputChannel).assignExclusiveSegments(this.networkBufferPool.requestMemorySegments(this.networkBuffersPerChannel));
        }
    }

    public void setNetworkProperties(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) {
        Preconditions.checkState((this.networkBufferPool == null ? 1 : 0) != 0, (Object)"Bug in input gate setup logic: global buffer pool hasalready been set for this input gate.");
        this.networkBufferPool = (NetworkBufferPool)Preconditions.checkNotNull((Object)networkBufferPool);
        this.networkBuffersPerChannel = networkBuffersPerChannel;
    }

    public void returnExclusiveSegments(List<MemorySegment> segments) throws IOException {
        this.networkBufferPool.recycleMemorySegments(segments);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
        Object object = this.requestLock;
        synchronized (object) {
            if (this.inputChannels.put((IntermediateResultPartitionID)((Object)Preconditions.checkNotNull((Object)((Object)partitionId))), (InputChannel)Preconditions.checkNotNull((Object)inputChannel)) == null && inputChannel instanceof UnknownInputChannel) {
                ++this.numberOfUninitializedChannels;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            IntermediateResultPartitionID partitionId;
            InputChannel current;
            if (this.isReleased) {
                return;
            }
            ScheduledFuture timerFuture = this.connectionLostChannelTimers.remove((Object)icdd.getConsumedPartitionId().getPartitionId());
            if (timerFuture != null) {
                timerFuture.cancel(false);
            }
            if ((current = this.inputChannels.get((Object)(partitionId = icdd.getConsumedPartitionId().getPartitionId()))) instanceof UnknownInputChannel) {
                InputChannel newChannel;
                ResultPartitionLocation partitionLocation = icdd.getConsumedPartitionLocation();
                if (partitionLocation.isLocal()) {
                    newChannel = ((UnknownInputChannel)current).toLocalInputChannel(icdd.getConsumedPartitionId());
                } else if (partitionLocation.isRemote()) {
                    newChannel = ((UnknownInputChannel)current).toRemoteInputChannel(partitionLocation.getConnectionId(), icdd.getConsumedPartitionId());
                } else {
                    throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
                }
                this.updateInputChannel(icdd.getConsumedPartitionId().getPartitionId(), newChannel);
            } else if (!current.getPartitionId().getProducerId().equals((Object)icdd.getConsumedPartitionId().getProducerId())) {
                this.updatedInputChannelDescriptors.put(partitionId, icdd);
                current.setError(new ProducerFailedException(new Exception("There is a new input channel producer " + (Object)((Object)icdd.getConsumedPartitionId().getProducerId()) + " for " + (Object)((Object)icdd.getConsumedPartitionId().getPartitionId()))));
            } else {
                LOG.debug("Received a duplicated updating message {} for channel {}", (Object)icdd, (Object)current);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Update input channel {} with {}", (Object)current, (Object)icdd);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId) throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            if (!this.isReleased) {
                InputChannel ch = this.inputChannels.get((Object)partitionId);
                Preconditions.checkNotNull((Object)ch, (String)("Unknown input channel with ID " + (Object)((Object)partitionId)));
                LOG.debug("Retriggering partition request {}:{}.", (Object)ch.partitionId, (Object)this.consumedSubpartitionIndex);
                if (ch.getClass() == RemoteInputChannel.class) {
                    RemoteInputChannel rch = (RemoteInputChannel)ch;
                    rch.retriggerSubpartitionRequest(this.consumedSubpartitionIndex);
                } else if (ch.getClass() == LocalInputChannel.class) {
                    LocalInputChannel ich = (LocalInputChannel)ch;
                    if (this.retriggerLocalRequestTimer == null) {
                        this.retriggerLocalRequestTimer = new Timer(true);
                    }
                    ich.retriggerSubpartitionRequest(this.retriggerLocalRequestTimer, this.consumedSubpartitionIndex);
                } else {
                    throw new IllegalStateException("Unexpected type of channel to retrigger partition: " + ch.getClass());
                }
            }
        }
    }

    public void releaseAllResources() throws IOException {
        this.releaseAllResources(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseAllResources(@Nullable Throwable throwable) throws IOException {
        boolean released = false;
        ArrayDeque<InputChannel> arrayDeque = this.requestLock;
        synchronized (arrayDeque) {
            if (!this.isReleased) {
                try {
                    LOG.debug("{}: Releasing {}.", (Object)this.owningTaskName, (Object)this);
                    if (this.retriggerLocalRequestTimer != null) {
                        this.retriggerLocalRequestTimer.cancel();
                    }
                    for (ScheduledFuture future : this.connectionLostChannelTimers.values()) {
                        future.cancel(false);
                    }
                    this.connectionLostChannelTimers.clear();
                    this.updatedInputChannelDescriptors.clear();
                    if (this.bufferPool != null) {
                        this.bufferPool.lazyDestroy();
                    }
                    for (InputChannel inputChannel : this.inputChannels.values()) {
                        try {
                            inputChannel.releaseAllResources(throwable);
                        }
                        catch (IOException e) {
                            LOG.warn("Error during release of channel resources: " + e.getMessage(), (Throwable)e);
                        }
                    }
                }
                finally {
                    this.isReleased = true;
                    released = true;
                }
            }
        }
        if (released) {
            arrayDeque = this.inputChannelsWithData;
            synchronized (arrayDeque) {
                this.inputChannelsWithData.notifyAll();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isFinished() {
        Object object = this.requestLock;
        synchronized (object) {
            for (InputChannel inputChannel : this.inputChannels.values()) {
                if (inputChannel.isReleased()) continue;
                return false;
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean moreAvailable() {
        ArrayDeque<InputChannel> arrayDeque = this.inputChannelsWithData;
        synchronized (arrayDeque) {
            return !this.inputChannelsWithData.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestPartitions() throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            if (!this.requestedPartitionsFlag) {
                if (this.isReleased) {
                    throw new IllegalStateException("Already released.");
                }
                if (this.numberOfInputChannels != this.inputChannels.size()) {
                    throw new IllegalStateException("Bug in input gate setup logic: mismatch betweennumber of total input channels and the currently set number of input channels.");
                }
                this.partitionRequestManager.requestPartitions(this);
            }
            this.requestedPartitionsFlag = true;
        }
    }

    @Override
    public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(true);
    }

    @Override
    public Optional<BufferOrEvent> getNextBufferOrEvent(InputGate subInputGate) throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(true);
    }

    @Override
    public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(false);
    }

    @Override
    public Optional<BufferOrEvent> pollNextBufferOrEvent(InputGate subInputGate) throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        Buffer buffer;
        boolean moreAvailable;
        InputChannel currentChannel;
        if (this.hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }
        if (this.isReleased) {
            throw new IllegalStateException("Released");
        }
        this.requestPartitions();
        Optional<Object> result = Optional.empty();
        do {
            ArrayDeque<InputChannel> arrayDeque = this.inputChannelsWithData;
            synchronized (arrayDeque) {
                while (this.inputChannelsWithData.size() == 0) {
                    if (this.isReleased) {
                        throw new IllegalStateException("Released");
                    }
                    if (blocking) {
                        this.inputChannelsWithData.wait();
                        continue;
                    }
                    return Optional.empty();
                }
                currentChannel = this.inputChannelsWithData.remove();
                this.enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
                moreAvailable = this.inputChannelsWithData.size() > 0;
            }
            if (currentChannel.isReleased()) continue;
            try {
                result = currentChannel.getNextBuffer();
            }
            catch (RemoteTransportException | ProducerFailedException e) {
                if (this.networkEnvironment != null) {
                    LOG.info("Producer of {} is failed", (Object)currentChannel, (Object)e);
                    this.producerFail(currentChannel, e);
                    return Optional.of(new BufferOrEvent(PartitionConsumptionFailedEvent.INSTANCE, currentChannel.getChannelIndex(), false));
                }
                throw e;
            }
        } while (!result.isPresent());
        if (((InputChannel.BufferAndAvailability)result.get()).moreAvailable()) {
            this.queueChannel(currentChannel);
            moreAvailable = true;
        }
        if ((buffer = ((InputChannel.BufferAndAvailability)result.get()).buffer()).isBuffer()) {
            return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
        }
        AbstractEvent event = EventSerializer.fromBuffer(buffer, this.getClass().getClassLoader());
        if (event.getClass() == EndOfPartitionEvent.class) {
            this.channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
            if (this.channelsWithEndOfPartitionEvents.cardinality() == this.numberOfInputChannels) {
                Preconditions.checkState((!moreAvailable || !this.pollNextBufferOrEvent().isPresent() ? 1 : 0) != 0);
                moreAvailable = false;
                this.hasReceivedAllEndOfPartitionEvents = true;
            }
            currentChannel.releaseAllResources();
            this.partitionRequestManager.onInputChannelFinish(this, currentChannel, this.hasReceivedAllEndOfPartitionEvents);
        }
        return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        Object object = this.requestLock;
        synchronized (object) {
            for (InputChannel inputChannel : this.inputChannels.values()) {
                inputChannel.sendTaskEvent(event);
            }
            if (this.numberOfUninitializedChannels > 0) {
                this.pendingEvents.add(event);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerListener(InputGateListener inputGateListener) {
        List<InputGateListener> list = this.inputGateListeners;
        synchronized (list) {
            this.inputGateListeners.add(inputGateListener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void producerFail(InputChannel failedChannel, Throwable cause) throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            failedChannel.releaseAllResources(cause);
            this.partitionRequestManager.onInputChannelFinish(this, failedChannel, this.hasReceivedAllEndOfPartitionEvents);
            InputChannelDeploymentDescriptor icdd = this.updatedInputChannelDescriptors.remove((Object)failedChannel.getPartitionId().getPartitionId());
            if (icdd != null) {
                InputChannel newChannel;
                ResultPartitionLocation partitionLocation = icdd.getConsumedPartitionLocation();
                if (partitionLocation.isLocal()) {
                    newChannel = new LocalInputChannel(this, failedChannel.getChannelIndex(), icdd.getConsumedPartitionId(), this.networkEnvironment.getResultPartitionManager(), this.networkEnvironment.getTaskEventDispatcher(), this.networkEnvironment.getPartitionRequestInitialBackoff(), this.networkEnvironment.getPartitionRequestMaxBackoff(), this.metrics);
                } else if (partitionLocation.isRemote()) {
                    newChannel = new RemoteInputChannel(this, failedChannel.getChannelIndex(), icdd.getConsumedPartitionId(), partitionLocation.getConnectionId(), this.networkEnvironment.getConnectionManager(), this.networkEnvironment.getPartitionRequestInitialBackoff(), this.networkEnvironment.getPartitionRequestMaxBackoff(), this.metrics);
                } else {
                    throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
                }
                this.updateInputChannel(icdd.getConsumedPartitionId().getPartitionId(), newChannel);
            } else {
                UnknownInputChannel unknownInputChannel = new UnknownInputChannel(this, failedChannel.getChannelIndex(), failedChannel.getPartitionId(), this.networkEnvironment.getResultPartitionManager(), this.networkEnvironment.getTaskEventDispatcher(), this.networkEnvironment.getConnectionManager(), this.networkEnvironment.getPartitionRequestInitialBackoff(), this.networkEnvironment.getPartitionRequestMaxBackoff(), this.metrics);
                this.setInputChannel(failedChannel.getPartitionId().getPartitionId(), unknownInputChannel);
                if (this.failedInputChannelUpdatingTimeout == 0L) {
                    throw new FlinkRuntimeException("Failed InputChannel " + failedChannel + " without waiting updating", cause);
                }
                if (this.failedInputChannelUpdatingTimeout > 0L) {
                    ScheduledFuture<?> timerFuture = this.scheduledExecutorService.schedule(() -> {
                        try {
                            this.taskActions.failExternally((Throwable)new FlinkRuntimeException("Failed InputChannel " + failedChannel + " has not been updated after " + this.failedInputChannelUpdatingTimeout + " ms", cause));
                        }
                        catch (Throwable throwable) {
                            LOG.warn("Could not fail task", throwable);
                        }
                    }, this.failedInputChannelUpdatingTimeout, TimeUnit.MILLISECONDS);
                    this.connectionLostChannelTimers.put(failedChannel.getPartitionId().getPartitionId(), timerFuture);
                }
            }
        }
    }

    void notifyChannelNonEmpty(InputChannel channel) {
        this.queueChannel((InputChannel)Preconditions.checkNotNull((Object)channel));
    }

    void triggerPartitionStateCheck(ResultPartitionID partitionId) {
        this.taskActions.triggerPartitionProducerStateCheck(this.jobId, this.consumedResultId, partitionId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void queueChannel(InputChannel channel) {
        int availableChannels;
        SequencedCollection<Object> sequencedCollection = this.inputChannelsWithData;
        synchronized (sequencedCollection) {
            if (this.enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
                return;
            }
            availableChannels = this.inputChannelsWithData.size();
            this.inputChannelsWithData.add(channel);
            this.enqueuedInputChannelsWithData.set(channel.getChannelIndex());
            if (availableChannels == 0) {
                this.inputChannelsWithData.notifyAll();
            }
        }
        if (availableChannels == 0) {
            sequencedCollection = this.inputGateListeners;
            synchronized (sequencedCollection) {
                for (InputGateListener listener : this.inputGateListeners) {
                    listener.notifyInputGateNonEmpty(this);
                }
            }
        }
    }

    Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
        return this.inputChannels;
    }

    @VisibleForTesting
    Map<IntermediateResultPartitionID, ScheduledFuture> getConnectionLostChannelTimers() {
        return this.connectionLostChannelTimers;
    }

    public void runAsync(Runnable runnable) {
        this.scheduledExecutorService.submit(runnable);
    }

    public static SingleInputGate create(String owningTaskName, JobID jobId, ExecutionAttemptID executionId, InputGateDeploymentDescriptor igdd, NetworkEnvironment networkEnvironment, TaskActions taskActions, TaskIOMetricGroup metrics, PartitionRequestManager partitionRequestManager, BlockingShuffleType shuffleType, ScheduledExecutorService scheduledExecutorService, long failedInputChannelUpdatingTimeout) {
        IntermediateDataSetID consumedResultId = (IntermediateDataSetID)((Object)Preconditions.checkNotNull((Object)((Object)igdd.getConsumedResultId())));
        ResultPartitionType consumedPartitionType = (ResultPartitionType)((Object)Preconditions.checkNotNull((Object)((Object)igdd.getConsumedPartitionType())));
        int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
        Preconditions.checkArgument((consumedSubpartitionIndex >= 0 ? 1 : 0) != 0);
        InputChannelDeploymentDescriptor[] icdd = (InputChannelDeploymentDescriptor[])Preconditions.checkNotNull((Object)igdd.getInputChannelDeploymentDescriptors());
        boolean isPartitionRequestRestricted = shuffleType == BlockingShuffleType.YARN && consumedPartitionType.isBlocking();
        SingleInputGate inputGate = new SingleInputGate(owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex, icdd.length, taskActions, metrics, partitionRequestManager, scheduledExecutorService, networkEnvironment, isPartitionRequestRestricted, failedInputChannelUpdatingTimeout);
        InputChannel[] inputChannels = new InputChannel[icdd.length];
        int numLocalChannels = 0;
        int numRemoteChannels = 0;
        int numUnknownChannels = 0;
        for (int i = 0; i < inputChannels.length; ++i) {
            ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
            ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
            if (partitionLocation.isLocal()) {
                inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, networkEnvironment.getResultPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getPartitionRequestInitialBackoff(), networkEnvironment.getPartitionRequestMaxBackoff(), metrics);
                ++numLocalChannels;
            } else if (partitionLocation.isRemote()) {
                inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, partitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialBackoff(), networkEnvironment.getPartitionRequestMaxBackoff(), metrics);
                ++numRemoteChannels;
            } else if (partitionLocation.isUnknown()) {
                inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId, networkEnvironment.getResultPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialBackoff(), networkEnvironment.getPartitionRequestMaxBackoff(), metrics);
                ++numUnknownChannels;
            } else {
                throw new IllegalStateException("Unexpected partition location.");
            }
            inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
        }
        LOG.info("Created {} input channels (local: {}, remote: {}, unknown: {}).", new Object[]{inputChannels.length, numLocalChannels, numRemoteChannels, numUnknownChannels});
        return inputGate;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void assignExclusiveSegments() throws IOException {
        Preconditions.checkState((boolean)this.isCreditBased, (Object)"Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control.");
        Preconditions.checkState((this.networkBufferPool != null ? 1 : 0) != 0, (Object)"Bug in input gate setup logic: global buffer pool hasnot been set for this input gate.");
        Object object = this.requestLock;
        synchronized (object) {
            for (InputChannel inputChannel : this.inputChannels.values()) {
                if (!(inputChannel instanceof RemoteInputChannel)) continue;
                ((RemoteInputChannel)inputChannel).assignExclusiveSegments(this.networkBufferPool.requestMemorySegments(this.networkBuffersPerChannel));
            }
        }
    }

    @VisibleForTesting
    void updateInputChannel(IntermediateResultPartitionID partitionID, InputChannel inputChannel) throws IOException, InterruptedException {
        LOG.debug("Updated unknown input channel to {}.", (Object)inputChannel);
        this.inputChannels.put(partitionID, inputChannel);
        if (this.requestedPartitionsFlag) {
            this.partitionRequestManager.updateInputChannel(this, inputChannel);
        }
        for (TaskEvent event : this.pendingEvents) {
            inputChannel.sendTaskEvent(event);
        }
        if (--this.numberOfUninitializedChannels == 0) {
            this.pendingEvents.clear();
        }
    }
}

