/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.DataConsumptionException;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartition;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpilledSubpartitionView
implements ResultSubpartitionView,
NotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class);
    private final SpillableSubpartition parent;
    private final BufferFileWriter spillWriter;
    @GuardedBy(value="this")
    private final BufferFileReader fileReader;
    private final FixedLengthBufferPool bufferPool;
    private final BufferAvailabilityListener availabilityListener;
    private final long numberOfSpilledBuffers;
    private AtomicBoolean isReleased = new AtomicBoolean(false);
    @GuardedBy(value="this")
    private Buffer nextBuffer;
    private volatile boolean isSpillInProgress = true;
    private int remainingBuffers = -1;

    SpilledSubpartitionView(SpillableSubpartition parent, int memorySegmentSize, BufferFileWriter spillWriter, long numberOfSpilledBuffers, BufferAvailabilityListener availabilityListener) throws IOException {
        this.parent = (SpillableSubpartition)Preconditions.checkNotNull((Object)parent);
        this.bufferPool = new FixedLengthBufferPool(2, memorySegmentSize, MemoryType.HEAP);
        this.spillWriter = (BufferFileWriter)Preconditions.checkNotNull((Object)spillWriter);
        this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
        Preconditions.checkArgument((numberOfSpilledBuffers >= 0L ? 1 : 0) != 0);
        this.numberOfSpilledBuffers = numberOfSpilledBuffers;
        this.availabilityListener = (BufferAvailabilityListener)Preconditions.checkNotNull((Object)availabilityListener);
        if (!spillWriter.registerAllRequestsProcessedListener(this)) {
            this.isSpillInProgress = false;
            availabilityListener.notifyDataAvailable();
            LOG.debug("No spilling in progress. Notified about {} available buffers.", (Object)numberOfSpilledBuffers);
        } else {
            LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", (Object)numberOfSpilledBuffers);
        }
    }

    @Override
    public void onNotification() {
        this.isSpillInProgress = false;
        this.availabilityListener.notifyDataAvailable();
        LOG.debug("Finished spilling. Notified about {} available buffers.", (Object)this.numberOfSpilledBuffers);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
        if (this.isSpillInProgress) {
            return null;
        }
        if (this.remainingBuffers < 0) {
            this.remainingBuffers = this.parent.getBuffersInBacklog();
        }
        try {
            boolean nextBufferIsEvent;
            boolean hasMoreData;
            Buffer current;
            SpilledSubpartitionView spilledSubpartitionView = this;
            synchronized (spilledSubpartitionView) {
                current = this.nextBuffer == null ? this.requestAndFillBuffer() : this.nextBuffer;
                this.nextBuffer = this.requestAndFillBuffer();
                hasMoreData = this.nextBuffer != null || !this.fileReader.hasReachedEndOfFile();
                nextBufferIsEvent = this.nextBuffer != null && !this.nextBuffer.isBuffer();
            }
            if (current == null) {
                return null;
            }
            if (current.isBuffer()) {
                --this.remainingBuffers;
            }
            return new ResultSubpartition.BufferAndBacklog(current, this.remainingBuffers > 0 || hasMoreData, this.remainingBuffers, nextBufferIsEvent);
        }
        catch (Throwable t) {
            throw new DataConsumptionException(this.parent.parent.getPartitionId(), t);
        }
    }

    @Nullable
    private Buffer requestAndFillBuffer() throws IOException, InterruptedException {
        assert (Thread.holdsLock(this));
        if (this.fileReader.hasReachedEndOfFile()) {
            return null;
        }
        Buffer buffer = this.bufferPool.requestBufferUnblocking();
        if (buffer != null) {
            this.fileReader.readInto(buffer);
        }
        return buffer;
    }

    @Override
    public void notifyDataAvailable() {
    }

    @Override
    public void notifySubpartitionConsumed() throws IOException {
        this.parent.onConsumedSubpartition();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void releaseAllResources() throws IOException {
        if (this.isReleased.compareAndSet(false, true)) {
            SpilledSubpartitionView spilledSubpartitionView = this;
            synchronized (spilledSubpartitionView) {
                this.fileReader.close();
                if (this.nextBuffer != null) {
                    this.nextBuffer.recycleBuffer();
                    this.nextBuffer = null;
                }
            }
            this.bufferPool.lazyDestroy();
        }
    }

    @Override
    public boolean isReleased() {
        return this.parent.isReleased() || this.isReleased.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean nextBufferIsEvent() {
        SpilledSubpartitionView spilledSubpartitionView = this;
        synchronized (spilledSubpartitionView) {
            if (this.nextBuffer == null) {
                try {
                    this.nextBuffer = this.requestAndFillBuffer();
                }
                catch (Exception e) {
                    return false;
                }
            }
            return this.nextBuffer != null && !this.nextBuffer.isBuffer();
        }
    }

    @Override
    public synchronized boolean isAvailable() {
        if (this.nextBuffer != null) {
            return true;
        }
        return !this.fileReader.hasReachedEndOfFile();
    }

    @Override
    public void notifyCreditAdded(int creditDeltas) {
    }

    @Override
    public Throwable getFailureCause() {
        return this.parent.getFailureCause();
    }

    public String toString() {
        return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of InternalResultPartition %s", this.parent.index, this.numberOfSpilledBuffers, this.parent.parent.getPartitionId());
    }

    @VisibleForTesting
    public int getRemainingBuffers() {
        return this.remainingBuffers;
    }
}

