package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.class */
/**
 * Reader over a single {@link ResultSubpartitionView} that tracks a credit counter and a running
 * sequence number for the buffers it hands out.
 *
 * <p>Every non-null result of {@link #getNextBuffer()} advances the sequence number; results whose
 * payload reports {@code isBuffer()} additionally consume one credit. The reader registers itself
 * as the {@link BufferAvailabilityListener} of the view it creates and forwards availability
 * notifications to the {@link PartitionRequestQueue} it was constructed with.
 *
 * <p>Only the creation of the view is guarded by a lock; the remaining state is accessed without
 * synchronization in this class.
 */
public class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListener, NetworkSequenceViewReader {

    /** Identifier of the receiving input channel, as passed to the constructor. */
    private final InputChannelID receiverId;

    /** Queue that is told when this reader has data available. */
    private final PartitionRequestQueue requestQueue;

    /** The view being read; {@code null} until {@link #requestSubpartitionView} has succeeded. */
    private volatile ResultSubpartitionView subpartitionView;

    /** Credits currently held by this reader. */
    private int numCreditsAvailable;

    /** Monitor guarding the one-time creation of {@link #subpartitionView}. */
    private final Object requestLock = new Object();

    /** Flag stored and returned on behalf of the caller; not interpreted by this class. */
    private boolean isRegisteredAsAvailable = false;

    /** Sequence number of the last buffer returned; {@code -1} before the first one. */
    private int sequenceNumber = -1;

    /**
     * Creates a reader that has not yet requested its subpartition view.
     *
     * @param inputChannelID id of the receiving input channel
     * @param i initial number of credits
     * @param partitionRequestQueue queue to notify when data becomes available
     */
    public CreditBasedSequenceNumberingViewReader(InputChannelID inputChannelID, int i, PartitionRequestQueue partitionRequestQueue) {
        this.requestQueue = partitionRequestQueue;
        this.receiverId = inputChannelID;
        this.numCreditsAvailable = i;
    }

    /**
     * Creates the subpartition view for this reader and announces the current credit to it.
     *
     * @param resultPartitionProvider provider used to create the view
     * @param resultPartitionID partition to read from
     * @param i index of the subpartition within the partition
     * @throws IOException if the provider fails to create the view
     * @throws IllegalStateException if a view has already been requested
     */
    @Override
    public void requestSubpartitionView(ResultPartitionProvider resultPartitionProvider, ResultPartitionID resultPartitionID, int i) throws IOException {
        synchronized (requestLock) {
            if (subpartitionView == null) {
                final ResultSubpartitionView createdView =
                        resultPartitionProvider.createSubpartitionView(resultPartitionID, i, this);
                // Publish the view before announcing credit, exactly as the field is written
                // before it is used.
                subpartitionView = createdView;
                createdView.notifyCreditAdded(numCreditsAvailable);
            } else {
                throw new IllegalStateException("Subpartition already requested");
            }
        }
    }

    /**
     * Adds credits to this reader and forwards the same amount to the view.
     *
     * @param i number of credits to add
     */
    @Override
    public void addCredit(int i) {
        numCreditsAvailable = numCreditsAvailable + i;
        subpartitionView.notifyCreditAdded(i);
    }

    /**
     * Stores the "registered as available" flag.
     *
     * @param z new value of the flag
     */
    @Override
    public void setRegisteredAsAvailable(boolean z) {
        isRegisteredAsAvailable = z;
    }

    /**
     * @return the flag last stored via {@link #setRegisteredAsAvailable(boolean)}
     */
    @Override
    public boolean isRegisteredAsAvailable() {
        return isRegisteredAsAvailable;
    }

    /**
     * Tells whether this reader can currently hand out something: the view must report
     * availability, and either credit is left or the view reports that its next buffer is an event.
     *
     * @return {@code true} if the next element may be sent
     */
    @Override
    public boolean isAvailable() {
        if (!hasBuffersAvailable()) {
            return false;
        }
        if (numCreditsAvailable > 0) {
            return true;
        }
        return subpartitionView.nextBufferIsEvent();
    }

    /**
     * Same decision as {@link #isAvailable()}, but based on the state carried by an already polled
     * {@code BufferAndBacklog} instead of querying the view again.
     */
    private boolean isAvailable(ResultSubpartition.BufferAndBacklog bufferAndBacklog) {
        if (!bufferAndBacklog.isMoreAvailable()) {
            return false;
        }
        if (numCreditsAvailable > 0) {
            return true;
        }
        return bufferAndBacklog.nextBufferIsEvent();
    }

    /**
     * @return the receiver id given at construction time
     */
    @Override
    public InputChannelID getReceiverId() {
        return receiverId;
    }

    /**
     * @return the sequence number of the most recently returned buffer, or {@code -1} if none
     */
    @Override
    public int getSequenceNumber() {
        return sequenceNumber;
    }

    /**
     * @return the number of credits currently held
     */
    @VisibleForTesting
    int getNumCreditsAvailable() {
        return numCreditsAvailable;
    }

    /**
     * @return whatever the view's {@code isAvailable()} reports
     */
    @VisibleForTesting
    boolean hasBuffersAvailable() {
        return subpartitionView.isAvailable();
    }

    /**
     * Polls the next element from the view.
     *
     * <p>For a non-null element the sequence number is advanced first; if the element's payload
     * reports {@code isBuffer()}, one credit is consumed.
     *
     * @return the element together with its availability and backlog, or {@code null} if the view
     *     returned nothing
     * @throws IllegalStateException if consuming a credit drops the counter below zero
     */
    @Override
    public InputChannel.BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
        final ResultSubpartition.BufferAndBacklog polled = subpartitionView.getNextBuffer();
        if (polled == null) {
            return null;
        }

        sequenceNumber++;

        // The pre-decrement only happens when the payload reports isBuffer().
        if (polled.buffer().isBuffer() && --numCreditsAvailable < 0) {
            throw new IllegalStateException("no credit available");
        }

        return new InputChannel.BufferAndAvailability(
                polled.buffer(),
                isAvailable(polled),
                polled.buffersInBacklog());
    }

    /** Delegates to the view's {@code notifySubpartitionConsumed()}. */
    @Override
    public void notifySubpartitionConsumed() throws IOException {
        subpartitionView.notifySubpartitionConsumed();
    }

    /**
     * @return whatever the view's {@code isReleased()} reports
     */
    @Override
    public boolean isReleased() {
        return subpartitionView.isReleased();
    }

    /**
     * @return whatever the view's {@code getFailureCause()} reports
     */
    @Override
    public Throwable getFailureCause() {
        return subpartitionView.getFailureCause();
    }

    /** Delegates to the view's {@code releaseAllResources()}. */
    @Override
    public void releaseAllResources() throws IOException {
        subpartitionView.releaseAllResources();
    }

    /** Listener callback: tells the request queue that this reader is non-empty. */
    @Override
    public void notifyDataAvailable() {
        requestQueue.notifyReaderNonEmpty(this);
    }

    /**
     * @return a diagnostic string listing the lock object, receiver id, sequence number, credit
     *     count and registration flag
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("CreditBasedSequenceNumberingViewReader{");
        text.append("requestLock=").append(requestLock);
        text.append(", receiverId=").append(receiverId);
        text.append(", sequenceNumber=").append(sequenceNumber);
        text.append(", numCreditsAvailable=").append(numCreditsAvailable);
        text.append(", isRegisteredAsAvailable=").append(isRegisteredAsAvailable);
        text.append('}');
        return text.toString();
    }
}
