/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.DataConsumptionException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class RemoteInputChannel
extends InputChannel
implements BufferRecycler,
BufferListener {
    /** Identifier of this channel, exposed through {@code getInputChannelId()}. */
    private final InputChannelID id = new InputChannelID();
    /** Connection to the producer; used to create the client and to close open connections on release. */
    private final ConnectionID connectionId;
    /** Factory for the partition request client. */
    private final ConnectionManager connectionManager;
    /** Buffers received via {@code onBuffer} and not yet polled; accessed while holding its own monitor. */
    private final ArrayDeque<Buffer> receivedBuffers = new ArrayDeque();
    /** Flipped to {@code true} exactly once by {@code releaseAllResources()}. */
    private final AtomicBoolean isReleased = new AtomicBoolean();
    /** Client used to talk to the producer; created under {@code partitionRequestLock}, {@code null} until then. */
    private volatile PartitionRequestClient partitionRequestClient;
    /** Monitor serializing creation of {@code partitionRequestClient}. */
    private final Object partitionRequestLock = new Object();
    /** Sequence number the next buffer must carry; advanced while holding the {@code receivedBuffers} monitor. */
    private int expectedSequenceNumber = 0;
    /** Number of exclusive segments assigned to this channel; 0 until {@code assignExclusiveSegments} ran. */
    private int initialCredit;
    /** Exclusive and floating buffers currently available to this channel; also used as a monitor. */
    private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
    /** Credit accumulated since the last {@code getAndResetUnannouncedCredit()} call. */
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    /** Target number of available buffers: sender backlog plus {@code initialCredit}. */
    @GuardedBy(value="bufferQueue")
    private int numRequiredBuffers;
    /** Whether this channel is currently registered as a buffer listener waiting for floating buffers. */
    @GuardedBy(value="bufferQueue")
    private boolean isWaitingForFloatingBuffers;

    /**
     * Creates a channel with both the initial and the maximum backoff set to 0.
     *
     * @param inputGate gate this channel belongs to
     * @param channelIndex index of this channel within the gate
     * @param partitionId partition consumed by this channel
     * @param connectionId connection to the producer
     * @param connectionManager factory for the partition request client
     * @param metrics metric group supplying the remote bytes-in counter
     */
    public RemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, TaskIOMetricGroup metrics) {
        this(
            inputGate,
            channelIndex,
            partitionId,
            connectionId,
            connectionManager,
            0,
            0,
            metrics);
    }

    /**
     * Creates a channel that counts incoming bytes on the metric group's remote bytes-in counter.
     *
     * @param inputGate gate this channel belongs to
     * @param channelIndex index of this channel within the gate
     * @param partitionId partition consumed by this channel
     * @param connectionId connection to the producer
     * @param connectionManager factory for the partition request client
     * @param initialBackOff initial backoff passed to the base class
     * @param maxBackoff maximum backoff passed to the base class
     * @param metrics metric group supplying the remote bytes-in counter
     */
    public RemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, int initialBackOff, int maxBackoff, TaskIOMetricGroup metrics) {
        this(
            inputGate,
            channelIndex,
            partitionId,
            connectionId,
            connectionManager,
            initialBackOff,
            maxBackoff,
            metrics.getNumBytesInRemoteCounter());
    }

    /**
     * Creates a channel and schedules creation of the partition request client on the input gate.
     *
     * @param inputGate gate this channel belongs to
     * @param channelIndex index of this channel within the gate
     * @param partitionId partition consumed by this channel
     * @param connectionId connection to the producer, must not be null
     * @param connectionManager factory for the partition request client, must not be null
     * @param initialBackOff initial backoff passed to the base class
     * @param maxBackoff maximum backoff passed to the base class
     * @param bytesIn counter incremented with the size of every polled buffer
     */
    public RemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, int initialBackOff, int maxBackoff, Counter bytesIn) {
        super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, bytesIn);
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.connectionManager = Preconditions.checkNotNull(connectionManager);

        // Failures of this early attempt are not reported (notifyError = false);
        // requestSubpartition() retries with error reporting if no client exists by then.
        final Runnable earlyClientCreation = new Runnable() {
            @Override
            public void run() {
                createPartitionRequestClient(false);
            }
        };
        inputGate.runAsync(earlyClientCreation);
    }

    /**
     * Creates the partition request client if none exists yet.
     *
     * <p>Creation is serialized on {@code partitionRequestLock}, so at most one client is created.
     *
     * @param notifyError if {@code true}, a creation failure is recorded on this channel as a
     *     {@link DataConsumptionException}; if {@code false}, the failure is dropped and the
     *     client simply stays {@code null}
     */
    private void createPartitionRequestClient(boolean notifyError) {
        synchronized (partitionRequestLock) {
            if (partitionRequestClient != null) {
                return;
            }
            try {
                partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
            } catch (Throwable cause) {
                if (notifyError) {
                    setError(new DataConsumptionException(partitionId, cause));
                }
            }
        }
    }

    /**
     * Assigns the exclusive memory segments of this channel and derives the initial credit from them.
     *
     * @param segments non-empty list of segments that become this channel's exclusive buffers
     * @throws IllegalStateException if exclusive segments were already assigned
     */
    @VisibleForTesting
    public void assignExclusiveSegments(List<MemorySegment> segments) {
        Preconditions.checkState(initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");
        Preconditions.checkNotNull(segments);
        Preconditions.checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");

        final int numSegments = segments.size();
        initialCredit = numSegments;
        numRequiredBuffers = numSegments;

        synchronized (bufferQueue) {
            for (MemorySegment exclusiveSegment : segments) {
                // This channel is the recycler, so these buffers come back through recycle().
                bufferQueue.addExclusiveBuffer(new NetworkBuffer(exclusiveSegment, this), numRequiredBuffers);
            }
        }
    }

    /**
     * Requests the given subpartition from the producer, creating the partition request client
     * first if the earlier asynchronous attempt has not produced one.
     *
     * @param subpartitionIndex index of the subpartition to request
     * @throws IOException if thrown by the error check or by the request itself
     * @throws InterruptedException if thrown by the request
     */
    @Override
    @VisibleForTesting
    public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        if (partitionRequestClient == null) {
            // This time a creation failure is recorded on the channel and surfaced by checkError().
            createPartitionRequestClient(true);
            checkError();
        }

        final int noDelay = 0;
        partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, noDelay);
    }

    /**
     * Re-sends the subpartition request with the increased backoff, or fails the request once
     * the backoff cannot be increased any further.
     *
     * @param subpartitionIndex index of the subpartition to request again
     * @throws IllegalStateException if no partition request client exists
     */
    void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, InterruptedException {
        Preconditions.checkState(partitionRequestClient != null, "Partition request client initialization failed.");

        if (!increaseBackoff()) {
            failPartitionRequest();
            return;
        }
        partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, getCurrentBackoff());
    }

    /**
     * Polls the next received buffer, if any.
     *
     * @return the next buffer together with a flag telling whether more buffers are queued and
     *     the current sender backlog, or an empty optional if no buffer is queued
     * @throws IllegalStateException if the channel has been released or no partition request
     *     client exists yet
     * @throws IOException if thrown by the error check
     */
    @Override
    Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        Preconditions.checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
        Preconditions.checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
        checkError();

        final Buffer next;
        final boolean moreAvailable;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
            moreAvailable = !receivedBuffers.isEmpty();
        }

        if (next == null) {
            // poll() returns null on an empty queue. That happens either because nothing has been
            // received yet or because releaseAllResources() drained the queue after the checks
            // above. Report the release explicitly instead of failing with a NullPointerException.
            Preconditions.checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
            return Optional.empty();
        }

        numBytesIn.inc(next.getSizeUnsafe());
        return Optional.of(new InputChannel.BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
    }

    /**
     * Sends a task event to the producer through the partition request client.
     *
     * @param event event to send
     * @throws IllegalStateException if the channel has been released or no partition request
     *     client exists yet
     * @throws IOException if thrown by the error check or while sending
     */
    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        final boolean stillOpen = !isReleased.get();
        Preconditions.checkState(stillOpen, "Tried to send task event to producer after channel has been released.");

        final boolean clientExists = partitionRequestClient != null;
        Preconditions.checkState(clientExists, "Tried to send task event to producer before requesting a queue.");

        checkError();
        partitionRequestClient.sendTaskEvent(partitionId, event, this);
    }

    /** @return {@code true} once {@code releaseAllResources()} has marked this channel as released */
    @Override
    public boolean isReleased() {
        final boolean released = isReleased.get();
        return released;
    }

    /** Intentionally does nothing for this channel type. */
    @Override
    void notifySubpartitionConsumed() {
    }

    /**
     * Releases all buffers held by this channel and closes its connection. Only the first call
     * has an effect.
     *
     * <p>Buffers recycled by this channel (exclusive buffers) are handed back to the input gate
     * in one batch; all other buffers are recycled directly.
     *
     * @throws IOException if returning segments or closing the connection fails
     */
    @Override
    void releaseAllResources() throws IOException {
        if (!isReleased.compareAndSet(false, true)) {
            return;
        }

        final List<MemorySegment> segmentsToReturn = new ArrayList<>();

        synchronized (receivedBuffers) {
            for (Buffer queued = receivedBuffers.poll(); queued != null; queued = receivedBuffers.poll()) {
                if (queued.getRecycler() == this) {
                    // Exclusive buffer: collect its segment rather than recycling into this channel.
                    segmentsToReturn.add(queued.getMemorySegment());
                } else {
                    queued.recycleBuffer();
                }
            }
        }

        synchronized (bufferQueue) {
            bufferQueue.releaseAll(segmentsToReturn);
        }

        if (!segmentsToReturn.isEmpty()) {
            inputGate.returnExclusiveSegments(segmentsToReturn);
        }

        // The released flag is already set at this point, before the connection is closed.
        if (partitionRequestClient == null) {
            connectionManager.closeOpenChannelConnections(connectionId);
        } else {
            partitionRequestClient.close(this);
        }
    }

    /** Records a partition-not-found failure, wrapped in a {@link DataConsumptionException}, on this channel. */
    private void failPartitionRequest() {
        final Throwable notFound = new PartitionNotFoundException(partitionId);
        setError(new DataConsumptionException(partitionId, notFound));
    }

    /** @return a description naming the consumed partition and the connection */
    public String toString() {
        return new StringBuilder("RemoteInputChannel [")
            .append(partitionId)
            .append(" at ")
            .append(connectionId)
            .append("]")
            .toString();
    }

    /**
     * Tells the partition request client that this channel has unannounced credit, unless the
     * channel has already been released.
     *
     * @throws IllegalStateException if no partition request client exists yet
     */
    private void notifyCreditAvailable() {
        // The previous message was copied from sendTaskEvent() and named the wrong operation.
        Preconditions.checkState(partitionRequestClient != null, "Tried to notify producer of available credit before requesting a queue.");
        if (!isReleased.get()) {
            partitionRequestClient.notifyCreditAvailable(this);
        }
    }

    /**
     * Takes back an exclusive memory segment.
     *
     * <p>If the channel is already released, the segment is returned to the input gate.
     * Otherwise it is wrapped in a new buffer and added to the exclusive buffers, and the
     * producer is notified if this raised the unannounced credit from zero.
     *
     * @param segment segment being recycled
     */
    @Override
    public void recycle(MemorySegment segment) {
        int numAddedBuffers;
        AvailableBufferQueue availableBufferQueue = this.bufferQueue;
        synchronized (availableBufferQueue) {
            // Checked under the bufferQueue monitor, which releaseAllResources() also takes
            // before draining the queue.
            if (this.isReleased.get()) {
                try {
                    this.inputGate.returnExclusiveSegments(Collections.singletonList(segment));
                    return;
                }
                catch (Throwable t) {
                    // NOTE(review): rethrow is expected to always throw, so the code below
                    // should not run for a released channel - confirm against ExceptionUtils.
                    ExceptionUtils.rethrow((Throwable)t);
                }
            }
            // Returns 0 when a floating buffer was given up in exchange, 1 otherwise.
            numAddedBuffers = this.bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), this.numRequiredBuffers);
        }
        // Only the transition from zero unannounced credit triggers a notification.
        if (numAddedBuffers > 0 && this.unannouncedCredit.getAndAdd(numAddedBuffers) == 0) {
            this.notifyCreditAvailable();
        }
    }

    /** @return the number of exclusive plus floating buffers currently available, read under the queue's monitor */
    public int getNumberOfAvailableBuffers() {
        synchronized (bufferQueue) {
            return bufferQueue.getAvailableBufferSize();
        }
    }

    /** @return the current number of required buffers, read without taking the {@code bufferQueue} monitor */
    public int getNumberOfRequiredBuffers() {
        final int required = numRequiredBuffers;
        return required;
    }

    /** @return the number of required buffers minus the initial credit, read without synchronization */
    public int getSenderBacklog() {
        final int required = numRequiredBuffers;
        return required - initialCredit;
    }

    /** @return whether the channel is flagged as waiting for floating buffers, read without synchronization */
    @VisibleForTesting
    boolean isWaitingForFloatingBuffers() {
        final boolean waiting = isWaitingForFloatingBuffers;
        return waiting;
    }

    /**
     * Buffer listener callback offering a buffer to this channel.
     *
     * <p>The buffer is accepted as a floating buffer only if the channel is not released and
     * still has fewer available buffers than required. Any throwable raised while handling the
     * notification is recorded on the channel via {@code setError} instead of being propagated.
     *
     * @param buffer buffer being offered
     * @return {@code NONE} if the buffer was not taken, {@code BUFFER_USED_FINISHED} if it was
     *     taken and the required number is now reached, {@code BUFFER_USED_NEED_MORE} if it was
     *     taken and more are still needed
     */
    @Override
    public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
        BufferListener.NotificationResult notificationResult = BufferListener.NotificationResult.NONE;
        try {
            AvailableBufferQueue availableBufferQueue = this.bufferQueue;
            synchronized (availableBufferQueue) {
                Preconditions.checkState((boolean)this.isWaitingForFloatingBuffers, (Object)"This channel should be waiting for floating buffers.");
                // Decided under the bufferQueue monitor, which releaseAllResources() also takes
                // before draining the queue.
                if (this.isReleased.get() || this.bufferQueue.getAvailableBufferSize() >= this.numRequiredBuffers) {
                    this.isWaitingForFloatingBuffers = false;
                    return notificationResult;
                }
                this.bufferQueue.addFloatingBuffer(buffer);
                if (this.bufferQueue.getAvailableBufferSize() == this.numRequiredBuffers) {
                    // Requirement met: stop waiting.
                    this.isWaitingForFloatingBuffers = false;
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_FINISHED;
                } else {
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_NEED_MORE;
                }
            }
            // Outside the monitor: only the transition from zero unannounced credit notifies.
            if (this.unannouncedCredit.getAndAdd(1) == 0) {
                this.notifyCreditAvailable();
            }
        }
        catch (Throwable t) {
            this.setError(t);
        }
        return notificationResult;
    }

    /** Buffer listener callback; intentionally does nothing in this channel. */
    @Override
    public void notifyBufferDestroyed() {
    }

    /** @return the currently accumulated unannounced credit, without resetting it */
    public int getUnannouncedCredit() {
        final int credit = unannouncedCredit.get();
        return credit;
    }

    /** @return the accumulated unannounced credit, atomically resetting the counter to 0 */
    public int getAndResetUnannouncedCredit() {
        final int credit = unannouncedCredit.getAndSet(0);
        return credit;
    }

    /** @return the number of received buffers waiting to be polled, read under the queue's monitor */
    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }

    /**
     * Reads the number of queued buffers without taking the queue's monitor.
     *
     * @return the observed size, never less than 0
     */
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        final int observed = receivedBuffers.size();
        // Clamp at zero, as the original Math.max(0, ...) did.
        return observed < 0 ? 0 : observed;
    }

    /** @return the identifier created for this channel at construction time */
    public InputChannelID getInputChannelId() {
        final InputChannelID channelId = id;
        return channelId;
    }

    /** @return the number of exclusive segments assigned to this channel, 0 if none were assigned yet */
    public int getInitialCredit() {
        final int credit = initialCredit;
        return credit;
    }

    /**
     * @return the input gate's buffer provider, or {@code null} if this channel has been released
     * @throws IOException declared by the signature
     */
    public BufferProvider getBufferProvider() throws IOException {
        return isReleased.get() ? null : inputGate.getBufferProvider();
    }

    /**
     * Takes one available buffer from this channel's queue; floating buffers are handed out
     * before exclusive ones.
     *
     * @return a buffer, or {@code null} if none is available
     */
    @Nullable
    public Buffer requestBuffer() {
        synchronized (bufferQueue) {
            return bufferQueue.takeBuffer();
        }
    }

    /**
     * Adjusts the number of required buffers to the sender's backlog and tries to obtain the
     * missing floating buffers.
     *
     * <p>If the pool has no buffer to hand out, the channel registers itself as a buffer listener
     * and stops requesting. Newly obtained buffers are added to the unannounced credit, and the
     * producer is notified if that credit was zero before.
     *
     * @param backlog backlog reported by the sender
     * @throws IOException if requesting buffers fails
     */
    void onSenderBacklog(int backlog) throws IOException {
        int newlyObtained = 0;

        synchronized (bufferQueue) {
            // Checked under the bufferQueue monitor, which releaseAllResources() also takes
            // before draining the queue.
            if (isReleased.get()) {
                return;
            }

            numRequiredBuffers = backlog + initialCredit;
            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
                final Buffer floating = inputGate.getBufferPool().requestBuffer();
                if (floating != null) {
                    bufferQueue.addFloatingBuffer(floating);
                    newlyObtained++;
                } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                    // Registered as listener: further buffers arrive via notifyBufferAvailable().
                    isWaitingForFloatingBuffers = true;
                    break;
                }
                // If registration was refused, loop and request from the pool again.
            }
        }

        if (newlyObtained > 0 && unannouncedCredit.getAndAdd(newlyObtained) == 0) {
            notifyCreditAvailable();
        }
    }

    /**
     * Handles a buffer received from the producer.
     *
     * <p>The buffer is queued if the channel is not released and the sequence number matches the
     * expected one. Otherwise it is recycled, and a sequence mismatch is additionally recorded
     * as an error on the channel.
     *
     * @param buffer received buffer; ownership passes to this channel
     * @param sequenceNumber sequence number carried by the buffer
     * @param backlog sender backlog; negative values are ignored
     * @throws IOException if processing the backlog fails
     */
    public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        // True until the buffer is in receivedBuffers. Once queued, the consumer (or
        // releaseAllResources()) owns it, so it must not be recycled here even if a later
        // step throws.
        boolean recycleBuffer = true;
        try {
            synchronized (receivedBuffers) {
                // Checked under the receivedBuffers monitor, which releaseAllResources() also
                // takes before draining the queue.
                if (isReleased.get()) {
                    return;
                }
                if (expectedSequenceNumber != sequenceNumber) {
                    onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                    return;
                }

                final boolean wasEmpty = receivedBuffers.isEmpty();
                receivedBuffers.add(buffer);
                // Previously this flag flipped only after notifyChannelNonEmpty(); a failure
                // there recycled a buffer that was already queued.
                recycleBuffer = false;
                ++expectedSequenceNumber;

                if (wasEmpty) {
                    notifyChannelNonEmpty();
                }
            }

            if (backlog >= 0) {
                onSenderBacklog(backlog);
            }
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }

    /**
     * Handles a message that carries no buffer: advances the expected sequence number and
     * processes the sender backlog.
     *
     * <p>Does nothing if the channel is released; records an error if the sequence number does
     * not match the expected one.
     *
     * @param sequenceNumber sequence number carried by the message
     * @param backlog sender backlog; negative values are ignored
     * @throws IOException if processing the backlog fails
     */
    public void onEmptyBuffer(int sequenceNumber, int backlog) throws IOException {
        synchronized (receivedBuffers) {
            if (isReleased.get()) {
                return;
            }
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }
            ++expectedSequenceNumber;
        }

        if (backlog >= 0) {
            onSenderBacklog(backlog);
        }
    }

    /** Asks the input gate to trigger a partition state check for the consumed partition. */
    public void onFailedPartitionRequest() {
        inputGate.triggerPartitionStateCheck(partitionId);
    }

    /**
     * Records the given failure on this channel.
     *
     * @param cause failure to record
     */
    public void onError(Throwable cause) {
        setError(cause);
    }

    /**
     * Holds the exclusive and floating buffers currently available to a channel.
     *
     * <p>This class does no locking itself; every caller in the enclosing class holds the
     * queue's monitor while using it.
     */
    private static class AvailableBufferQueue {
        /** Buffers obtained from the shared pool; handed out first and given up first. */
        private final ArrayDeque<Buffer> floatingBuffers;
        /** Buffers backed by the channel's exclusive segments. */
        private final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<Buffer>();

        AvailableBufferQueue() {
            this.floatingBuffers = new ArrayDeque<Buffer>();
        }

        /**
         * Adds an exclusive buffer. If the queue then holds more buffers than required, one
         * floating buffer is recycled in exchange.
         *
         * @param buffer exclusive buffer to add
         * @param numRequiredBuffers number of buffers the channel currently requires
         * @return 0 if a floating buffer was recycled in exchange, 1 otherwise
         */
        int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
            exclusiveBuffers.add(buffer);
            if (getAvailableBufferSize() > numRequiredBuffers) {
                final Buffer floatingBuffer = floatingBuffers.poll();
                // poll() returns null when the surplus consists of exclusive buffers only;
                // there is nothing to give up then, so the new buffer counts as added.
                if (floatingBuffer != null) {
                    floatingBuffer.recycleBuffer();
                    return 0;
                }
            }
            return 1;
        }

        /** Adds a floating buffer. */
        void addFloatingBuffer(Buffer buffer) {
            floatingBuffers.add(buffer);
        }

        /**
         * Removes and returns one buffer, taking floating buffers before exclusive ones.
         *
         * @return a buffer, or {@code null} if the queue is empty
         */
        @Nullable
        Buffer takeBuffer() {
            if (!floatingBuffers.isEmpty()) {
                return floatingBuffers.poll();
            }
            return exclusiveBuffers.poll();
        }

        /**
         * Empties the queue: floating buffers are recycled, and the memory segments of exclusive
         * buffers are appended to the given list.
         *
         * @param exclusiveSegments list receiving the segments of all exclusive buffers
         */
        void releaseAll(List<MemorySegment> exclusiveSegments) {
            Buffer buffer;
            while ((buffer = floatingBuffers.poll()) != null) {
                buffer.recycleBuffer();
            }
            while ((buffer = exclusiveBuffers.poll()) != null) {
                exclusiveSegments.add(buffer.getMemorySegment());
            }
        }

        /** @return the number of floating plus exclusive buffers in the queue */
        int getAvailableBufferSize() {
            return floatingBuffers.size() + exclusiveBuffers.size();
        }
    }

    /** Signals that a buffer arrived with a sequence number other than the expected one. */
    private static class BufferReorderingException
    extends IOException {
        private static final long serialVersionUID = -888282210356266816L;
        private static final String MESSAGE_FORMAT = "Buffer re-ordering: expected buffer with sequence number %d, but received %d.";

        /** Sequence number the channel was waiting for. */
        private final int expectedSequenceNumber;
        /** Sequence number that actually arrived. */
        private final int actualSequenceNumber;

        BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
            this.expectedSequenceNumber = expectedSequenceNumber;
            this.actualSequenceNumber = actualSequenceNumber;
        }

        /** @return a message naming both the expected and the received sequence number */
        @Override
        public String getMessage() {
            final Object[] sequenceNumbers = {expectedSequenceNumber, actualSequenceNumber};
            return String.format(MESSAGE_FORMAT, sequenceNumbers);
        }
    }
}

