package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
import org.apache.flink.runtime.checkpoint2.SubtaskCheckpointInfoCollector;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointBarrierHandlerListener;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.streaming.api.operators.InputElementSelection;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierBuffer.class */
public class BarrierBuffer implements CheckpointBarrierHandler, CheckpointBarrierHandlerListener {
    private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
    // Gate from which un-buffered elements are read.
    private final InputGate inputGate;
    // Per-channel flag: true once a barrier of the ongoing alignment was seen on that channel.
    private final boolean[] blockedChannels;
    // Number of input channels reported by the gate at construction time.
    private final int totalNumberOfInputChannels;
    // Receives every element that arrives on a blocked channel during alignment.
    private final BufferSpiller bufferSpiller;
    // Sequences whose replay was interrupted because a newer alignment ended first.
    private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
    // Alignment volume limit in bytes; values <= 0 (the constructor only admits -1) disable the check.
    private final long maxBufferedBytes;
    // Sequence currently being replayed, or null when elements come straight from the gate.
    private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
    // Task that is told about triggered and aborted checkpoints; may be null until registered.
    private StatefulTask toNotifyOnCheckpoint;
    // Highest checkpoint id seen so far (-1 before the first barrier).
    private long currentCheckpointId;
    // Number of barriers received for the ongoing alignment; 0 means no alignment in progress.
    private int numBarriersReceived;
    // Number of EndOfPartitionEvents processed so far.
    private int numClosedChannels;
    // Sum of the sizes of the sequences currently parked in queuedBuffered.
    private long numQueuedBytes;
    // System.currentTimeMillis() at the start of the ongoing alignment; 0 when none is running.
    private long startOfAlignmentTimestamp;
    // Duration of the last finished alignment. NOTE(review): despite the name this is a
    // difference of System.currentTimeMillis() values, i.e. milliseconds.
    private long latestAlignmentDurationNanos;
    // Set once the gate (and any buffered data) returned null for the first time.
    private boolean endOfStream;

    /**
     * Creates a barrier buffer without an alignment volume limit.
     *
     * <p>Delegates to the three-argument constructor with a limit of {@code -1}.
     *
     * @param inputGate gate to read elements from
     * @param iOManager I/O manager handed to the internal {@link BufferSpiller}
     * @throws IOException propagated from the three-argument constructor
     */
    public BarrierBuffer(InputGate inputGate, IOManager iOManager) throws IOException {
        this(inputGate, iOManager, -1L);
    }

    public BarrierBuffer(InputGate inputGate, IOManager iOManager, long j) throws IOException {
        this.currentCheckpointId = -1L;
        Preconditions.checkArgument(j == -1 || j > 0);
        this.inputGate = inputGate;
        this.maxBufferedBytes = j;
        this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
        this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
        this.bufferSpiller = new BufferSpiller(iOManager, inputGate.getPageSize(), 4096);
        this.queuedBuffered = new ArrayDeque<>();
        this.inputGate.setCheckpointBarrierHandlerListener(this);
    }

