package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunctionV2;
import org.apache.flink.streaming.api.functions.source.SourceRecord;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskV2.class */
public class SourceStreamTaskV2<OUT, SRC extends SourceFunctionV2<OUT>, OP extends StreamSourceV2<OUT, SRC>> extends StreamTask<OUT, OP> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTaskV2.class);
    private final ArrayDeque<Tuple2<CheckpointMetaData, CheckpointOptions>> pendingCheckpoints;
    private volatile boolean checkpointTriggered;
    private volatile boolean running;

    public SourceStreamTaskV2(Environment environment) {
        super(environment);
        this.checkpointTriggered = false;
        this.running = true;
        this.pendingCheckpoints = new ArrayDeque<>();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void init() {
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanup() {
        synchronized (this.pendingCheckpoints) {
            this.pendingCheckpoints.clear();
            this.checkpointTriggered = false;
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void run() throws Exception {
        Tuple2<CheckpointMetaData, CheckpointOptions> poll;
        OP headOperator = getHeadOperator();
        SourceFunction.SourceContext sourceContext = StreamSourceContexts.getSourceContext(headOperator.getOperatorConfig().getTimeCharacteristic(), getProcessingTimeService(), getCheckpointLock(), getStreamStatusMaintainer(), headOperator.getOutput(), headOperator.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), -1L);
        boolean z = false;
        while (this.running) {
            try {
                SourceRecord<OUT> next = headOperator.next();
                if (next != null) {
                    OUT record = next.getRecord();
                    if (record != null) {
                        if (next.getTimestamp() > 0) {
                            sourceContext.collectWithTimestamp(record, next.getTimestamp());
                        } else {
                            sourceContext.collect(record);
                        }
                    }
                    if (next.getWatermark() != null) {
                        sourceContext.emitWatermark(next.getWatermark());
                    }
                } else {
                    z = true;
                }
                if (headOperator.isFinished()) {
                    break;
                }
                if (this.checkpointTriggered) {
                    z = false;
                    synchronized (this.pendingCheckpoints) {
                        poll = this.pendingCheckpoints.poll();
                        if (this.pendingCheckpoints.isEmpty()) {
                            this.checkpointTriggered = false;
                        }
                    }
                    if (poll != null && !super.triggerCheckpoint((CheckpointMetaData) poll.f0, (CheckpointOptions) poll.f1)) {
                        getEnvironment().declineCheckpoint(((CheckpointMetaData) poll.f0).getCheckpointId(), new CheckpointDeclineTaskNotReadyException(getName()));
                    }
                }
                if (z) {
                    Thread.sleep(10L);
                    z = false;
                }
            } finally {
                if (sourceContext != null) {
                    sourceContext.close();
                }
            }
        }
        if (this.running) {
            sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
            synchronized (getCheckpointLock()) {
                for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperatorsTopologySorted()) {
                    if (streamOperator instanceof OneInputStreamOperator) {
                        ((OneInputStreamOperator) streamOperator).endInput();
                    }
                }
            }
        } else {
            headOperator.cancel();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cancelTask() throws Exception {
        this.running = false;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        if (!this.running) {
            return false;
        }
        synchronized (this.pendingCheckpoints) {
            if (this.pendingCheckpoints.size() >= this.maxConcurrentCheckpoints) {
                this.pendingCheckpoints.poll();
            }
            this.pendingCheckpoints.add(Tuple2.of(checkpointMetaData, checkpointOptions));
            this.checkpointTriggered = true;
        }
        return true;
    }

    protected OP getHeadOperator() {
        Preconditions.checkState(this.operatorChain.getHeadOperators().length == 1, "There should only one head operator, not " + this.operatorChain.getHeadOperators().length);
        return (OP) this.operatorChain.getHeadOperators()[0];
    }
}
