/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.DataConsumptionException;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockResultPartitionMeta;
import org.apache.flink.runtime.io.network.partition.external.FadvisedReadAheadSynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.partition.external.FadvisedSynchronousBufferFileReader;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.io.ReadaheadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ResultSubpartitionView} that serves one subpartition of an external result partition
 * by reading its data files into {@link Buffer}s on a thread of the supplied
 * {@link ExecutorService}.
 *
 * <p>Reading is credit driven: a data buffer is only read while {@code currentCredit > 0}, and
 * every enqueued data buffer consumes one credit. {@link #notifyCreditAdded(int)} either submits
 * this view to the thread pool (when no reader is running) or wakes up a reader that is waiting
 * on {@code lock}.
 *
 * <p>State shared between the reader thread and callers is protected by {@code lock}. Methods
 * whose name ends in {@code Unsafe} do not acquire the lock themselves.
 */
public class ExternalBlockSubpartitionView implements ResultSubpartitionView, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalBlockSubpartitionView.class);

    private final ExternalBlockResultPartitionMeta externalResultPartitionMeta;
    private final int subpartitionIndex;
    private final ExecutorService threadPool;
    private final ResultPartitionID resultPartitionID;
    private final FixedLengthBufferPool bufferPool;

    /**
     * How long the reader waits for new credit before exiting: {@code 0} exits immediately,
     * a negative value waits without timeout, a positive value is a timeout in milliseconds.
     */
    private final long waitCreditTimeoutInMills;

    /** Sum of the lengths of all metas of this subpartition; computed by {@link #initializeMeta()}. */
    private long totalLength;

    private final Object lock = new Object();

    /** Buffers (data and events) that were read but not yet polled via {@link #getNextBuffer()}. */
    @GuardedBy("lock")
    private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();

    /** The failure passed to {@link #releaseAllResources(Throwable)}, if any. */
    @GuardedBy("lock")
    private volatile Throwable cause;

    /** Total size of the data buffers enqueued so far. */
    private long totalReadLength = 0L;

    private Iterator<ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta> metaIterator;

    /** Reader of the file currently being consumed, or {@code null} if none is open. */
    private SynchronousBufferFileReader currFsIn = null;

    /** Number of bytes still to be read through {@link #currFsIn}. */
    private long currRemainLength = 0L;

    private final BufferAvailabilityListener listener;

    @GuardedBy("lock")
    private volatile boolean isReleased;

    /** Whether an invocation of {@link #run()} is currently in progress. */
    @GuardedBy("lock")
    private boolean isRunning;

    @GuardedBy("lock")
    private volatile int currentCredit = 0;

    private final ReadaheadPool readaheadPool;

    /**
     * Creates a view for the given subpartition.
     *
     * @param externalResultPartitionMeta meta of the result partition; must not be null
     * @param subpartitionIndex index of the subpartition to read
     * @param threadPool executor this view submits itself to; must not be null
     * @param resultPartitionID id of the result partition; must not be null
     * @param bufferPool pool the read buffers are requested from; must not be null
     * @param waitCreditTimeoutInMills see {@link #waitCreditTimeoutInMills}
     * @param listener notified whenever buffers or a failure become available; must not be null
     * @param readaheadPool pool handed to the read-ahead file reader (not null-checked here)
     */
    public ExternalBlockSubpartitionView(
            ExternalBlockResultPartitionMeta externalResultPartitionMeta,
            int subpartitionIndex,
            ExecutorService threadPool,
            ResultPartitionID resultPartitionID,
            FixedLengthBufferPool bufferPool,
            long waitCreditTimeoutInMills,
            BufferAvailabilityListener listener,
            ReadaheadPool readaheadPool) {
        this.externalResultPartitionMeta = Preconditions.checkNotNull(externalResultPartitionMeta);
        this.subpartitionIndex = subpartitionIndex;
        this.threadPool = Preconditions.checkNotNull(threadPool);
        this.resultPartitionID = Preconditions.checkNotNull(resultPartitionID);
        this.bufferPool = Preconditions.checkNotNull(bufferPool);
        this.waitCreditTimeoutInMills = waitCreditTimeoutInMills;
        this.listener = Preconditions.checkNotNull(listener);
        this.readaheadPool = readaheadPool;
    }

    /**
     * Reads buffers while credit is available and enqueues them for the consumer.
     *
     * <p>When credit runs out while data remains, the method exits immediately, waits with a
     * timeout, or waits indefinitely, depending on {@code waitCreditTimeoutInMills}. Any failure
     * releases the view with a {@link DataConsumptionException} and notifies the listener.
     *
     * @throws IllegalStateException if another invocation is still marked as running
     */
    @Override
    public void run() {
        synchronized (lock) {
            Preconditions.checkState(!isRunning, "All the previous instances should be already exited.");
            if (isReleased) {
                return;
            }
            isRunning = true;
        }

        try {
            if (metaIterator == null) {
                initializeMeta();
            }

            // An empty subpartition only produces the end-of-partition event.
            if (totalLength == 0L) {
                enqueueBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
                return;
            }

            while (true) {
                while (isAvailableForReadUnsafe()) {
                    enqueueBuffer(readNextBuffer());
                }

                if (waitCreditTimeoutInMills == 0L || !hasMoreDataToReadUnsafe()) {
                    return;
                }

                synchronized (lock) {
                    if (isReleased) {
                        return;
                    }
                    // Re-check under the lock: credit may have been added (and notifyAll()
                    // already fired) between the unlocked check above and acquiring the lock.
                    // Waiting now would miss that wakeup.
                    if (isAvailableForReadUnsafe()) {
                        continue;
                    }
                    if (waitCreditTimeoutInMills < 0L) {
                        lock.wait();
                    } else {
                        lock.wait(waitCreditTimeoutInMills);
                        if (!isAvailableForReadUnsafe()) {
                            return;
                        }
                    }
                }
            }
        } catch (Throwable t) {
            LOG.error("Exception during reading {}", this, t);
            releaseAllResources(new DataConsumptionException(resultPartitionID, t));
            listener.notifyDataAvailable();
            if (t instanceof InterruptedException) {
                // Preserve the interrupt for the owner of the executing thread.
                Thread.currentThread().interrupt();
            }
        } finally {
            synchronized (lock) {
                // While running, releaseAllResources() leaves closing the reader to this thread.
                if (isReleased) {
                    closeCurrentFileReader();
                }
                isRunning = false;
                // Credit may have arrived while exiting; schedule another round in that case.
                if (isAvailableForReadUnsafe()) {
                    threadPool.execute(this);
                }
            }
        }
    }

    /**
     * Initializes the partition meta if necessary, then sets up the iterator over this
     * subpartition's metas and the total number of bytes to read.
     */
    private void initializeMeta() throws IOException {
        if (!externalResultPartitionMeta.hasInitialized()) {
            externalResultPartitionMeta.initialize();
        }
        List<ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta> subpartitionMetas =
                externalResultPartitionMeta.getSubpartitionMeta(subpartitionIndex);

        long lengthSum = 0L;
        for (ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta meta : subpartitionMetas) {
            lengthSum += meta.getLength();
        }
        totalLength = lengthSum;
        metaIterator = subpartitionMetas.iterator();
    }

    /** Returns whether there is unread data and at least one credit; does not take the lock. */
    private boolean isAvailableForReadUnsafe() {
        return hasMoreDataToReadUnsafe() && currentCredit > 0;
    }

    /** Returns whether the view is not released and unread data remains; does not take the lock. */
    private boolean hasMoreDataToReadUnsafe() {
        return !isReleased && totalReadLength < totalLength;
    }

    /**
     * Reads the next chunk of the current file into a buffer requested from the pool, opening
     * the next file reader first if none is open. The reader is closed once its file range is
     * fully consumed.
     *
     * @return the filled buffer
     * @throws IllegalStateException if there is no file or no remaining data to read, or no
     *     buffer could be obtained
     */
    @Nonnull
    private Buffer readNextBuffer() throws IOException, InterruptedException {
        if (currFsIn == null) {
            currFsIn = getNextFileReader();
        }
        Preconditions.checkState(currFsIn != null, "No more data to read.");
        // Validate before requesting a buffer so that a failed check cannot leak the buffer.
        Preconditions.checkState(currRemainLength > 0L, "Should have data to read from the current file.");

        Buffer buffer = bufferPool.requestBufferBlocking();
        Preconditions.checkState(buffer != null, "Failed to request a buffer.");
        try {
            long lengthToRead = Math.min(currRemainLength, (long) buffer.getMaxCapacity());
            currFsIn.readInto(buffer, lengthToRead);
            currRemainLength -= lengthToRead;
        } catch (Throwable t) {
            // Give the buffer back to the pool before propagating the failure.
            if (!buffer.isRecycled()) {
                buffer.recycleBuffer();
            }
            throw t;
        }

        if (currRemainLength == 0L) {
            closeCurrentFileReader();
        }
        return buffer;
    }

    /** Closes the current file reader, if any, logging (not propagating) close failures. */
    private void closeCurrentFileReader() {
        if (currFsIn == null) {
            return;
        }
        try {
            currFsIn.close();
        } catch (IOException ioe) {
            LOG.error("Ignore the close file exception.", ioe);
        }
        currFsIn = null;
    }

    /**
     * Advances the meta iterator to the next meta with a positive length and opens a reader for
     * it, chosen by the partition's OS cache policy and positioned at the meta's offset.
     *
     * @return the opened reader, or {@code null} if no meta with data is left
     */
    private SynchronousBufferFileReader getNextFileReader() throws IOException {
        SynchronousBufferFileReader nextFsIn = null;
        while (metaIterator.hasNext()) {
            ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta nextMeta = metaIterator.next();
            currRemainLength = nextMeta.getLength();
            if (currRemainLength <= 0L) {
                // Nothing to read for this meta; try the next one.
                continue;
            }

            FileIOChannel.ID fileChannelID = new FileIOChannel.ID(nextMeta.getDataFile().getPath());
            switch (externalResultPartitionMeta.getOsCachePolicy()) {
                case READ_AHEAD: {
                    nextFsIn = new FadvisedReadAheadSynchronousBufferFileReader(
                            fileChannelID,
                            false,
                            nextMeta.getOffset(),
                            nextMeta.getLength(),
                            externalResultPartitionMeta.getMaxReadAheadLength(),
                            readaheadPool);
                    break;
                }
                case NO_TREATMENT: {
                    nextFsIn = new SynchronousBufferFileReader(fileChannelID, false, false);
                    break;
                }
                default: {
                    nextFsIn = new FadvisedSynchronousBufferFileReader(
                            fileChannelID,
                            false,
                            externalResultPartitionMeta.getOsCachePolicy(),
                            nextMeta.getOffset(),
                            nextMeta.getLength());
                }
            }
            nextFsIn.seekToPosition(nextMeta.getOffset());
            break;
        }
        return nextFsIn;
    }

    /**
     * Appends the buffer to the queue and notifies the listener. A data buffer consumes one
     * credit and counts towards the read length; once all data has been read, the
     * end-of-partition event is appended as well. If the view is already released, the buffer
     * is recycled and the listener is not notified.
     */
    private void enqueueBuffer(Buffer buffer) throws IOException {
        synchronized (lock) {
            if (isReleased) {
                buffer.recycleBuffer();
                return;
            }
            buffers.add(buffer);
            if (buffer.isBuffer()) {
                --currentCredit;
                totalReadLength += (long) buffer.getSize();
            }
            if (totalReadLength == totalLength && totalLength != 0L) {
                buffers.add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
            }
        }
        // Notify outside the lock so the listener callback never runs while holding it.
        listener.notifyDataAvailable();
    }

    /**
     * Polls the next queued buffer.
     *
     * @return the polled buffer together with whether another entry is queued, the number of
     *     entries remaining in the queue, and whether the following entry is an event; or
     *     {@code null} if the queue is empty
     */
    @Override
    public ResultSubpartition.BufferAndBacklog getNextBuffer() {
        synchronized (lock) {
            Buffer buffer = buffers.poll();
            if (buffer == null) {
                return null;
            }
            Buffer nextBuffer = buffers.peek();
            boolean hasNext = nextBuffer != null;
            return new ResultSubpartition.BufferAndBacklog(
                    buffer, hasNext, buffers.size(), hasNext && !nextBuffer.isBuffer());
        }
    }

    /**
     * Returns {@code true} if a failure cause is set, otherwise whether the head of the queue
     * exists and is not a data buffer.
     *
     * @throws IllegalStateException if a failure cause is set but buffers are still queued
     */
    @Override
    public boolean nextBufferIsEvent() {
        synchronized (lock) {
            if (cause != null) {
                Preconditions.checkState(
                        buffers.size() == 0,
                        "All the buffer should be cleared after errors occur and released.");
                return true;
            }
            return !buffers.isEmpty() && !buffers.peek().isBuffer();
        }
    }

    /** Returns whether at least one buffer is queued or a failure cause is set. */
    @Override
    public boolean isAvailable() {
        synchronized (lock) {
            return !buffers.isEmpty() || cause != null;
        }
    }

    /**
     * Adds the given credit. If there was no credit before, the reader is either submitted to
     * the thread pool (when not running) or woken up (when running and possibly waiting).
     *
     * @param creditDeltas amount of credit to add
     */
    @Override
    public void notifyCreditAdded(int creditDeltas) {
        synchronized (lock) {
            int creditBeforeAdded = currentCredit;
            currentCredit += creditDeltas;
            if (creditBeforeAdded != 0) {
                return;
            }
            if (isRunning) {
                lock.notifyAll();
            } else {
                threadPool.execute(this);
            }
        }
    }

    /** Returns the current credit without taking the lock. */
    int getCreditUnsafe() {
        return currentCredit;
    }

    /** Returns the result partition directory reported by the partition meta. */
    String getResultPartitionDir() {
        return externalResultPartitionMeta.getResultPartitionDir();
    }

    /** Returns the index of the subpartition this view reads. */
    int getSubpartitionIndex() {
        return subpartitionIndex;
    }

    /** Intentionally a no-op in this view. */
    @Override
    public void notifyDataAvailable() {
    }

    /**
     * Releases the view: records the cause (if non-null), recycles all queued buffers, closes
     * the file reader unless a reader thread is running (which then closes it on exit), informs
     * the partition meta that the subpartition was consumed, and wakes up a waiting reader.
     * Calls after the first one have no effect.
     *
     * @param cause the failure that triggered the release, or {@code null}
     */
    @Override
    public void releaseAllResources(Throwable cause) {
        synchronized (lock) {
            if (isReleased) {
                return;
            }
            if (cause != null) {
                this.cause = cause;
            }

            Buffer buffer;
            while ((buffer = buffers.poll()) != null) {
                buffer.recycleBuffer();
            }

            if (!isRunning) {
                closeCurrentFileReader();
            }
            externalResultPartitionMeta.notifySubpartitionConsumed(subpartitionIndex);
            isReleased = true;
            lock.notifyAll();
        }
    }

    @Override
    public boolean isReleased() {
        return isReleased;
    }

    @Override
    public Throwable getFailureCause() {
        return cause;
    }

    @Override
    public String toString() {
        return String.format(
                "ExternalSubpartitionView [current read file path : %s]",
                currFsIn == null ? null : currFsIn.getChannelID().getPath());
    }

    @VisibleForTesting
    long getTotalLength() {
        return totalLength;
    }

    @VisibleForTesting
    Iterator<ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta> getMetaIterator() {
        return metaIterator;
    }

    /** Returns the current credit, read under the lock. */
    @VisibleForTesting
    int getCurrentCredit() {
        synchronized (lock) {
            return currentCredit;
        }
    }

    /** Returns whether a reader invocation is in progress, read under the lock. */
    @VisibleForTesting
    public boolean isRunning() {
        synchronized (lock) {
            return isRunning;
        }
    }

    public ResultPartitionID getResultPartitionID() {
        return resultPartitionID;
    }
}

