package org.apache.flink.streaming.runtime.io;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceRecord;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.InputFetcher;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/SourceFetcher.class */
/**
 * {@link InputFetcher} that polls a {@link StreamSourceV2} operator for {@link SourceRecord}s and
 * forwards each record's watermark and payload to a {@link SourceFunction.SourceContext}.
 *
 * <p>Once the operator reports that it is finished, the fetcher emits
 * {@link Watermark#MAX_WATERMARK} and ends/releases its {@link SourceInputProcessor}.
 */
class SourceFetcher implements InputFetcher {

    /** Operator that is polled for records and asked whether it has finished. */
    private final StreamSourceV2 operator;

    /** Processor that is ended and released when the input finishes. */
    private final SourceInputProcessor processor;

    /** Context receiving watermarks, records and idleness notifications. */
    private final SourceFunction.SourceContext context;

    /** Selection handed back unchanged by {@link #getInputSelection()}. */
    private final InputSelector.InputSelection inputSelection;

    /** Listener stored by {@link #registerAvailableListener}; this class only stores it. */
    private InputFetcher.InputFetcherAvailableListener listener;

    /** Whether the most recent poll of the operator returned no record. */
    private boolean isIdle;

    /** Set once {@link #finishInput()} has run to completion. */
    private volatile boolean finishedInput;

    /**
     * Creates a fetcher over the given collaborators.
     *
     * @param inputSelection selection reported by {@link #getInputSelection()}
     * @param streamSourceV2 operator to poll for records
     * @param sourceContext context that records and watermarks are forwarded to
     * @param sourceInputProcessor processor to end and release when the input finishes
     * @throws NullPointerException if any argument is {@code null}
     */
    public SourceFetcher(InputSelector.InputSelection inputSelection, StreamSourceV2 streamSourceV2, SourceFunction.SourceContext sourceContext, SourceInputProcessor sourceInputProcessor) {
        this.inputSelection = Preconditions.checkNotNull(inputSelection);
        operator = Preconditions.checkNotNull(streamSourceV2);
        processor = Preconditions.checkNotNull(sourceInputProcessor);
        context = Preconditions.checkNotNull(sourceContext);
    }

    /** No-op: this fetcher needs no setup. */
    @Override
    public void setup() throws Exception {
        // Intentionally empty.
    }

    /**
     * Polls the operator once and forwards whatever it returned.
     *
     * <p>If the operator is already finished, the input is finished and nothing is polled. If the
     * poll yields no record, the context is marked as temporarily idle. After the poll the
     * operator's finished state is checked again, and the input is finished if it now reports so.
     *
     * @return {@code true} if a record was returned by the poll and the operator is not finished
     *     afterwards; {@code false} if the operator is finished or the poll returned nothing
     * @throws Exception if polling, forwarding or finishing the input fails
     */
    @Override
    public boolean fetchAndProcess() throws Exception {
        if (isFinished()) {
            finishInput();
            return false;
        }

        final SourceRecord polled = operator.next();
        if (polled == null) {
            context.markAsTemporarilyIdle();
            isIdle = true;
        } else {
            forward(polled);
            isIdle = false;
        }

        // The poll above may have been the one that exhausted the operator.
        if (isFinished()) {
            finishInput();
            return false;
        }
        return !isIdle;
    }

    /**
     * Hands the contents of one polled record to the context: the watermark first (when present),
     * then the payload (when present). A timestamp greater than zero is passed along with the
     * payload; otherwise the payload is collected without a timestamp.
     */
    private void forward(SourceRecord polled) {
        final Object payload = polled.getRecord();

        final Watermark mark = polled.getWatermark();
        if (mark != null) {
            context.emitWatermark(mark);
        }

        if (payload == null) {
            return;
        }
        final long timestamp = polled.getTimestamp();
        if (timestamp > 0) {
            context.collectWithTimestamp(payload, timestamp);
        } else {
            context.collect(payload);
        }
    }

    /**
     * Finishes the input unless that has already completed: emits {@link Watermark#MAX_WATERMARK},
     * then calls {@code endInput()} followed by {@code release()} on the processor through a
     * {@link CheckpointLockDelegate} built on the context's checkpoint lock.
     *
     * <p>The completion flag is set only after those steps return normally.
     */
    private void finishInput() throws Exception {
        if (!finishedInput) {
            context.emitWatermark(Watermark.MAX_WATERMARK);
            final CheckpointLockDelegate lockDelegate = new CheckpointLockDelegate(context.getCheckpointLock());
            lockDelegate.lockAndRun(() -> {
                processor.endInput();
                processor.release();
            });
            finishedInput = true;
        }
    }

    /** Returns whatever the operator reports as its finished state. */
    @Override
    public boolean isFinished() {
        return operator.isFinished();
    }

    /** Returns {@code true} for as long as {@link #isFinished()} returns {@code false}. */
    @Override
    public boolean moreAvailable() {
        final boolean done = isFinished();
        return !done;
    }

    /** No-op: this fetcher has nothing to clean up. */
    @Override
    public void cleanup() {
        // Intentionally empty.
    }

    /** Forwards the cancellation request to the operator. */
    @Override
    public void cancel() {
        operator.cancel();
    }

    /** Returns the input selection supplied at construction. */
    @Override
    public InputSelector.InputSelection getInputSelection() {
        return inputSelection;
    }

    /**
     * Stores the given listener. May only be called while no listener is stored.
     *
     * @param inputFetcherAvailableListener listener to store
     * @throws IllegalStateException if a listener has already been registered
     */
    @Override
    public void registerAvailableListener(InputFetcher.InputFetcherAvailableListener inputFetcherAvailableListener) {
        Preconditions.checkState(listener == null);
        listener = inputFetcherAvailableListener;
    }
}
