package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.ReconnectableSubpartition;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/DrainablePipelinedSubpartition.class */
public class DrainablePipelinedSubpartition extends PipelinedSubpartition implements ReconnectableSubpartition {
    private static final Logger LOG = LoggerFactory.getLogger(DrainablePipelinedSubpartition.class);
    private BufferConsumer unfinishedBuffer;
    private ReconnectableSubpartition.State state;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DrainablePipelinedSubpartition(int i, InternalResultPartition internalResultPartition) {
        super(i, internalResultPartition);
        this.state = ReconnectableSubpartition.State.INITIALIZED;
    }

    @Override // org.apache.flink.runtime.io.network.partition.PipelinedSubpartition, org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public boolean add(BufferConsumer bufferConsumer) throws IOException {
        synchronized (this.buffers) {
            if (this.state != ReconnectableSubpartition.State.SUSPENDED) {
                return super.add(bufferConsumer);
            }
            clearFinishedBuffer();
            if (this.unfinishedBuffer != null) {
                this.unfinishedBuffer.build().recycleBuffer();
                Preconditions.checkState(this.unfinishedBuffer.isFinished());
                this.unfinishedBuffer.close();
                this.unfinishedBuffer = null;
            }
            bufferConsumer.build().recycleBuffer();
            if (bufferConsumer.isFinished()) {
                bufferConsumer.close();
            } else {
                this.unfinishedBuffer = bufferConsumer;
            }
            return true;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.PipelinedSubpartition, org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public void flush() {
        synchronized (this.buffers) {
            if (this.state != ReconnectableSubpartition.State.SUSPENDED) {
                super.flush();
            } else if (this.unfinishedBuffer != null) {
                this.unfinishedBuffer.build().recycleBuffer();
                if (this.unfinishedBuffer.isFinished()) {
                    this.unfinishedBuffer.close();
                    this.unfinishedBuffer = null;
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.PipelinedSubpartition, org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public void finish() throws IOException {
        if (this.state == ReconnectableSubpartition.State.SUSPENDED) {
            LOG.info("Draining subpartition {} is finished", super.toString());
        }
        super.finish();
    }

    @Override // org.apache.flink.runtime.io.network.partition.PipelinedSubpartition, org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public PipelinedSubpartitionView createReadView(BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        PipelinedSubpartitionView pipelinedSubpartitionView;
        synchronized (this.buffers) {
            Preconditions.checkState(!isReleased());
            LOG.debug("Creating read view for subpartition {} of partition {}.", Integer.valueOf(this.index), this.parent.getPartitionId());
            pipelinedSubpartitionView = new PipelinedSubpartitionView(this, bufferAvailabilityListener, false);
        }
        return pipelinedSubpartitionView;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ReconnectableSubpartition
    public void suspend(@Nullable ResultSubpartitionView resultSubpartitionView) {
        synchronized (this.buffers) {
            if (this.state != ReconnectableSubpartition.State.SUSPENDED) {
                if (this.readView == null) {
                    LOG.info("{} is suspended, start draining now", this);
                    this.state = ReconnectableSubpartition.State.SUSPENDED;
                    clearFinishedBuffer();
                } else if (resultSubpartitionView == null || this.readView.equals(resultSubpartitionView)) {
                    LOG.info("{} is suspended, start draining now", this);
                    this.state = ReconnectableSubpartition.State.SUSPENDED;
                    clearFinishedBuffer();
                    this.readView.releaseAllResources(new ConsumptionDeclinedException(this.parent.getPartitionId()));
                    this.readView.notifyDataAvailable();
                    this.readView = null;
                } else {
                    LOG.info("Suspending is ignored because {} does not match the attached {}", resultSubpartitionView, this.readView);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ReconnectableSubpartition
    public void allowConsuming(ResultSubpartitionView resultSubpartitionView) {
        synchronized (this.buffers) {
            if (this.state != ReconnectableSubpartition.State.INITIALIZED) {
                LOG.info("{} is allowed to be consumed", this);
                clearFinishedBuffer();
                Preconditions.checkState(this.buffers.isEmpty(), "Buffer should be empty after draining");
                if (this.unfinishedBuffer != null) {
                    this.unfinishedBuffer.build().recycleBuffer();
                    Preconditions.checkState(this.unfinishedBuffer.isFinished(), "Buffer should be finished after draining");
                    this.unfinishedBuffer.close();
                    this.unfinishedBuffer = null;
                }
            }
            if (this.state == ReconnectableSubpartition.State.CONSUMING) {
                Preconditions.checkState(this.readView != null, "There should be a view attached in consuming state");
                LOG.warn("There is still a view {} attached, fail it", this.readView);
                this.readView.releaseAllResources(new ConsumptionDeclinedException(this.parent.getPartitionId()));
                this.readView.notifyDataAvailable();
            }
            this.readView = (PipelinedSubpartitionView) resultSubpartitionView;
            this.readView.allowConsuming();
            this.state = ReconnectableSubpartition.State.CONSUMING;
            updateFlushRequestedFlag(false);
            if (!this.buffers.isEmpty()) {
                this.readView.notifyDataAvailable();
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ReconnectableSubpartition
    public ReconnectableSubpartition.State getState() {
        return this.state;
    }

    @Override // org.apache.flink.runtime.io.network.partition.PipelinedSubpartition
    public String toString() {
        return String.format("Drainable [ with state %s ] %s ", this.state.toString(), super.toString());
    }

    @VisibleForTesting
    BufferConsumer getUnfinishedBuffer() {
        return this.unfinishedBuffer;
    }

    private int clearFinishedBuffer() {
        int i = 0;
        Iterator<BufferConsumer> it = this.buffers.iterator();
        while (it.hasNext()) {
            BufferConsumer next = it.next();
            next.build().recycleBuffer();
            if (next.isFinished()) {
                next.close();
                it.remove();
                i++;
            }
        }
        return i;
    }
}
