package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunctionV2;
import org.apache.flink.streaming.api.functions.source.SourceRecord;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/InputFormatSourceStreamTaskV2.class */
public class InputFormatSourceStreamTaskV2<OUT, SRC extends SourceFunctionV2<OUT>, OP extends StreamSourceV2<OUT, SRC>> extends StreamTask<OUT, OP> {
    private static final Logger LOG = LoggerFactory.getLogger(InputFormatSourceStreamTaskV2.class);
    private boolean running;

    public InputFormatSourceStreamTaskV2(Environment environment) {
        super(environment);
        this.running = true;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void init() {
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanup() {
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void run() throws Exception {
        SourceRecord<OUT> next;
        OP headOperator = getHeadOperator();
        SourceFunction.SourceContext sourceContext = StreamSourceContexts.getSourceContext(headOperator.getOperatorConfig().getTimeCharacteristic(), getProcessingTimeService(), getCheckpointLock(), getStreamStatusMaintainer(), headOperator.getOutput(), headOperator.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), -1L);
        while (this.running && (next = headOperator.next()) != null) {
            try {
                OUT record = next.getRecord();
                if (record != null) {
                    sourceContext.collect(record);
                }
            } finally {
                if (sourceContext != null) {
                    sourceContext.close();
                }
            }
        }
        if (this.running) {
            new CheckpointLockDelegate(getCheckpointLock()).lockAndRun(() -> {
                for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperatorsTopologySorted()) {
                    if (streamOperator instanceof OneInputStreamOperator) {
                        ((OneInputStreamOperator) streamOperator).endInput();
                    }
                }
            });
        } else {
            headOperator.cancel();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cancelTask() throws Exception {
        this.running = false;
    }

    protected OP getHeadOperator() {
        Preconditions.checkState(this.operatorChain.getHeadOperators().length == 1, "There should only one head operator, not " + this.operatorChain.getHeadOperators().length);
        return (OP) this.operatorChain.getHeadOperators()[0];
    }
}
