/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.InternalResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SpillableSubpartition
extends ResultSubpartition {
    private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class);
    private final IOManager ioManager;
    private BufferFileWriter spillWriter;
    private boolean isFinished;
    private volatile boolean isReleased;
    private ResultSubpartitionView readView;
    private LinkedBlockingQueue<ResultSubpartitionView> unreleasedReadViews;

    SpillableSubpartition(int index, InternalResultPartition parent, IOManager ioManager) {
        this(index, parent, ioManager, false);
    }

    SpillableSubpartition(int index, InternalResultPartition parent, IOManager ioManager, boolean spillDirectly) {
        super(index, parent);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.unreleasedReadViews = new LinkedBlockingQueue();
        if (spillDirectly) {
            try {
                this.spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
            }
            catch (IOException e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
        }
    }

    @Override
    public synchronized boolean add(BufferConsumer bufferConsumer) throws IOException {
        return this.add(bufferConsumer, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean add(BufferConsumer bufferConsumer, boolean forceFinishRemainingBuffers) throws IOException {
        Preconditions.checkNotNull((Object)bufferConsumer);
        ArrayDeque arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.isFinished || this.isReleased) {
                bufferConsumer.close();
                return false;
            }
            this.buffers.add(bufferConsumer);
            this.updateStatistics(bufferConsumer);
            this.increaseBuffersInBacklog(bufferConsumer);
            if (this.spillWriter != null) {
                this.spillFinishedBufferConsumers(forceFinishRemainingBuffers);
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flush() {
        ArrayDeque arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.readView != null) {
                this.readView.notifyDataAvailable();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void finish() throws IOException {
        ArrayDeque arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true)) {
                this.isFinished = true;
            }
            this.flush();
        }
        if (this.spillWriter != null) {
            this.spillWriter.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void release() throws IOException {
        ArrayDeque arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.isReleased) {
                return;
            }
            for (BufferConsumer buffer : this.buffers) {
                buffer.close();
            }
            this.buffers.clear();
            if (this.spillWriter != null) {
                this.spillWriter.closeAndDelete();
            }
            this.isReleased = true;
        }
        while (!this.unreleasedReadViews.isEmpty()) {
            ResultSubpartitionView subpartitionView = this.unreleasedReadViews.poll();
            if (subpartitionView == null || subpartitionView.isReleased()) continue;
            subpartitionView.notifyDataAvailable();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
        ArrayDeque arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (!this.isFinished) {
                throw new IllegalStateException("Subpartition has not been finished yet, but blocking subpartitions can only be consumed after they have been finished.");
            }
            Preconditions.checkState((!this.isReleased ? 1 : 0) != 0, (Object)"Subpartition has been released.");
            if (this.spillWriter != null) {
                this.readView = new SpilledSubpartitionView(this, this.parent.getBufferProvider().getMemorySegmentSize(), this.spillWriter, this.getTotalNumberOfBuffers(), availabilityListener);
            } else {
                if (this.readView != null) {
                    throw new IllegalStateException("Subpartition is being or already has been consumed, but we currently allow subpartitions to only be consumed once.");
                }
                this.readView = new SpillableSubpartitionView(this, this.buffers, this.ioManager, this.parent.getBufferProvider().getMemorySegmentSize(), availabilityListener);
            }
            this.unreleasedReadViews.add(this.readView);
            return this.readView;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int releaseMemory() throws IOException {
        ArrayDeque arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            ResultSubpartitionView view = this.readView;
            if (view != null && view.getClass() == SpillableSubpartitionView.class) {
                SpillableSubpartitionView spillableView = (SpillableSubpartitionView)view;
                return spillableView.releaseMemory();
            }
            if (this.spillWriter == null) {
                this.spillWriter = this.ioManager.createBufferFileWriter(this.ioManager.createChannel());
                int numberOfBuffers = this.buffers.size();
                long spilledBytes = this.spillFinishedBufferConsumers(this.isFinished);
                int spilledBuffers = numberOfBuffers - this.buffers.size();
                LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.", new Object[]{spilledBytes, spilledBuffers, this.index, this.parent.getPartitionId()});
                return spilledBuffers;
            }
        }
        return 0;
    }

    @VisibleForTesting
    long spillFinishedBufferConsumers(boolean forceFinishRemainingBuffers) throws IOException {
        long spilledBytes = 0L;
        while (!this.buffers.isEmpty()) {
            BufferConsumer bufferConsumer = (BufferConsumer)this.buffers.getFirst();
            Buffer buffer = bufferConsumer.build();
            this.updateStatistics(buffer);
            int bufferSize = buffer.getSize();
            spilledBytes += (long)bufferSize;
            if (bufferConsumer.isFinished() || forceFinishRemainingBuffers) {
                if (bufferSize > 0) {
                    this.spillWriter.writeBlock(buffer);
                } else {
                    this.decreaseBuffersInBacklog(buffer);
                    buffer.recycleBuffer();
                }
                bufferConsumer.close();
                this.buffers.poll();
                continue;
            }
            if (bufferSize > 0) {
                this.spillWriter.writeBlock(buffer);
                this.increaseBuffersInBacklog(bufferConsumer);
            } else {
                buffer.recycleBuffer();
            }
            return spilledBytes;
        }
        return spilledBytes;
    }

    @Override
    public boolean isReleased() {
        return this.isReleased;
    }

    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return Math.max(this.buffers.size(), 0);
    }

    public String toString() {
        return String.format("SpillableSubpartition [%d number of buffers (%d bytes),%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]", this.getTotalNumberOfBuffers(), this.getTotalNumberOfBytes(), this.getBuffersInBacklog(), this.isFinished, this.readView != null, this.spillWriter != null);
    }
}

