package org.apache.flink.streaming.api.operators.async.queue;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bounded stream element queue that hands out its entries strictly in insertion order.
 *
 * <p>Only the head of the queue can be peeked or polled, and only once it reports
 * {@code isDone()}; later entries that finish earlier have to wait behind it. Producers calling
 * {@link #put} block while the queue holds {@code capacity} entries.
 *
 * <p>All blocking operations are coordinated through a single {@link ReentrantLock} with two
 * conditions: {@code notFull} (signalled when an entry is polled) and {@code headIsCompleted}
 * (signalled when the head entry is done after some entry completed).
 */
@Internal
public class OrderedStreamElementQueue extends StreamElementQueue {
    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Guards {@link #queue} for all blocking operations. */
    private final ReentrantLock lock;

    /** Signalled whenever an entry has been removed from the queue. */
    private final Condition notFull;

    /** Signalled when the head entry of the queue is done. */
    private final Condition headIsCompleted;

    /** Entries in insertion order; the head is the next entry to be emitted. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    /**
     * Creates an empty ordered queue.
     *
     * @param i forwarded to the superclass and used as the initial size of the backing deque;
     *     presumably the queue capacity (verify against {@code StreamElementQueue})
     * @param executor forwarded to the superclass; completion callbacks are registered with the
     *     inherited {@code executor}
     * @param operatorActions forwarded to the superclass; the inherited {@code operatorActions}
     *     is used to fail the operator when a completion callback throws
     * @param z forwarded to the superclass unchanged (meaning not visible here)
     * @param i2 forwarded to the superclass unchanged (meaning not visible here)
     * @param metricGroup forwarded to the superclass unchanged
     */
    public OrderedStreamElementQueue(int i, Executor executor, OperatorActions operatorActions, boolean z, int i2, MetricGroup metricGroup) {
        super(i, executor, operatorActions, z, i2, metricGroup);
        this.lock = new ReentrantLock(false);
        this.headIsCompleted = this.lock.newCondition();
        this.notFull = this.lock.newCondition();
        this.queue = new ArrayDeque<>(i);
    }

    /**
     * Blocks until the head entry is done and returns it without removing it.
     *
     * @return the completed head entry
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or
     *     while waiting for the head to complete
     */
    @Override // org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue
    protected AsyncResult internalPeekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // The lock must be held across the whole wait loop: await() releases and
            // re-acquires it internally, and the condition has to be re-checked under the lock.
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            LOG.debug("Peeked head element from ordered stream element queue with filling degree ({}/{}).", queue.size(), capacity);
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the head entry is done, removes it and wakes up producers waiting for space.
     *
     * @return the completed entry that was removed from the head of the queue
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or
     *     while waiting for the head to complete
     */
    @Override // org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue
    protected AsyncResult internalPoll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            notFull.signalAll();
            LOG.debug("Polled head element from ordered stream element queue. New filling degree ({}/{}).", queue.size() - 1, capacity);
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a snapshot of the queued entries in queue order, taken under the lock.
     *
     * @return a fixed-size list backed by a fresh array copy of the current entries
     * @throws InterruptedException if the thread is interrupted while acquiring the lock
     */
    @Override // org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue
    public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            StreamElementQueueEntry<?>[] snapshot = new StreamElementQueueEntry[queue.size()];
            snapshot = queue.toArray(snapshot);
            return Arrays.asList(snapshot);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reports whether the queue currently holds no entries.
     *
     * <p>NOTE(review): reads the deque without taking the lock, so the result is only a
     * best-effort view when other threads modify the queue concurrently.
     */
    @Override // org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /**
     * Returns the current number of entries.
     *
     * <p>NOTE(review): reads the deque without taking the lock, so the result is only a
     * best-effort view when other threads modify the queue concurrently.
     */
    @Override // org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue
    public int size() {
        return queue.size();
    }

    /**
     * Appends the entry to the queue, blocking while the queue is at capacity.
     *
     * @param streamElementQueueEntry entry to append
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or
     *     while waiting for free space
     */
    @Override // org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }
            // Still holding the lock here, as addEntry requires.
            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the entry to the queue if there is free space; never waits for space.
     *
     * @param streamElementQueueEntry entry to append
     * @return {@code true} if the entry was added, {@code false} if the queue was full
     * @throws InterruptedException if the thread is interrupted while acquiring the lock
     */
    @Override // org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            if (queue.size() >= capacity) {
                LOG.debug("Failed to put element into ordered stream element queue because it was full ({}/{}).", queue.size(), capacity);
                return false;
            }
            addEntry(streamElementQueueEntry);
            LOG.debug("Put element into ordered stream element queue. New filling degree ({}/{}).", queue.size(), capacity);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the entry to the tail of the queue and registers a completion callback on it.
     *
     * <p>Must be called with {@link #lock} held by the current thread. The callback is
     * registered with the inherited {@code executor}; any failure other than an interruption
     * is reported through {@code operatorActions.failOperator}.
     *
     * @param streamElementQueueEntry entry to append
     */
    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert lock.isHeldByCurrentThread();

        queue.addLast(streamElementQueueEntry);
        streamElementQueueEntry.onComplete(completedEntry -> {
            try {
                onCompleteHandler(completedEntry);
            } catch (InterruptedException e) {
                // Interruption while signalling is only logged, not escalated.
                LOG.debug("AsyncBufferEntry could not be properly completed because the executor thread has been interrupted.", e);
            } catch (Throwable th) {
                operatorActions.failOperator(new Exception("Could not complete the stream element queue entry: " + completedEntry + '.', th));
            }
        }, executor);
    }

    /**
     * Completion callback: wakes up consumers if the head of the queue is now done.
     *
     * <p>The completed entry itself is not inspected; only the current head matters, because
     * entries may only leave the queue in insertion order.
     *
     * @param streamElementQueueEntry the entry that completed (unused)
     * @throws InterruptedException if the thread is interrupted while acquiring the lock
     */
    private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                LOG.debug("Signal ordered stream element queue has completed head element.");
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}