    /**
     * Returns the next element that is not held back by an ongoing alignment.
     *
     * <p>Elements are taken from the sequence currently being replayed if there is one,
     * otherwise from the input gate. Elements from blocked channels are handed to the
     * spiller; checkpoint barriers and cancellation markers are consumed internally and
     * never returned. Buffers and all other events (including end-of-partition events)
     * are returned to the caller.
     *
     * @return the next buffer or event, or {@code null} once the gate and all buffered
     *         data are exhausted
     * @throws Exception propagated from the gate, the spiller or the checkpoint callbacks
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public BufferOrEvent getNextNonBlocked() throws Exception {
        for (;;) {
            final BufferOrEvent element;
            if (this.currentBuffered != null) {
                element = this.currentBuffered.getNext();
                if (element == null) {
                    // Replayed sequence is drained: switch to the next source and retry.
                    completeBufferedSequence();
                    return getNextNonBlocked();
                }
            } else {
                element = this.inputGate.getNextBufferOrEvent();
                if (element == null) {
                    if (this.endOfStream) {
                        return null;
                    }
                    // First time the gate is exhausted: flush whatever is still blocked.
                    this.endOfStream = true;
                    releaseBlocksAndResetBarriers();
                    return getNextNonBlocked();
                }
            }

            if (isBlocked(element.getChannelIndex())) {
                this.bufferSpiller.add(element);
                checkSizeLimit();
                continue;
            }

            if (element.isBuffer()) {
                return element;
            }

            // Exact class comparison (not instanceof), as in the original dispatch.
            final Class<?> eventType = element.getEvent().getClass();
            if (eventType == CheckpointBarrier.class) {
                // Barriers arriving after end of stream are dropped silently.
                if (!this.endOfStream) {
                    processBarrier((CheckpointBarrier) element.getEvent(), element.getChannelIndex());
                }
            } else if (eventType == CancelCheckpointMarker.class) {
                processCancellationBarrier((CancelCheckpointMarker) element.getEvent());
            } else {
                if (eventType == EndOfPartitionEvent.class) {
                    processEndOfPartition();
                }
                return element;
            }
        }
    }

    /**
     * Selection-aware variant of {@link #getNextNonBlocked()}; only
     * {@code InputElementSelection.ANY} is supported.
     *
     * @param inputElementSelection requested selection
     * @return the result of {@link #getNextNonBlocked()}
     * @throws UnsupportedOperationException if the selection is anything other than {@code ANY}
     * @throws Exception propagated from {@link #getNextNonBlocked()}
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public BufferOrEvent getNextNonBlocked(InputElementSelection inputElementSelection) throws Exception {
        final boolean anySelected = InputElementSelection.ANY.equals(inputElementSelection);
        if (!anySelected) {
            throw new UnsupportedOperationException("BarrierBuffer only accept input any.");
        }
        return getNextNonBlocked();
    }

    /**
     * Cleans up the fully replayed sequence and promotes the head of the queue, if any,
     * to be the sequence that is replayed next.
     *
     * @throws IOException propagated from the sequence operations
     */
    private void completeBufferedSequence() throws IOException {
        LOG.debug("Finished feeding back buffered data");
        this.currentBuffered.cleanup();

        final BufferSpiller.SpilledBufferOrEventSequence upcoming = this.queuedBuffered.pollFirst();
        this.currentBuffered = upcoming;
        if (upcoming == null) {
            return;
        }
        upcoming.open();
        // The sequence leaves the queue, so its bytes no longer count as queued.
        this.numQueuedBytes -= upcoming.size();
    }

