/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async;

import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult;
import org.apache.flink.streaming.api.operators.async.queue.AsyncResult;
import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that repeatedly takes the next element from a {@link StreamElementQueue} and
 * forwards it to the given {@link Output}.
 *
 * <p>Watermarks are emitted directly on the output; result collections are emitted element by
 * element through a {@link TimestampedCollector}. All emission happens inside
 * {@link CheckpointLockDelegate#lockAndRun}. After an element has been emitted it is polled
 * from the queue and {@code signalAll()} is invoked on the lock delegate.
 *
 * <p>The loop runs until {@link #stop()} is called or a failure is reported to the
 * {@link OperatorActions}.
 *
 * @param <OUT> type of the records emitted to the output
 */
@Internal
public class Emitter<OUT> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

    /** Wrapper around the checkpoint lock under which elements are emitted. */
    private final CheckpointLockDelegate checkpointLockDelegate;

    /** Output that receives watermarks directly and records via the collector. */
    private final Output<StreamRecord<OUT>> output;

    /** Queue from which the elements to emit are taken. */
    private final StreamElementQueue streamElementQueue;

    /** Callback used to report failures to the owning operator. */
    private final OperatorActions operatorActions;

    /** Collector wrapping {@link #output}; carries the timestamp of the current result. */
    private final TimestampedCollector<OUT> timestampedCollector;

    /** Loop flag; volatile because {@link #stop()} may be called from another thread. */
    private volatile boolean running;

    /**
     * Creates a new emitter.
     *
     * @param checkpointLock lock object handed to the {@link CheckpointLockDelegate}
     * @param output output to emit watermarks and records to
     * @param streamElementQueue queue providing the elements to emit
     * @param operatorActions callback used to fail the operator on errors
     * @throws NullPointerException if any argument is {@code null}
     */
    public Emitter(Object checkpointLock, Output<StreamRecord<OUT>> output, StreamElementQueue streamElementQueue, OperatorActions operatorActions) {
        this.checkpointLockDelegate = new CheckpointLockDelegate(Preconditions.checkNotNull(checkpointLock, "checkpointLock"));
        this.output = Preconditions.checkNotNull(output, "output");
        this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
        this.timestampedCollector = new TimestampedCollector<>(this.output);
        this.running = true;
    }

    /**
     * Emits queue elements until {@link #stop()} is called.
     *
     * <p>An {@link InterruptedException} received while still running is reported through
     * {@link OperatorActions#failOperator}; after {@link #stop()} it is treated as a regular
     * shutdown signal. Any other throwable is wrapped and reported as an operator failure.
     */
    @Override
    public void run() {
        try {
            while (this.running) {
                LOG.debug("Wait for next completed async stream element result.");
                AsyncResult streamElementEntry = this.streamElementQueue.peekBlockingly();
                this.output(streamElementEntry);
            }
        }
        catch (InterruptedException e) {
            if (this.running) {
                this.operatorActions.failOperator(e);
            } else {
                // Interrupted after stop(): this is the expected way to shut down.
                LOG.debug("Emitter thread got interrupted, shutting down.");
            }
            // Restore the interrupt status (after the handling above has run) so that
            // whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        catch (Throwable t) {
            this.operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an unexpected throwable.", t));
        }
    }

    /**
     * Emits the given result, dispatching on whether it is a watermark or a result collection.
     *
     * @param asyncResult element obtained from the queue
     * @throws Exception if emitting under the checkpoint lock fails or is interrupted
     */
    private void output(AsyncResult asyncResult) throws Exception {
        if (asyncResult.isWatermark()) {
            this.emitWatermark(asyncResult);
        } else {
            this.emitResultCollection(asyncResult);
        }
    }

    /** Emits a watermark result under the checkpoint lock and polls it from the queue. */
    private void emitWatermark(AsyncResult asyncResult) throws Exception {
        this.checkpointLockDelegate.lockAndRun(() -> {
            AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
            LOG.debug("Output async watermark.");
            this.output.emitWatermark(asyncWatermarkResult.getWatermark());
            // The element was only peeked so far; poll it now that it has been emitted.
            this.streamElementQueue.poll();
            // Presumably wakes up threads waiting on the lock for queue capacity - confirm
            // against CheckpointLockDelegate.
            this.checkpointLockDelegate.signalAll();
        });
    }

    /**
     * Emits every record of a result collection under the checkpoint lock and polls the
     * element from the queue. If obtaining or emitting the records throws, the operator is
     * failed, but the element is still polled and waiters are still signalled.
     */
    private void emitResultCollection(AsyncResult asyncResult) throws Exception {
        final AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

        // Configure the collector's timestamp before any record of this result is collected.
        if (streamRecordResult.hasTimestamp()) {
            this.timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
        } else {
            this.timestampedCollector.eraseTimestamp();
        }

        this.checkpointLockDelegate.lockAndRun(() -> {
            LOG.debug("Output async stream element collection result.");
            try {
                Collection<OUT> resultCollection = streamRecordResult.get();
                if (resultCollection != null) {
                    for (OUT result : resultCollection) {
                        this.timestampedCollector.collect(result);
                    }
                }
            }
            catch (Exception e) {
                this.operatorActions.failOperator(new Exception("An async function call terminated with an exception. Failing the AsyncWaitOperator.", e));
            }
            // The element was only peeked so far; poll it now that it has been handled.
            this.streamElementQueue.poll();
            // Presumably wakes up threads waiting on the lock for queue capacity - confirm
            // against CheckpointLockDelegate.
            this.checkpointLockDelegate.signalAll();
        });
    }

    /**
     * Requests the emitter loop to terminate.
     *
     * <p>The flag is only checked between elements, so a thread that is currently waiting
     * inside the queue will not leave the loop until an element becomes available or the
     * thread is interrupted.
     */
    public void stop() {
        this.running = false;
    }
}

