package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/writer/AsynchronousPartitionedStreamFileReaderDelegate.class */
/**
 * Reads the byte ranges described by a list of {@link PartitionIndex} entries from a single file
 * through an asynchronous {@link BufferFileReader}, using a fixed set of memory segments.
 *
 * <p>Every supplied segment is wrapped in a {@link NetworkBuffer} whose recycler is this object, so
 * recycling a returned buffer immediately issues the next read request (if any data remains).
 * Completed buffers are handed over through a blocking queue and fetched with
 * {@link #getNextBufferBlocking()}.
 *
 * <p>NOTE(review): the read position ({@code nextPartitionIdx}, {@code nextOffset}) is updated without
 * synchronization from both the constructor and {@link #recycle(MemorySegment)}; this presumably
 * relies on buffers being recycled by a single consumer thread -- confirm against the callers.
 */
public class AsynchronousPartitionedStreamFileReaderDelegate implements RequestDoneCallback<Buffer>, BufferRecycler {
    /** Upper bound, in milliseconds, of one wait for a buffer before the failure cause is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 500L;

    private final BufferFileReader reader;

    /** Buffers whose read request completed successfully and that were not yet fetched. */
    private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();

    /** Segments that have not been attached to a read request yet; drained by the constructor. */
    private final Queue<MemorySegment> freeSegments;

    private final List<PartitionIndex> partitionIndices;

    /** First I/O failure that was observed, or {@code null} if none occurred so far. */
    private final AtomicReference<IOException> cause = new AtomicReference<>(null);

    /** Index of the partition the next read request belongs to. */
    private int nextPartitionIdx;

    /** Running total of bytes requested so far; compared against each partition's end offset. */
    private long nextOffset;

    /**
     * Opens a stream file reader on the given channel and issues one initial read request per
     * supplied memory segment, as long as there is data left to request.
     *
     * @param iOManager the I/O manager used to create the underlying reader
     * @param id the channel to read from
     * @param list the memory segments to read into
     * @param list2 the partition indices describing the ranges to read
     * @throws IOException if the reader cannot be created or an initial read request fails; in the
     *     latter case the reader is closed before the exception propagates
     */
    public AsynchronousPartitionedStreamFileReaderDelegate(IOManager iOManager, FileIOChannel.ID id, List<MemorySegment> list, List<PartitionIndex> list2) throws IOException {
        this.reader = iOManager.createStreamFileReader(id, this);
        this.freeSegments = new ArrayDeque<>(list);
        this.partitionIndices = list2;
        try {
            MemorySegment segment;
            while ((segment = this.freeSegments.poll()) != null) {
                sendRequestIfFeasible(segment);
            }
        } catch (IOException | RuntimeException e) {
            // The caller never obtains a reference to this object when construction fails, so the
            // reader has to be closed here or it would stay open.
            try {
                this.reader.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Returns the underlying reader.
     *
     * @return the reader created in the constructor
     */
    public BufferFileReader getReader() {
        return this.reader;
    }

    /**
     * Waits for the next successfully read buffer.
     *
     * <p>The method keeps polling until either a buffer becomes available or a failure has been
     * recorded; it does not return on its own when no further buffer will ever arrive.
     *
     * @return the next buffer that finished reading
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException the first recorded I/O failure, if any
     */
    public Buffer getNextBufferBlocking() throws InterruptedException, IOException {
        IOException failure;
        while ((failure = this.cause.get()) == null) {
            Buffer buffer = this.retBuffers.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (buffer != null) {
                return buffer;
            }
        }
        throw failure;
    }

    /**
     * Issues a read request into the given segment if some partition still has unrequested bytes.
     *
     * <p>Partitions whose end offset equals the current offset are skipped. A single request never
     * extends past the end of the current partition and never exceeds the segment size. If nothing
     * is left to request, the segment is simply not used.
     *
     * @param memorySegment the segment to read into
     * @throws IOException if the reader rejects the request
     */
    private void sendRequestIfFeasible(MemorySegment memorySegment) throws IOException {
        long bytesToRead = 0;
        while (this.nextPartitionIdx < this.partitionIndices.size()) {
            PartitionIndex partitionIndex = this.partitionIndices.get(this.nextPartitionIdx);
            long partitionEnd = partitionIndex.getStartOffset() + partitionIndex.getLength();
            assert partitionEnd >= this.nextOffset;
            if (partitionEnd > this.nextOffset) {
                bytesToRead = Math.min(partitionEnd - this.nextOffset, memorySegment.size());
                break;
            }
            // This partition is fully requested (or empty); move on to the next one.
            this.nextPartitionIdx++;
        }
        if (bytesToRead > 0) {
            this.reader.readInto(new NetworkBuffer(memorySegment, this), bytesToRead);
            this.nextOffset += bytesToRead;
        }
    }

    /**
     * Makes a successfully read buffer available to {@link #getNextBufferBlocking()}.
     *
     * @param buffer the buffer that finished reading
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback
    public void requestSuccessful(Buffer buffer) {
        this.retBuffers.add(buffer);
    }

    /**
     * Records the failure (only the first one is kept) so that {@link #getNextBufferBlocking()}
     * reports it, then rethrows it wrapped in a {@link RuntimeException}.
     *
     * @param buffer the buffer whose request failed
     * @param iOException the failure reported for the request
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback
    public void requestFailed(Buffer buffer, IOException iOException) {
        this.cause.compareAndSet(null, iOException);
        throw new RuntimeException(iOException);
    }

    /**
     * Reuses a recycled segment for the next read request, if any data remains to be requested.
     *
     * <p>A failure to issue the request is recorded (only the first failure is kept) and rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param memorySegment the segment being returned
     */
    @Override // org.apache.flink.runtime.io.network.buffer.BufferRecycler
    public void recycle(MemorySegment memorySegment) {
        try {
            sendRequestIfFeasible(memorySegment);
        } catch (IOException e) {
            this.cause.compareAndSet(null, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the underlying reader and drops any segments that were never used.
     *
     * @throws IOException if closing the reader fails
     */
    public void close() throws IOException {
        this.reader.close();
        this.freeSegments.clear();
    }
}
