/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;

/**
 * Drives asynchronous reads of a partitioned file through a {@link BufferFileReader}.
 *
 * <p>The delegate registers itself both as the reader's completion callback and as the
 * {@link BufferRecycler} of every buffer it hands out. Each memory segment given to the
 * constructor is wrapped in a {@link NetworkBuffer} and used for one read request; when a
 * consumer recycles a buffer, its segment is immediately reused for the next request, as long
 * as there is still unread partition data.
 *
 * <p>Read requests never span a partition boundary: each request is capped at the number of
 * bytes left in the current partition and at the size of the memory segment.
 *
 * <p>NOTE(review): {@code nextPartitionIdx} and {@code nextOffset} are plain, unsynchronized
 * fields that are updated from the constructor and from {@link #recycle(MemorySegment)}. This
 * presumably relies on buffers not being recycled concurrently -- confirm against callers.
 */
public class AsynchronousPartitionedStreamFileReaderDelegate
        implements RequestDoneCallback<Buffer>, BufferRecycler {

    /** Time a single poll on the result queue waits before the failure cause is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 500L;

    /** Reader that executes the read requests and reports back to this delegate. */
    private final BufferFileReader reader;

    /**
     * Segments supplied at construction time. The constructor drains this queue completely, so
     * it is empty afterwards; segments for which no data was left are simply not used.
     */
    private final Queue<MemorySegment> freeSegments;

    /** Buffers whose read request completed successfully, in completion order. */
    private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();

    /** Start offset and length of every partition to read, in read order. */
    private final List<PartitionIndex> partitionIndices;

    /** Index of the first partition that may still contain unrequested bytes. */
    private int nextPartitionIdx;

    /** File offset up to which read requests have already been issued. */
    private long nextOffset;

    /** First I/O failure observed by a callback; {@code null} while no failure occurred. */
    private final AtomicReference<IOException> cause = new AtomicReference<>(null);

    /**
     * Creates the reader for {@code channel} and immediately issues one read request per
     * supplied memory segment, for as long as unread partition data remains.
     *
     * @param ioManager used to create the stream file reader
     * @param channel the file channel to read
     * @param segments memory segments used as read targets; the list itself is copied
     * @param partitionIndices offsets and lengths of the partitions to read
     * @throws IOException if creating the reader or issuing an initial request fails
     */
    public AsynchronousPartitionedStreamFileReaderDelegate(
            IOManager ioManager,
            FileIOChannel.ID channel,
            List<MemorySegment> segments,
            List<PartitionIndex> partitionIndices) throws IOException {
        this.reader = ioManager.createStreamFileReader(channel, this);
        this.freeSegments = new ArrayDeque<>(segments);
        this.partitionIndices = partitionIndices;

        MemorySegment segment;
        while ((segment = this.freeSegments.poll()) != null) {
            sendRequestIfFeasible(segment);
        }
    }

    /** Returns the underlying reader. */
    BufferFileReader getReader() {
        return this.reader;
    }

    /**
     * Waits for the next successfully read buffer.
     *
     * <p>The recorded failure is checked before every poll, so a failure reported by a callback
     * is surfaced within one poll timeout even when no further buffer ever arrives. The method
     * keeps waiting for as long as neither a buffer nor a failure shows up.
     *
     * @return the next completed buffer, never {@code null}
     * @throws IOException the first failure recorded by a callback
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    Buffer getNextBufferBlocking() throws InterruptedException, IOException {
        while (true) {
            IOException failure = this.cause.get();
            if (failure != null) {
                throw failure;
            }
            Buffer buffer = this.retBuffers.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (buffer != null) {
                return buffer;
            }
        }
    }

    /**
     * Issues the next read request into {@code memorySegment} if any partition still has
     * unrequested bytes; otherwise does nothing and the segment stays unused.
     *
     * <p>NOTE(review): the request length is derived from {@code nextOffset} alone, which
     * assumes the partitions are laid out back to back in list order starting at offset 0
     * -- confirm against the writer.
     *
     * @param memorySegment the segment to read into
     * @throws IOException if the reader rejects the request
     */
    private void sendRequestIfFeasible(MemorySegment memorySegment) throws IOException {
        long nextReadLength = 0L;

        // Skip partitions that are already fully requested (this also skips empty ones) and
        // stop at the first partition that still has bytes left.
        while (this.nextPartitionIdx < this.partitionIndices.size()) {
            PartitionIndex partitionIndex = this.partitionIndices.get(this.nextPartitionIdx);
            long partitionEndOffset = partitionIndex.getStartOffset() + partitionIndex.getLength();
            assert partitionEndOffset >= this.nextOffset;
            if (partitionEndOffset > this.nextOffset) {
                // Never read past the partition end, and never more than the segment holds.
                nextReadLength =
                        Math.min(partitionEndOffset - this.nextOffset, (long) memorySegment.size());
                break;
            }
            ++this.nextPartitionIdx;
        }

        if (nextReadLength > 0L) {
            // This delegate is the buffer's recycler, so recycling triggers the next request.
            NetworkBuffer buffer = new NetworkBuffer(memorySegment, this);
            this.reader.readInto(buffer, nextReadLength);
            this.nextOffset += nextReadLength;
        }
    }

    /** Queues a successfully filled buffer for {@link #getNextBufferBlocking()}. */
    @Override
    public void requestSuccessful(Buffer buffer) {
        this.retBuffers.add(buffer);
    }

    /**
     * Records the failure (only the first one is kept) so that
     * {@link #getNextBufferBlocking()} can rethrow it, then propagates it to the caller of this
     * callback as an unchecked exception.
     */
    @Override
    public void requestFailed(Buffer buffer, IOException e) {
        this.cause.compareAndSet(null, e);
        throw new RuntimeException(e);
    }

    /**
     * Reuses a recycled segment for the next read request, if there is data left to read.
     *
     * <p>A failure to issue the request is recorded like a failed read (only the first failure
     * is kept) and rethrown as an unchecked exception.
     */
    @Override
    public void recycle(MemorySegment memorySegment) {
        try {
            sendRequestIfFeasible(memorySegment);
        } catch (IOException e) {
            this.cause.compareAndSet(null, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the underlying reader and drops the references to any still-queued segments.
     *
     * @throws IOException if closing the reader fails
     */
    void close() throws IOException {
        this.reader.close();
        this.freeSegments.clear();
    }
}

