package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.class */
class SpillableSubpartitionView implements ResultSubpartitionView {
    private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class);
    private final SpillableSubpartition parent;
    private final ArrayDeque<BufferConsumer> buffers;
    private final IOManager ioManager;
    private final int memorySegmentSize;
    private final BufferAvailabilityListener listener;
    private final AtomicBoolean isReleased = new AtomicBoolean(false);
    private final long numBuffers;
    private BufferConsumer nextBuffer;
    private volatile SpilledSubpartitionView spilledView;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SpillableSubpartitionView(SpillableSubpartition spillableSubpartition, ArrayDeque<BufferConsumer> arrayDeque, IOManager iOManager, int i, BufferAvailabilityListener bufferAvailabilityListener) {
        this.parent = (SpillableSubpartition) Preconditions.checkNotNull(spillableSubpartition);
        this.buffers = (ArrayDeque) Preconditions.checkNotNull(arrayDeque);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.memorySegmentSize = i;
        this.listener = (BufferAvailabilityListener) Preconditions.checkNotNull(bufferAvailabilityListener);
        synchronized (arrayDeque) {
            this.numBuffers = arrayDeque.size();
            this.nextBuffer = arrayDeque.poll();
        }
        if (this.nextBuffer != null) {
            bufferAvailabilityListener.notifyDataAvailable();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int releaseMemory() throws IOException {
        synchronized (this.buffers) {
            if (this.spilledView != null || this.nextBuffer == null) {
                return 0;
            }
            BufferFileWriter createBufferFileWriter = this.ioManager.createBufferFileWriter(this.ioManager.createChannel());
            long j = 0;
            int size = this.buffers.size();
            for (int i = 0; i < size; i++) {
                BufferConsumer remove = this.buffers.remove();
                Throwable th = null;
                try {
                    try {
                        Buffer build = remove.build();
                        Preconditions.checkState(remove.isFinished(), "BufferConsumer must be finished before spilling. Otherwise we would not be able to simply remove it from the queue. This should be guaranteed by creating ResultSubpartitionView only once Subpartition isFinished.");
                        this.parent.updateStatistics(build);
                        j += build.getSize();
                        createBufferFileWriter.writeBlock(build);
                        if (remove != null) {
                            if (0 != 0) {
                                try {
                                    remove.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                remove.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            }
            this.spilledView = new SpilledSubpartitionView(this.parent, this.memorySegmentSize, createBufferFileWriter, size, this.listener);
            LOG.debug("Spilling {} bytes for sub partition {} of {}.", new Object[]{Long.valueOf(j), Integer.valueOf(this.parent.index), this.parent.parent.getPartitionId()});
            return size;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
        Buffer buffer = null;
        boolean z = false;
        int i = 0;
        boolean z2 = false;
        if (this.parent.isReleased()) {
            if (getFailureCause() != null) {
                throw new ProducerFailedException(getFailureCause());
            }
            throw new ProducerFailedException(new IllegalStateException("Result subpartition[" + this.parent + "] has been released"));
        }
        try {
            synchronized (this.buffers) {
                if (this.isReleased.get()) {
                    return null;
                }
                if (this.nextBuffer != null) {
                    buffer = this.nextBuffer.build();
                    Preconditions.checkState(this.nextBuffer.isFinished(), "We can only read from SpillableSubpartition after it was finished");
                    i = this.parent.decreaseBuffersInBacklogUnsafe(this.nextBuffer.isBuffer());
                    this.nextBuffer.close();
                    this.nextBuffer = this.buffers.poll();
                    if (this.nextBuffer != null) {
                        z = !this.nextBuffer.isBuffer();
                        z2 = true;
                    }
                    this.parent.updateStatistics(buffer);
                    if (this.spilledView == null) {
                        return new ResultSubpartition.BufferAndBacklog(buffer, z2, i, z);
                    }
                }
                SpilledSubpartitionView spilledSubpartitionView = this.spilledView;
                if (spilledSubpartitionView != null) {
                    return buffer != null ? new ResultSubpartition.BufferAndBacklog(buffer, spilledSubpartitionView.isAvailable(), i, spilledSubpartitionView.nextBufferIsEvent()) : spilledSubpartitionView.getNextBuffer();
                }
                throw new IllegalStateException("No in-memory buffers available, but also nothing spilled.");
            }
        } catch (Throwable th) {
            throw new DataConsumptionException(this.parent.parent.getPartitionId(), th);
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyDataAvailable() {
        this.listener.notifyDataAvailable();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void releaseAllResources(@Nullable Throwable th) throws IOException {
        if (this.isReleased.compareAndSet(false, true)) {
            SpilledSubpartitionView spilledSubpartitionView = this.spilledView;
            if (spilledSubpartitionView != null) {
                spilledSubpartitionView.releaseAllResources(th);
            } else if (th == null) {
                this.parent.onConsumedSubpartition();
            } else {
                this.parent.onSubpartitionConsumingFailure(this, th);
            }
            synchronized (this.buffers) {
                if (this.nextBuffer != null) {
                    this.nextBuffer.close();
                    this.nextBuffer = null;
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean isReleased() {
        SpilledSubpartitionView spilledSubpartitionView = this.spilledView;
        return spilledSubpartitionView != null ? spilledSubpartitionView.isReleased() : this.isReleased.get();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean nextBufferIsEvent() {
        synchronized (this.buffers) {
            if (this.isReleased.get()) {
                return false;
            }
            if (this.nextBuffer != null) {
                return !this.nextBuffer.isBuffer();
            }
            Preconditions.checkState(this.spilledView != null, "No in-memory buffers available, but also nothing spilled.");
            return this.spilledView.nextBufferIsEvent();
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean isAvailable() {
        synchronized (this.buffers) {
            if (this.nextBuffer != null) {
                return true;
            }
            if (this.spilledView == null) {
                return false;
            }
            return this.spilledView.isAvailable();
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyCreditAdded(int i) {
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public Throwable getFailureCause() {
        SpilledSubpartitionView spilledSubpartitionView = this.spilledView;
        return spilledSubpartitionView != null ? spilledSubpartitionView.getFailureCause() : this.parent.getFailureCause();
    }

    public String toString() {
        return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of InternalResultPartition %s", Integer.valueOf(this.parent.index), Long.valueOf(this.numBuffers), Boolean.valueOf(this.spilledView != null), this.parent.parent.getPartitionId());
    }

    @VisibleForTesting
    public SpilledSubpartitionView getSpilledView() {
        return this.spilledView;
    }
}
