package org.apache.flink.streaming.api.operators.async;

import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult;
import org.apache.flink.streaming.api.operators.async.queue.AsyncResult;
import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/Emitter.class */
public class Emitter<OUT> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);
    private final Object checkpointLock;
    private final Output<StreamRecord<OUT>> output;
    private final StreamElementQueue streamElementQueue;
    private final OperatorActions operatorActions;
    private final TimestampedCollector<OUT> timestampedCollector;
    private volatile boolean running = true;

    public Emitter(Object obj, Output<StreamRecord<OUT>> output, StreamElementQueue streamElementQueue, OperatorActions operatorActions) {
        this.checkpointLock = Preconditions.checkNotNull(obj, "checkpointLock");
        this.output = (Output) Preconditions.checkNotNull(output, "output");
        this.streamElementQueue = (StreamElementQueue) Preconditions.checkNotNull(streamElementQueue, "asyncCollectorBuffer");
        this.operatorActions = (OperatorActions) Preconditions.checkNotNull(operatorActions, "operatorActions");
        this.timestampedCollector = new TimestampedCollector<>(this.output);
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                LOG.debug("Wait for next completed async stream element result.");
                output(this.streamElementQueue.peekBlockingly());
            } catch (InterruptedException e) {
                if (this.running) {
                    this.operatorActions.failOperator(e);
                    return;
                } else {
                    LOG.debug("Emitter thread got interrupted. This indicates that the emitter should shut down.", e);
                    return;
                }
            } catch (Throwable th) {
                this.operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an unexpected throwable.", th));
                return;
            }
        }
    }

    private void output(AsyncResult asyncResult) throws InterruptedException {
        if (asyncResult.isWatermark()) {
            synchronized (this.checkpointLock) {
                AsyncWatermarkResult asWatermark = asyncResult.asWatermark();
                LOG.debug("Output async watermark.");
                this.output.emitWatermark(asWatermark.getWatermark());
                this.streamElementQueue.poll();
                this.checkpointLock.notifyAll();
            }
            return;
        }
        AsyncCollectionResult asResultCollection = asyncResult.asResultCollection();
        if (asResultCollection.hasTimestamp()) {
            this.timestampedCollector.setAbsoluteTimestamp(asResultCollection.getTimestamp());
        } else {
            this.timestampedCollector.eraseTimestamp();
        }
        synchronized (this.checkpointLock) {
            LOG.debug("Output async stream element collection result.");
            try {
                Collection collection = asResultCollection.get();
                if (collection != null) {
                    Iterator it = collection.iterator();
                    while (it.hasNext()) {
                        this.timestampedCollector.collect(it.next());
                    }
                }
            } catch (Exception e) {
                this.operatorActions.failOperator(new Exception("An async function call terminated with an exception. Failing the AsyncWaitOperator.", e));
            }
            this.streamElementQueue.poll();
            this.checkpointLock.notifyAll();
        }
    }

    public void stop() {
        this.running = false;
    }
}
