/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunctionV2;
import org.apache.flink.streaming.api.functions.source.SourceRecord;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SourceStreamTaskV2<OUT, SRC extends SourceFunctionV2<OUT>, OP extends StreamSourceV2<OUT, SRC>>
extends StreamTask<OUT, OP> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTaskV2.class);
    private final ArrayDeque<Tuple2<CheckpointMetaData, CheckpointOptions>> pendingCheckpoints = new ArrayDeque();
    private volatile boolean checkpointTriggered = false;
    private volatile boolean running = true;

    public SourceStreamTaskV2(Environment env) {
        super(env);
    }

    @Override
    protected void init() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void cleanup() {
        ArrayDeque<Tuple2<CheckpointMetaData, CheckpointOptions>> arrayDeque = this.pendingCheckpoints;
        synchronized (arrayDeque) {
            this.pendingCheckpoints.clear();
            this.checkpointTriggered = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void run() throws Exception {
        block22: {
            OP headOperator = this.getHeadOperator();
            TimeCharacteristic timeCharacteristic = ((AbstractStreamOperator)headOperator).getOperatorConfig().getTimeCharacteristic();
            Output collector = ((AbstractStreamOperator)headOperator).getOutput();
            long watermarkInterval = ((AbstractStreamOperator)headOperator).getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
            SourceFunction.SourceContext ctx = StreamSourceContexts.getSourceContext(timeCharacteristic, this.getProcessingTimeService(), this.getCheckpointLock(), this.getStreamStatusMaintainer(), collector, watermarkInterval, -1L);
            try {
                boolean isIdle = false;
                while (this.running) {
                    SourceRecord sourceRecord = ((StreamSourceV2)headOperator).next();
                    if (sourceRecord != null) {
                        Object out = sourceRecord.getRecord();
                        if (out != null) {
                            if (sourceRecord.getTimestamp() > 0L) {
                                ctx.collectWithTimestamp(out, sourceRecord.getTimestamp());
                            } else {
                                ctx.collect(out);
                            }
                        }
                        if (sourceRecord.getWatermark() != null) {
                            ctx.emitWatermark(sourceRecord.getWatermark());
                        }
                    } else {
                        isIdle = true;
                    }
                    if (((StreamSourceV2)headOperator).isFinished()) break;
                    if (this.checkpointTriggered) {
                        Tuple2<CheckpointMetaData, CheckpointOptions> checkpointInfos;
                        isIdle = false;
                        ArrayDeque<Tuple2<CheckpointMetaData, CheckpointOptions>> arrayDeque = this.pendingCheckpoints;
                        synchronized (arrayDeque) {
                            checkpointInfos = this.pendingCheckpoints.poll();
                            if (this.pendingCheckpoints.isEmpty()) {
                                this.checkpointTriggered = false;
                            }
                        }
                        if (checkpointInfos != null && !super.triggerCheckpoint((CheckpointMetaData)checkpointInfos.f0, (CheckpointOptions)checkpointInfos.f1)) {
                            this.getEnvironment().declineCheckpoint(((CheckpointMetaData)checkpointInfos.f0).getCheckpointId(), (Throwable)new CheckpointDeclineTaskNotReadyException(this.getName()));
                        }
                    }
                    if (!isIdle) continue;
                    Thread.sleep(10L);
                    isIdle = false;
                }
                if (this.running) {
                    ctx.emitWatermark(Watermark.MAX_WATERMARK);
                    Object object = this.getCheckpointLock();
                    synchronized (object) {
                        for (StreamOperator<?> operator : this.operatorChain.getAllOperatorsTopologySorted()) {
                            if (!(operator instanceof OneInputStreamOperator)) continue;
                            ((OneInputStreamOperator)operator).endInput();
                        }
                        break block22;
                    }
                }
                ((StreamSourceV2)headOperator).cancel();
            }
            finally {
                if (ctx != null) {
                    ctx.close();
                    ctx = null;
                }
            }
        }
    }

    @Override
    protected void cancelTask() throws Exception {
        this.running = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        if (!this.running) {
            return false;
        }
        ArrayDeque<Tuple2<CheckpointMetaData, CheckpointOptions>> arrayDeque = this.pendingCheckpoints;
        synchronized (arrayDeque) {
            if (this.pendingCheckpoints.size() >= this.maxConcurrentCheckpoints) {
                this.pendingCheckpoints.poll();
            }
            this.pendingCheckpoints.add((Tuple2<CheckpointMetaData, CheckpointOptions>)Tuple2.of((Object)checkpointMetaData, (Object)checkpointOptions));
            this.checkpointTriggered = true;
        }
        return true;
    }

    protected OP getHeadOperator() {
        Preconditions.checkState((this.operatorChain.getHeadOperators().length == 1 ? 1 : 0) != 0, (Object)("There should only one head operator, not " + this.operatorChain.getHeadOperators().length));
        return (OP)((StreamSourceV2)this.operatorChain.getHeadOperators()[0]);
    }
}

