package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.class */
/**
 * View over a spilled subpartition that reads buffers back from the spill file.
 *
 * <p>The view registers itself with the spill writer as a {@link NotificationListener}. While the
 * writer still has pending requests, {@link #getNextBuffer()} returns {@code null}; once
 * {@link #onNotification()} fires (or if no requests were pending at construction time) the
 * availability listener is notified and buffers are read synchronously from the file.
 *
 * <p>Reads go through a private {@link FixedLengthBufferPool} created with two buffers: one for
 * the buffer being handed out and one for the look-ahead {@code nextBuffer}. Access to the file
 * reader and to {@code nextBuffer} is guarded by this object's monitor.
 */
public class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {

    private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class);

    /** The subpartition this view belongs to. */
    private final SpillableSubpartition parent;

    /** Writer of the spill file; only used here to obtain the channel ID and to register for completion. */
    private final BufferFileWriter spillWriter;

    /** Synchronous reader over the spill file. */
    @GuardedBy("this")
    private final BufferFileReader fileReader;

    /** Private pool that backs the buffers read from the file. */
    private final FixedLengthBufferPool bufferPool;

    /** Listener notified once spilled data can be read. */
    private final BufferAvailabilityListener availabilityListener;

    /** Number of spilled buffers reported at construction time (used for logging and toString). */
    private final long numberOfSpilledBuffers;

    /** Look-ahead buffer: already read from the file but not yet handed out. */
    @GuardedBy("this")
    private Buffer nextBuffer;

    /** True while the spill writer still has unprocessed requests. */
    private volatile boolean isSpillInProgress;

    /** Set once {@link #releaseAllResources()} has been entered. */
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    /** Backlog counter; negative until lazily initialised from the parent on the first read. */
    private int remainingBuffers = -1;

    /**
     * Creates the view and registers it with the spill writer.
     *
     * @param parent the owning subpartition, must not be null
     * @param memorySegmentSize forwarded as the second argument of {@link FixedLengthBufferPool}
     *     (NOTE(review): presumably the size of each pooled memory segment - confirm)
     * @param spillWriter writer of the spill file, must not be null
     * @param numberOfSpilledBuffers number of spilled buffers, must be non-negative
     * @param availabilityListener listener to notify when data can be read, must not be null
     * @throws IOException declared by the original constructor (reader creation / registration)
     */
    public SpilledSubpartitionView(
            SpillableSubpartition parent,
            int memorySegmentSize,
            BufferFileWriter spillWriter,
            long numberOfSpilledBuffers,
            BufferAvailabilityListener availabilityListener) throws IOException {
        // Must be set before registering: the notification may arrive from another thread.
        this.isSpillInProgress = true;
        this.parent = Preconditions.checkNotNull(parent);
        this.bufferPool = new FixedLengthBufferPool(2, memorySegmentSize, MemoryType.HEAP);
        this.spillWriter = Preconditions.checkNotNull(spillWriter);
        this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
        Preconditions.checkArgument(numberOfSpilledBuffers >= 0);
        this.numberOfSpilledBuffers = numberOfSpilledBuffers;
        this.availabilityListener = Preconditions.checkNotNull(availabilityListener);

        if (spillWriter.registerAllRequestsProcessedListener(this)) {
            // Registration succeeded: onNotification() will announce availability later.
            LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", numberOfSpilledBuffers);
            return;
        }

        // Registration was rejected, so nothing is pending: data is readable right away.
        this.isSpillInProgress = false;
        availabilityListener.notifyDataAvailable();
        LOG.debug("No spilling in progress. Notified about {} available buffers.", numberOfSpilledBuffers);
    }

    /** Called by the spill writer once all of its requests have been processed. */
    @Override
    public void onNotification() {
        this.isSpillInProgress = false;
        this.availabilityListener.notifyDataAvailable();
        LOG.debug("Finished spilling. Notified about {} available buffers.", this.numberOfSpilledBuffers);
    }

    /**
     * Returns the next buffer read from the spill file together with backlog information.
     *
     * @return {@code null} while spilling is in progress or when no buffer could be obtained;
     *     otherwise the buffer plus the availability flag, the remaining backlog counter and a
     *     flag telling whether the look-ahead buffer is an event
     * @throws IOException any failure while reading is rethrown wrapped in a
     *     {@code DataConsumptionException} carrying the partition ID
     * @throws InterruptedException declared for interface compatibility
     */
    @Override
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
        if (this.isSpillInProgress) {
            return null;
        }
        if (this.remainingBuffers < 0) {
            this.remainingBuffers = this.parent.getBuffersInBacklog();
        }

        try {
            final Buffer current;
            final boolean moreAvailable;
            final boolean nextIsEvent;

            synchronized (this) {
                final boolean servedFromLookAhead = this.nextBuffer != null;
                current = servedFromLookAhead ? this.nextBuffer : requestAndFillBuffer();
                try {
                    this.nextBuffer = requestAndFillBuffer();
                } catch (Throwable t) {
                    // A freshly read buffer is referenced by nobody else at this point; give it
                    // back to the pool. A buffer taken from the look-ahead slot is still held by
                    // the nextBuffer field (the assignment above did not happen) and is released
                    // by releaseAllResources().
                    if (!servedFromLookAhead && current != null) {
                        current.recycleBuffer();
                    }
                    throw t;
                }
                moreAvailable = this.nextBuffer != null || !this.fileReader.hasReachedEndOfFile();
                nextIsEvent = this.nextBuffer != null && !this.nextBuffer.isBuffer();
            }

            if (current == null) {
                return null;
            }
            // Only data buffers count towards the backlog; events do not.
            if (current.isBuffer()) {
                this.remainingBuffers--;
            }
            return new ResultSubpartition.BufferAndBacklog(
                    current, this.remainingBuffers > 0 || moreAvailable, this.remainingBuffers, nextIsEvent);
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // The interruption is converted into another exception type below, so keep the
                // thread's interrupt status visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new DataConsumptionException(this.parent.parent.getPartitionId(), th);
        }
    }

    /**
     * Reads the next buffer from the spill file into a pooled buffer. Must be called while
     * holding this object's monitor.
     *
     * @return the filled buffer, or {@code null} if the end of the file has been reached or the
     *     pool currently has no buffer to offer
     */
    @Nullable
    private Buffer requestAndFillBuffer() throws IOException, InterruptedException {
        assert Thread.holdsLock(this);

        if (this.fileReader.hasReachedEndOfFile()) {
            return null;
        }

        final Buffer buffer = this.bufferPool.requestBufferUnblocking();
        if (buffer == null) {
            return null;
        }
        try {
            this.fileReader.readInto(buffer);
        } catch (Throwable t) {
            // Do not leak one of the two pooled buffers when the read fails.
            buffer.recycleBuffer();
            throw t;
        }
        return buffer;
    }

    /** No-op for this view. */
    @Override
    public void notifyDataAvailable() {
    }

    /** Forwards the consumption notification to the parent subpartition. */
    @Override
    public void notifySubpartitionConsumed() throws IOException {
        this.parent.onConsumedSubpartition();
    }

    /**
     * Closes the file reader, recycles the look-ahead buffer and lazily destroys the buffer pool.
     * Only the first call has any effect. The buffer and the pool are released even if closing the
     * reader fails; the failure is still propagated to the caller.
     */
    @Override
    public void releaseAllResources() throws IOException {
        if (!this.isReleased.compareAndSet(false, true)) {
            return;
        }
        try {
            synchronized (this) {
                try {
                    this.fileReader.close();
                } finally {
                    if (this.nextBuffer != null) {
                        this.nextBuffer.recycleBuffer();
                        this.nextBuffer = null;
                    }
                }
            }
        } finally {
            this.bufferPool.lazyDestroy();
        }
    }

    /** Returns true if either the parent subpartition or this view has been released. */
    @Override
    public boolean isReleased() {
        return this.parent.isReleased() || this.isReleased.get();
    }

    /**
     * Tells whether the look-ahead buffer is an event, reading it from the file first if needed.
     *
     * @return {@code true} only if a look-ahead buffer exists and is not a data buffer;
     *     {@code false} also when the look-ahead read fails
     */
    @Override
    public boolean nextBufferIsEvent() {
        synchronized (this) {
            if (this.nextBuffer == null) {
                try {
                    this.nextBuffer = requestAndFillBuffer();
                } catch (InterruptedException e) {
                    // Best-effort query: report "no event" but keep the interrupt visible.
                    Thread.currentThread().interrupt();
                    return false;
                } catch (Exception e) {
                    // Best-effort query: deliberately not propagated, but leave a trace.
                    LOG.debug("Could not read ahead the next spilled buffer.", e);
                    return false;
                }
            }
            return this.nextBuffer != null && !this.nextBuffer.isBuffer();
        }
    }

    /** Returns true if a look-ahead buffer is held or the file has not been read to its end. */
    @Override
    public synchronized boolean isAvailable() {
        return this.nextBuffer != null || !this.fileReader.hasReachedEndOfFile();
    }

    /** No-op for this view. */
    @Override
    public void notifyCreditAdded(int i) {
    }

    /** Returns the failure cause reported by the parent subpartition. */
    @Override
    public Throwable getFailureCause() {
        return this.parent.getFailureCause();
    }

    @Override
    public String toString() {
        return String.format(
                "SpilledSubpartitionView(index: %d, buffers: %d) of InternalResultPartition %s",
                this.parent.index, this.numberOfSpilledBuffers, this.parent.parent.getPartitionId());
    }

    /** Returns the current backlog counter; {@code -1} until the first successful read attempt. */
    @VisibleForTesting
    public int getRemainingBuffers() {
        return this.remainingBuffers;
    }
}
