package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.SumAndCount;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.OutputTag;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamSource.class */
public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
    private static final long serialVersionUID = 1;
    private transient SourceFunction.SourceContext<OUT> ctx;
    private volatile transient boolean canceledOrStopped;
    private transient boolean enableTracingMetrics;
    private transient int tracingMetricsInterval;
    private transient SumAndCount taskLatency;
    private transient SumAndCount sourceLatency;
    private transient Counter numElementsReceived;
    private volatile transient boolean hasSentMaxWatermark;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamSource$LatencyMarksEmitter.class */
    public static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(ProcessingTimeService processingTimeService, final Output<StreamRecord<OUT>> output, long j, final OperatorID operatorID, final int i) {
            this.latencyMarkTimer = processingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.api.operators.StreamSource.LatencyMarksEmitter.1
                @Override // org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback
                public void onProcessingTime(long j2) throws Exception {
                    try {
                        output.emitLatencyMarker(new LatencyMarker(j2, operatorID, i));
                    } catch (Throwable th) {
                        AbstractStreamOperator.LOG.warn("Error while emitting latency marker.", th);
                    }
                }
            }, 0L, j);
        }

        public void close() {
            this.latencyMarkTimer.cancel(true);
        }
    }

    public StreamSource(SRC src) {
        super(src);
        this.canceledOrStopped = false;
        this.enableTracingMetrics = false;
        this.hasSentMaxWatermark = false;
        this.chainingStrategy = ChainingStrategy.HEAD;
    }

    public void run(Object obj, StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(obj, streamStatusMaintainer, this.output);
    }

    public void run(Object obj, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> output) throws Exception {
        TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
        this.enableTracingMetrics = getRuntimeContext().getExecutionConfig().isTracingMetricsEnabled();
        if (this.enableTracingMetrics) {
            if (this.taskLatency == null) {
                this.taskLatency = new SumAndCount("taskLatency", getRuntimeContext().getMetricGroup().parent());
            }
            if (this.sourceLatency == null) {
                this.sourceLatency = new SumAndCount("sourceLatency", getRuntimeContext().getMetricGroup().parent());
            }
            this.tracingMetricsInterval = getRuntimeContext().getExecutionConfig().getTracingMetricsInterval();
        }
        if (this.numElementsReceived == null) {
            this.numElementsReceived = getRuntimeContext().getMetricGroup().parent().getIOMetricGroup().getNumRecordsReceived();
        }
        LatencyMarksEmitter latencyMarksEmitter = null;
        if (getExecutionConfig().isLatencyTrackingEnabled()) {
            latencyMarksEmitter = new LatencyMarksEmitter(getProcessingTimeService(), output, getExecutionConfig().getLatencyTrackingInterval(), getOperatorID(), getRuntimeContext().getIndexOfThisSubtask());
        }
        this.ctx = getSourceContext(timeCharacteristic, getProcessingTimeService(), obj, streamStatusMaintainer, output, getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
        try {
            ((SourceFunction) this.userFunction).run(this.ctx);
            if (!isCanceledOrStopped()) {
                advanceToEndOfEventTime();
            }
        } finally {
            this.ctx.close();
            if (latencyMarksEmitter != null) {
                latencyMarksEmitter.close();
            }
        }
    }

    public void advanceToEndOfEventTime() {
        if (this.hasSentMaxWatermark) {
            return;
        }
        this.ctx.emitWatermark(Watermark.MAX_WATERMARK);
        this.hasSentMaxWatermark = true;
    }

    @VisibleForTesting
    protected SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object obj, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> output, boolean z, int i, SumAndCount sumAndCount, SumAndCount sumAndCount2, Counter counter, long j) {
        this.enableTracingMetrics = z;
        this.tracingMetricsInterval = i;
        this.taskLatency = sumAndCount;
        this.sourceLatency = sumAndCount2;
        this.numElementsReceived = counter;
        return getSourceContext(timeCharacteristic, processingTimeService, obj, streamStatusMaintainer, output, j);
    }

    private SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object obj, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> output, long j) {
        return StreamSourceContexts.getSourceContext(timeCharacteristic, processingTimeService, obj, streamStatusMaintainer, getOutputWithTaskLatency(output, this.enableTracingMetrics, this.tracingMetricsInterval, this.taskLatency, this.sourceLatency, this.numElementsReceived), j, -1L);
    }

    private Output<StreamRecord<OUT>> getOutputWithTaskLatency(final Output<StreamRecord<OUT>> output, final boolean z, final int i, final SumAndCount sumAndCount, final SumAndCount sumAndCount2, final Counter counter) {
        return new Output<StreamRecord<OUT>>() { // from class: org.apache.flink.streaming.api.operators.StreamSource.1
            private long lastEmitTime = 0;

            @Override // org.apache.flink.streaming.api.operators.Output
            public void emitWatermark(Watermark watermark) {
                counter.inc();
                if (z && counter.getCount() % i == 0) {
                    emitWatermarkWithMetrics(watermark);
                } else {
                    output.emitWatermark(watermark);
                }
            }

            public void emitWatermarkWithMetrics(Watermark watermark) {
                long nanoTime = System.nanoTime();
                if (this.lastEmitTime > 0) {
                    sumAndCount2.update(nanoTime - this.lastEmitTime);
                }
                output.emitWatermark(watermark);
                this.lastEmitTime = System.nanoTime();
                sumAndCount.update(this.lastEmitTime - nanoTime);
            }

            @Override // org.apache.flink.streaming.api.operators.Output
            public void emitLatencyMarker(LatencyMarker latencyMarker) {
                counter.inc();
                if (z && counter.getCount() % i == 0) {
                    emitLatencyMarkWithMetrics(latencyMarker);
                } else {
                    output.emitLatencyMarker(latencyMarker);
                }
            }

            public void emitLatencyMarkWithMetrics(LatencyMarker latencyMarker) {
                long nanoTime = System.nanoTime();
                if (this.lastEmitTime > 0) {
                    sumAndCount2.update(nanoTime - this.lastEmitTime);
                }
                output.emitLatencyMarker(latencyMarker);
                this.lastEmitTime = System.nanoTime();
                sumAndCount.update(this.lastEmitTime - nanoTime);
            }

            public void collect(StreamRecord<OUT> streamRecord) {
                counter.inc();
                if (z && counter.getCount() % i == 0) {
                    collectWithMetrics(streamRecord);
                } else {
                    output.collect(streamRecord);
                }
            }

            public void collectWithMetrics(StreamRecord<OUT> streamRecord) {
                long nanoTime = System.nanoTime();
                if (this.lastEmitTime > 0) {
                    sumAndCount2.update(nanoTime - this.lastEmitTime);
                }
                output.collect(streamRecord);
                this.lastEmitTime = System.nanoTime();
                sumAndCount.update(this.lastEmitTime - nanoTime);
            }

            @Override // org.apache.flink.streaming.api.operators.Output
            public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                counter.inc();
                if (z && counter.getCount() % i == 0) {
                    collectWithMetrics(outputTag, streamRecord);
                } else {
                    output.collect(outputTag, streamRecord);
                }
            }

            public <X> void collectWithMetrics(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                long nanoTime = System.nanoTime();
                if (this.lastEmitTime > 0) {
                    sumAndCount2.update(nanoTime - this.lastEmitTime);
                }
                output.collect(outputTag, streamRecord);
                this.lastEmitTime = System.nanoTime();
                sumAndCount.update(this.lastEmitTime - nanoTime);
            }

            public void close() {
                output.close();
            }
        };
    }

    public void cancel() {
        markCanceledOrStopped();
        ((SourceFunction) this.userFunction).cancel();
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void markCanceledOrStopped() {
        this.canceledOrStopped = true;
    }

    public boolean isCanceledOrStopped() {
        return this.canceledOrStopped;
    }
}
