package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.DataConsumptionException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.class */
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
    private final InputChannelID id;
    private final ConnectionID connectionId;
    private final ConnectionManager connectionManager;
    private final ArrayDeque<Buffer> receivedBuffers;
    private final AtomicBoolean isReleased;
    private volatile PartitionRequestClient partitionRequestClient;
    private final Object partitionRequestLock;
    private int expectedSequenceNumber;
    private int initialCredit;
    private final AvailableBufferQueue bufferQueue;
    private final AtomicInteger unannouncedCredit;

    @GuardedBy("bufferQueue")
    private int numRequiredBuffers;

    @GuardedBy("bufferQueue")
    private boolean isWaitingForFloatingBuffers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel$AvailableBufferQueue.class */
    public static class AvailableBufferQueue {
        private final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>();
        private final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();

        AvailableBufferQueue() {
        }

        int addExclusiveBuffer(Buffer buffer, int i) {
            this.exclusiveBuffers.add(buffer);
            if (getAvailableBufferSize() <= i) {
                return 1;
            }
            this.floatingBuffers.poll().recycleBuffer();
            return 0;
        }

        void addFloatingBuffer(Buffer buffer) {
            this.floatingBuffers.add(buffer);
        }

        @Nullable
        Buffer takeBuffer() {
            return this.floatingBuffers.size() > 0 ? this.floatingBuffers.poll() : this.exclusiveBuffers.poll();
        }

        void releaseAll(List<MemorySegment> list) {
            while (true) {
                Buffer poll = this.floatingBuffers.poll();
                if (poll == null) {
                    break;
                } else {
                    poll.recycleBuffer();
                }
            }
            while (true) {
                Buffer poll2 = this.exclusiveBuffers.poll();
                if (poll2 == null) {
                    return;
                } else {
                    list.add(poll2.getMemorySegment());
                }
            }
        }

        int getAvailableBufferSize() {
            return this.floatingBuffers.size() + this.exclusiveBuffers.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel$BufferReorderingException.class */
    public static class BufferReorderingException extends IOException {
        private static final long serialVersionUID = -888282210356266816L;
        private final int expectedSequenceNumber;
        private final int actualSequenceNumber;

        BufferReorderingException(int i, int i2) {
            this.expectedSequenceNumber = i;
            this.actualSequenceNumber = i2;
        }

        @Override // java.lang.Throwable
        public String getMessage() {
            return String.format("Buffer re-ordering: expected buffer with sequence number %d, but received %d.", Integer.valueOf(this.expectedSequenceNumber), Integer.valueOf(this.actualSequenceNumber));
        }
    }

    public RemoteInputChannel(SingleInputGate singleInputGate, int i, ResultPartitionID resultPartitionID, ConnectionID connectionID, ConnectionManager connectionManager, TaskIOMetricGroup taskIOMetricGroup) {
        this(singleInputGate, i, resultPartitionID, connectionID, connectionManager, 0, 0, taskIOMetricGroup);
    }

    public RemoteInputChannel(SingleInputGate singleInputGate, int i, ResultPartitionID resultPartitionID, ConnectionID connectionID, ConnectionManager connectionManager, int i2, int i3, TaskIOMetricGroup taskIOMetricGroup) {
        this(singleInputGate, i, resultPartitionID, connectionID, connectionManager, i2, i3, taskIOMetricGroup.getNumBytesInRemoteCounter());
    }

    public RemoteInputChannel(SingleInputGate singleInputGate, int i, ResultPartitionID resultPartitionID, ConnectionID connectionID, ConnectionManager connectionManager, int i2, int i3, Counter counter) {
        super(singleInputGate, i, resultPartitionID, i2, i3, counter);
        this.id = new InputChannelID();
        this.receivedBuffers = new ArrayDeque<>();
        this.isReleased = new AtomicBoolean();
        this.partitionRequestLock = new Object();
        this.expectedSequenceNumber = 0;
        this.bufferQueue = new AvailableBufferQueue();
        this.unannouncedCredit = new AtomicInteger(0);
        this.connectionId = (ConnectionID) Preconditions.checkNotNull(connectionID);
        this.connectionManager = (ConnectionManager) Preconditions.checkNotNull(connectionManager);
        singleInputGate.runAsync(new Runnable() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.1
            @Override // java.lang.Runnable
            public void run() {
                RemoteInputChannel.this.createPartitionRequestClient(false);
            }
        });
    }

    @VisibleForTesting
    void createPartitionRequestClient(boolean z) {
        synchronized (this.partitionRequestLock) {
            if (isReleased()) {
                if (z) {
                    setError(new IllegalStateException("Channel has been released!"));
                }
                return;
            }
            if (this.partitionRequestClient == null) {
                try {
                    this.partitionRequestClient = this.connectionManager.createPartitionRequestClient(this.connectionId);
                } catch (Throwable th) {
                    if (z) {
                        setError(new DataConsumptionException(this.partitionId, th));
                    }
                }
            }
        }
    }

    @VisibleForTesting
    public void assignExclusiveSegments(List<MemorySegment> list) {
        Preconditions.checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");
        Preconditions.checkNotNull(list);
        Preconditions.checkArgument(list.size() > 0, "The number of exclusive buffers per channel should be larger than 0.");
        this.initialCredit = list.size();
        this.numRequiredBuffers = list.size();
        synchronized (this.bufferQueue) {
            Iterator<MemorySegment> it = list.iterator();
            while (it.hasNext()) {
                this.bufferQueue.addExclusiveBuffer(new NetworkBuffer(it.next(), this), this.numRequiredBuffers);
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    @VisibleForTesting
    public void requestSubpartition(int i) throws IOException, InterruptedException {
        if (this.partitionRequestClient == null) {
            createPartitionRequestClient(true);
            checkError();
        }
        this.partitionRequestClient.requestSubpartition(this.partitionId, i, this, 0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void retriggerSubpartitionRequest(int i) throws IOException, InterruptedException {
        Preconditions.checkState(this.partitionRequestClient != null, "Partition request client initialization failed.");
        if (increaseBackoff()) {
            this.partitionRequestClient.requestSubpartition(this.partitionId, i, this, getCurrentBackoff());
        } else {
            failPartitionRequest();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        Buffer poll;
        int size;
        Preconditions.checkState(!this.isReleased.get(), "Queried for a buffer after channel has been closed.");
        Preconditions.checkState(this.partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
        checkError();
        synchronized (this.receivedBuffers) {
            poll = this.receivedBuffers.poll();
            size = this.receivedBuffers.size();
        }
        this.numBytesIn.inc(poll.getSizeUnsafe());
        return Optional.of(new InputChannel.BufferAndAvailability(poll, size > 0, getSenderBacklog()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        Preconditions.checkState(!this.isReleased.get(), "Tried to send task event to producer after channel has been released.");
        Preconditions.checkState(this.partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
        checkError();
        this.partitionRequestClient.sendTaskEvent(this.partitionId, taskEvent, this);
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public boolean isReleased() {
        return this.isReleased.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void releaseAllResources(@Nullable Throwable th) throws IOException {
        if (this.isReleased.compareAndSet(false, true)) {
            ArrayList arrayList = new ArrayList();
            synchronized (this.receivedBuffers) {
                while (true) {
                    Buffer poll = this.receivedBuffers.poll();
                    if (poll == null) {
                        break;
                    } else if (poll.getRecycler() == this) {
                        arrayList.add(poll.getMemorySegment());
                    } else {
                        poll.recycleBuffer();
                    }
                }
            }
            synchronized (this.bufferQueue) {
                this.bufferQueue.releaseAll(arrayList);
            }
            if (arrayList.size() > 0) {
                this.inputGate.returnExclusiveSegments(arrayList);
            }
            synchronized (this.partitionRequestLock) {
                if (this.partitionRequestClient != null) {
                    this.partitionRequestClient.close(this, th);
                } else {
                    this.connectionManager.closeOpenChannelConnections(this.connectionId);
                }
            }
        }
    }

    private void failPartitionRequest() {
        setError(new DataConsumptionException(this.partitionId, new PartitionNotFoundException(this.partitionId)));
    }

    public String toString() {
        return "RemoteInputChannel [" + this.partitionId + " at " + this.connectionId + " with channel index " + this.channelIndex + "]";
    }

    public ConnectionID getConnectionId() {
        return this.connectionId;
    }

    private void notifyCreditAvailable() {
        Preconditions.checkState(this.partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
        if (this.isReleased.get()) {
            return;
        }
        this.partitionRequestClient.notifyCreditAvailable(this);
    }

    @Override // org.apache.flink.runtime.io.network.buffer.BufferRecycler
    public void recycle(MemorySegment memorySegment) {
        synchronized (this.bufferQueue) {
            if (this.isReleased.get()) {
                try {
                    this.inputGate.returnExclusiveSegments(Collections.singletonList(memorySegment));
                    return;
                } catch (Throwable th) {
                    ExceptionUtils.rethrow(th);
                }
            }
            int addExclusiveBuffer = this.bufferQueue.addExclusiveBuffer(new NetworkBuffer(memorySegment, this), this.numRequiredBuffers);
            if (addExclusiveBuffer <= 0 || this.unannouncedCredit.getAndAdd(addExclusiveBuffer) != 0) {
                return;
            }
            notifyCreditAvailable();
        }
    }

    public int getNumberOfAvailableBuffers() {
        int availableBufferSize;
        synchronized (this.bufferQueue) {
            availableBufferSize = this.bufferQueue.getAvailableBufferSize();
        }
        return availableBufferSize;
    }

    public int getNumberOfRequiredBuffers() {
        return this.numRequiredBuffers;
    }

    public int getSenderBacklog() {
        return this.numRequiredBuffers - this.initialCredit;
    }

    @VisibleForTesting
    boolean isWaitingForFloatingBuffers() {
        return this.isWaitingForFloatingBuffers;
    }

    @VisibleForTesting
    PartitionRequestClient getPartitionRequestClient() {
        return this.partitionRequestClient;
    }

    @Override // org.apache.flink.runtime.io.network.buffer.BufferListener
    public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
        BufferListener.NotificationResult notificationResult = BufferListener.NotificationResult.NONE;
        try {
        } catch (Throwable th) {
            setError(th);
        }
        synchronized (this.bufferQueue) {
            Preconditions.checkState(this.isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
            if (this.isReleased.get() || this.bufferQueue.getAvailableBufferSize() >= this.numRequiredBuffers) {
                this.isWaitingForFloatingBuffers = false;
                return notificationResult;
            }
            this.bufferQueue.addFloatingBuffer(buffer);
            if (this.bufferQueue.getAvailableBufferSize() == this.numRequiredBuffers) {
                this.isWaitingForFloatingBuffers = false;
                notificationResult = BufferListener.NotificationResult.BUFFER_USED_FINISHED;
            } else {
                notificationResult = BufferListener.NotificationResult.BUFFER_USED_NEED_MORE;
            }
            if (this.unannouncedCredit.getAndAdd(1) == 0) {
                notifyCreditAvailable();
            }
            return notificationResult;
        }
    }

    @Override // org.apache.flink.runtime.io.network.buffer.BufferListener
    public void notifyBufferDestroyed() {
    }

    public int getUnannouncedCredit() {
        return this.unannouncedCredit.get();
    }

    public int getAndResetUnannouncedCredit() {
        return this.unannouncedCredit.getAndSet(0);
    }

    public int getNumberOfQueuedBuffers() {
        int size;
        synchronized (this.receivedBuffers) {
            size = this.receivedBuffers.size();
        }
        return size;
    }

    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return Math.max(0, this.receivedBuffers.size());
    }

    public InputChannelID getInputChannelId() {
        return this.id;
    }

    public int getInitialCredit() {
        return this.initialCredit;
    }

    public BufferProvider getBufferProvider() throws IOException {
        if (this.isReleased.get()) {
            return null;
        }
        return this.inputGate.getBufferProvider();
    }

    @Nullable
    public Buffer requestBuffer() {
        Buffer takeBuffer;
        synchronized (this.bufferQueue) {
            takeBuffer = this.bufferQueue.takeBuffer();
        }
        return takeBuffer;
    }

    void onSenderBacklog(int i) throws IOException {
        int i2 = 0;
        synchronized (this.bufferQueue) {
            if (this.isReleased.get()) {
                return;
            }
            this.numRequiredBuffers = i + this.initialCredit;
            while (true) {
                if (this.bufferQueue.getAvailableBufferSize() >= this.numRequiredBuffers || this.isWaitingForFloatingBuffers) {
                    break;
                }
                Buffer requestBuffer = this.inputGate.getBufferPool().requestBuffer();
                if (requestBuffer != null) {
                    this.bufferQueue.addFloatingBuffer(requestBuffer);
                    i2++;
                } else if (this.inputGate.getBufferProvider().addBufferListener(this)) {
                    this.isWaitingForFloatingBuffers = true;
                    break;
                }
            }
            if (i2 <= 0 || this.unannouncedCredit.getAndAdd(i2) != 0) {
                return;
            }
            notifyCreditAvailable();
        }
    }

    public void onBuffer(Buffer buffer, int i, int i2) throws IOException {
        boolean z = false;
        try {
            synchronized (this.receivedBuffers) {
                if (!this.isReleased.get()) {
                    if (this.expectedSequenceNumber == i) {
                        int size = this.receivedBuffers.size();
                        this.receivedBuffers.add(buffer);
                        this.expectedSequenceNumber++;
                        if (size == 0) {
                            notifyChannelNonEmpty();
                        }
                        z = true;
                    } else {
                        onError(new BufferReorderingException(this.expectedSequenceNumber, i));
                    }
                }
            }
            if (z && i2 >= 0) {
                onSenderBacklog(i2);
            }
        } finally {
            if (!z) {
                buffer.recycleBuffer();
            }
        }
    }

    public void onEmptyBuffer(int i, int i2) throws IOException {
        boolean z = false;
        synchronized (this.receivedBuffers) {
            if (!this.isReleased.get()) {
                if (this.expectedSequenceNumber == i) {
                    this.expectedSequenceNumber++;
                    z = true;
                } else {
                    onError(new BufferReorderingException(this.expectedSequenceNumber, i));
                }
            }
        }
        if (!z || i2 < 0) {
            return;
        }
        onSenderBacklog(i2);
    }

    public void onFailedPartitionRequest() {
        this.inputGate.triggerPartitionStateCheck(this.partitionId);
    }

    public void onError(Throwable th) {
        setError(th);
    }
}