    /**
     * Handles a checkpoint barrier received on channel {@code i}.
     *
     * <p>With a single input channel the checkpoint is triggered immediately for every
     * barrier with a new id. Otherwise the barrier either continues the ongoing alignment,
     * starts a new one (aborting an unfinished older one as subsumed), or is ignored when
     * its id is not newer than the current checkpoint. Once barriers plus closed channels
     * cover all input channels, the blocks are released and the checkpoint is triggered.
     *
     * @param checkpointBarrier the received barrier
     * @param i index of the channel the barrier arrived on
     * @throws Exception propagated from the checkpoint callbacks or the spiller
     */
    private void processBarrier(CheckpointBarrier checkpointBarrier, int i) throws Exception {
        final long barrierId = checkpointBarrier.getId();

        if (this.totalNumberOfInputChannels == 1) {
            // No alignment needed with a single channel.
            if (barrierId > this.currentCheckpointId) {
                this.currentCheckpointId = barrierId;
                notifyCheckpoint(checkpointBarrier);
            }
            return;
        }

        final boolean alignmentInProgress = this.numBarriersReceived > 0;
        if (alignmentInProgress && barrierId == this.currentCheckpointId) {
            onBarrier(i);
        } else if (barrierId > this.currentCheckpointId) {
            if (alignmentInProgress) {
                LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", barrierId, this.currentCheckpointId);
                notifyAbort(this.currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
                releaseBlocksAndResetBarriers();
            }
            beginNewAlignment(barrierId, i);
        } else {
            // Barrier of an older (or already handled) checkpoint: nothing to do.
            return;
        }

        final int accountedChannels = this.numBarriersReceived + this.numClosedChannels;
        if (accountedChannels == this.totalNumberOfInputChannels) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received all barriers, triggering checkpoint {} at {}", checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
            }
            releaseBlocksAndResetBarriers();
            notifyCheckpoint(checkpointBarrier);
        }
    }

    /**
     * Handles a cancellation marker for a checkpoint.
     *
     * <ul>
     *   <li>Single input channel, or no alignment in progress: a marker with a newer id
     *       advances the current checkpoint id and the checkpoint is reported as
     *       cancelled; older ids are ignored.</li>
     *   <li>Marker for the checkpoint currently being aligned: the alignment is stopped
     *       and the checkpoint is reported as cancelled.</li>
     *   <li>Marker for a newer checkpoint while an older one is being aligned: the older
     *       alignment is stopped and reported as subsumed, then the newer checkpoint is
     *       reported as cancelled.</li>
     * </ul>
     *
     * @param cancelCheckpointMarker the received cancellation marker
     * @throws Exception propagated from the checkpoint callbacks or the spiller
     */
    private void processCancellationBarrier(CancelCheckpointMarker cancelCheckpointMarker) throws Exception {
        final long checkpointId = cancelCheckpointMarker.getCheckpointId();

        if (this.totalNumberOfInputChannels == 1) {
            if (checkpointId > this.currentCheckpointId) {
                this.currentCheckpointId = checkpointId;
                notifyAbortOnCancellationBarrier(checkpointId);
            }
            return;
        }

        if (this.numBarriersReceived <= 0) {
            // No alignment in progress.
            if (checkpointId > this.currentCheckpointId) {
                this.currentCheckpointId = checkpointId;
                this.startOfAlignmentTimestamp = 0L;
                this.latestAlignmentDurationNanos = 0L;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Checkpoint {} canceled, skipping alignment", Long.valueOf(checkpointId));
                }
                notifyAbortOnCancellationBarrier(checkpointId);
            }
            return;
        }

        if (checkpointId == this.currentCheckpointId) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Checkpoint {} canceled, aborting alignment", Long.valueOf(checkpointId));
            }
            releaseBlocksAndResetBarriers();
            notifyAbortOnCancellationBarrier(checkpointId);
            return;
        }

        if (checkpointId > this.currentCheckpointId) {
            LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", Long.valueOf(checkpointId), Long.valueOf(this.currentCheckpointId));

            // Remember the checkpoint whose alignment is being abandoned before the id is
            // advanced; it is the one that has to be declined as subsumed (this mirrors
            // the equivalent branch in processBarrier).
            final long subsumedCheckpointId = this.currentCheckpointId;

            // Must run while currentCheckpointId still refers to the abandoned alignment.
            releaseBlocksAndResetBarriers();

            this.currentCheckpointId = checkpointId;
            this.startOfAlignmentTimestamp = 0L;
            this.latestAlignmentDurationNanos = 0L;

            notifyAbort(subsumedCheckpointId, new CheckpointDeclineSubsumedException(checkpointId));
            notifyAbortOnCancellationBarrier(checkpointId);
        }
    }

    /**
     * Records that one more channel has ended. If an alignment is in progress it can no
     * longer complete, so the current checkpoint is aborted and the blocks are released.
     *
     * @throws Exception propagated from the abort callback or the spiller
     */
    private void processEndOfPartition() throws Exception {
        this.numClosedChannels++;

        final boolean alignmentInProgress = this.numBarriersReceived > 0;
        if (!alignmentInProgress) {
            return;
        }
        notifyAbort(this.currentCheckpointId, new InputEndOfStreamException());
        releaseBlocksAndResetBarriers();
    }

    /**
     * Triggers the checkpoint described by the barrier on the registered task, passing
     * along a collector filled with the alignment start and end instants. Does nothing
     * when no task is registered.
     *
     * <p>NOTE(review): at both call sites in this class {@code startOfAlignmentTimestamp}
     * is already {@code 0} when this method runs (it is never set on the single-channel
     * path and is reset by {@code releaseBlocksAndResetBarriers()} on the aligned path),
     * so the reported start instant is {@code 0} and the end instant equals the last
     * alignment duration. Confirm whether the real start time should be preserved.
     *
     * @param checkpointBarrier barrier carrying id, timestamp and options of the checkpoint
     * @throws Exception propagated from the task's trigger callback
     */
    private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
        final StatefulTask listener = this.toNotifyOnCheckpoint;
        if (listener == null) {
            return;
        }

        final SubtaskCheckpointInfoCollector alignmentInfo = new SubtaskCheckpointInfoCollector();
        alignmentInfo.setAlignStartInstant(this.startOfAlignmentTimestamp);
        alignmentInfo.setAlignEndInstant(this.startOfAlignmentTimestamp + this.latestAlignmentDurationNanos);

        listener.triggerCheckpointOnBarrier(
                checkpointBarrier.getId(),
                checkpointBarrier.getTimestamp(),
                checkpointBarrier.getCheckpointOptions(),
                alignmentInfo);
    }

    /**
     * Reports checkpoint {@code j} as aborted because a cancellation barrier was received.
     *
     * @param j id of the cancelled checkpoint
     * @throws Exception propagated from {@code notifyAbort}
     */
    private void notifyAbortOnCancellationBarrier(long j) throws Exception {
        final CheckpointDeclineException reason = new CheckpointDeclineOnCancellationBarrierException();
        notifyAbort(j, reason);
    }

    /**
     * Forwards an abort for checkpoint {@code j} to the registered task; a no-op when no
     * task is registered.
     *
     * @param j id of the checkpoint being aborted
     * @param checkpointDeclineException reason for the abort
     * @throws Exception propagated from the task's abort callback
     */
    private void notifyAbort(long j, CheckpointDeclineException checkpointDeclineException) throws Exception {
        final StatefulTask listener = this.toNotifyOnCheckpoint;
        if (listener == null) {
            return;
        }
        listener.abortCheckpointOnBarrier(j, checkpointDeclineException);
    }

    /**
     * Aborts the current checkpoint when the bytes queued plus the bytes written by the
     * spiller exceed the configured limit. No check is performed when the limit is not
     * positive.
     *
     * @throws Exception propagated from the abort callback or the spiller
     */
    private void checkSizeLimit() throws Exception {
        final boolean limitEnabled = this.maxBufferedBytes > 0;
        if (limitEnabled && this.numQueuedBytes + this.bufferSpiller.getBytesWritten() > this.maxBufferedBytes) {
            LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded", this.currentCheckpointId, this.maxBufferedBytes);
            releaseBlocksAndResetBarriers();
            notifyAbort(this.currentCheckpointId, new AlignmentLimitExceededException(this.maxBufferedBytes));
        }
    }

    /**
     * Registers the task that is notified about triggered and aborted checkpoints.
     * Only one task may be registered over the lifetime of this buffer.
     *
     * @param statefulTask task to notify
     * @throws IllegalStateException if a task has already been registered
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void registerCheckpointEventHandler(StatefulTask statefulTask) {
        final boolean alreadyRegistered = this.toNotifyOnCheckpoint != null;
        if (alreadyRegistered) {
            throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
        }
        this.toNotifyOnCheckpoint = statefulTask;
    }

    /**
     * Tells whether no buffered sequence is currently being replayed.
     *
     * @return {@code true} if {@code currentBuffered} is {@code null}
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public boolean isEmpty() {
        return this.currentBuffered == null;
    }

    /**
     * Releases all resources held by this buffer: closes the spiller, cleans up the
     * sequence being replayed and every queued sequence, and resets the queue bookkeeping.
     *
     * <p>Every step is attempted even if an earlier one fails, so a single failing
     * close/cleanup does not leave the remaining sequences behind. The first
     * {@link IOException} is rethrown at the end with later ones attached as suppressed.
     *
     * @throws IOException the first failure encountered while releasing resources
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void cleanup() throws IOException {
        IOException failure = null;

        try {
            this.bufferSpiller.close();
        } catch (IOException e) {
            failure = e;
        }

        if (this.currentBuffered != null) {
            try {
                this.currentBuffered.cleanup();
            } catch (IOException e) {
                failure = firstOrSuppressed(failure, e);
            }
        }

        for (BufferSpiller.SpilledBufferOrEventSequence queued : this.queuedBuffered) {
            try {
                queued.cleanup();
            } catch (IOException e) {
                failure = firstOrSuppressed(failure, e);
            }
        }
        this.queuedBuffered.clear();
        this.numQueuedBytes = 0L;

        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} attached as suppressed, or {@code next} if there is no first failure yet. */
    private static IOException firstOrSuppressed(IOException first, IOException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Starts the alignment for checkpoint {@code j}: records the id, blocks channel
     * {@code i} and stores the current wall-clock time as the start of the alignment.
     *
     * @param j id of the checkpoint whose alignment begins
     * @param i index of the channel that delivered the first barrier
     * @throws IOException if channel {@code i} is already blocked
     */
    private void beginNewAlignment(long j, int i) throws IOException {
        this.currentCheckpointId = j;
        onBarrier(i);

        // Taken after the channel is blocked, in milliseconds since the epoch.
        this.startOfAlignmentTimestamp = System.currentTimeMillis();

        if (!LOG.isDebugEnabled()) {
            return;
        }
        LOG.debug("Starting stream alignment for checkpoint " + j + '.');
    }

    /**
     * Tells whether channel {@code i} is currently blocked by an ongoing alignment.
     *
     * @param i channel index
     * @return the blocked flag stored for that channel
     */
    private boolean isBlocked(int i) {
        return this.blockedChannels[i];
    }

    /**
     * Marks channel {@code i} as blocked and counts the barrier it delivered.
     *
     * @param i index of the channel that delivered a barrier
     * @throws IOException if the channel is already blocked, i.e. the same barrier was
     *         seen twice on that channel
     */
    private void onBarrier(int i) throws IOException {
        if (!this.blockedChannels[i]) {
            this.blockedChannels[i] = true;
            this.numBarriersReceived++;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received barrier from channel " + i);
            }
            return;
        }
        throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + i);
    }

    /**
     * Ends the ongoing alignment: unblocks all channels, rolls the spiller over so the
     * data buffered during the alignment is replayed next, resets the barrier count and
     * records how long the alignment took.
     *
     * <p>If an older sequence is still being replayed, it is pushed to the front of the
     * queue and the newly spilled data is replayed first.
     *
     * @throws IOException propagated from the spiller or the sequence operations
     */
    private void releaseBlocksAndResetBarriers() throws IOException {
        LOG.debug("End of stream alignment, feeding buffered data back");

        if (this.totalNumberOfInputChannels > 1) {
            this.inputGate.releaseBlockedChannels(this.currentCheckpointId);
        }
        java.util.Arrays.fill(this.blockedChannels, false);

        if (this.currentBuffered != null) {
            LOG.debug("Checkpoint skipped via buffered data:Pushing back current alignment buffers and feeding back new alignment data first.");
            final BufferSpiller.SpilledBufferOrEventSequence fresh = this.bufferSpiller.rollOverWithNewBuffer();
            if (fresh != null) {
                fresh.open();
                final BufferSpiller.SpilledBufferOrEventSequence interrupted = this.currentBuffered;
                this.queuedBuffered.addFirst(interrupted);
                this.numQueuedBytes += interrupted.size();
                this.currentBuffered = fresh;
            }
        } else {
            this.currentBuffered = this.bufferSpiller.rollOver();
            if (this.currentBuffered != null) {
                this.currentBuffered.open();
            }
        }

        if (LOG.isDebugEnabled()) {
            final long bufferedBytes = this.currentBuffered == null ? 0L : this.currentBuffered.size();
            LOG.debug("Size of buffered data: {} bytes", bufferedBytes);
        }

        this.numBarriersReceived = 0;

        // Only close the measurement if an alignment was actually being timed.
        if (this.startOfAlignmentTimestamp > 0) {
            this.latestAlignmentDurationNanos = System.currentTimeMillis() - this.startOfAlignmentTimestamp;
            this.startOfAlignmentTimestamp = 0L;
        }
    }

    /**
     * Returns the id of the latest checkpoint this buffer has seen a barrier or
     * cancellation marker for.
     *
     * @return the current checkpoint id, {@code -1} if none has been seen yet
     */
    public long getCurrentCheckpointId() {
        return this.currentCheckpointId;
    }

    /**
     * Returns the duration of the latest alignment in nanoseconds. While an alignment is
     * in progress, the time spent in it so far is returned instead.
     *
     * <p>The alignment timestamps in this class are taken with
     * {@link System#currentTimeMillis()}, so the internally tracked values are
     * milliseconds; they are converted here so that the returned value matches the unit
     * promised by the method name.
     *
     * @return alignment duration in nanoseconds (millisecond resolution)
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public long getAlignmentDurationNanos() {
        final long nanosPerMilli = 1_000_000L;
        final long alignmentStartMillis = this.startOfAlignmentTimestamp;
        final long elapsedMillis = alignmentStartMillis <= 0
                ? this.latestAlignmentDurationNanos
                : System.currentTimeMillis() - alignmentStartMillis;
        return elapsedMillis * nanosPerMilli;
    }

    /**
     * Summarises the current checkpoint id, the number of barriers received for the
     * ongoing alignment and the number of closed channels.
     */
    @Override
    public String toString() {
        final String template = "last checkpoint: %d, current barriers: %d, closed channels: %d";
        return String.format(template, this.currentCheckpointId, this.numBarriersReceived, this.numClosedChannels);
    }

    /**
     * Blocks the given channel when the barrier belongs to a checkpoint newer than the
     * current one, or to the current checkpoint while its alignment is in progress.
     * Nothing is blocked when there is only a single input channel.
     *
     * @param inputChannel channel on which the barrier was observed
     * @param checkpointBarrier the observed barrier
     */
    public void updateBlockStateOnBarrier(InputChannel inputChannel, CheckpointBarrier checkpointBarrier) {
        if (this.totalNumberOfInputChannels == 1) {
            return;
        }

        final long barrierId = checkpointBarrier.getId();
        final boolean startsNewerCheckpoint = barrierId > this.currentCheckpointId;
        final boolean joinsOngoingAlignment =
                barrierId == this.currentCheckpointId && this.numBarriersReceived > 0;

        if (startsNewerCheckpoint || joinsOngoingAlignment) {
            inputChannel.blockChannel(barrierId);
        }
    }

    /**
     * Always returns {@code false} for this handler.
     *
     * @return {@code false}
     */
    public boolean isAllowReadBlockedChannels() {
        return false;
    }
}
