/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.SumAndCount;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.OutputTag;

/**
 * Stream operator that executes a user-supplied {@link SourceFunction}.
 *
 * <p>The operator builds a {@link SourceFunction.SourceContext} around the operator output, runs
 * the user function against it and, unless the operator was canceled or stopped, emits
 * {@link Watermark#MAX_WATERMARK} once the function returns. Optionally it also
 *
 * <ul>
 *   <li>schedules periodic {@link LatencyMarker}s when latency tracking is enabled in the
 *       execution config, and</li>
 *   <li>samples "tracing metrics" ({@code taskLatency} / {@code sourceLatency}) for every N-th
 *       emitted element when tracing metrics are enabled in the execution config.</li>
 * </ul>
 *
 * @param <OUT> type of the elements produced by the source
 * @param <SRC> concrete type of the wrapped source function
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends AbstractUdfStreamOperator<OUT, SRC>
implements StreamOperator<OUT> {
    private static final long serialVersionUID = 1L;

    /** Context handed to the user function; created in {@code run} and closed in {@code run}/{@code cancel}. */
    private transient SourceFunction.SourceContext<OUT> ctx;

    /** Set by {@link #markCanceledOrStopped()}; volatile because it is read via {@link #isCanceledOrStopped()} while the source runs. */
    private transient volatile boolean canceledOrStopped = false;

    /** Whether sampled latency metrics are recorded by the output wrapper. */
    private transient boolean enableTracingMetrics = false;

    /** Every {@code tracingMetricsInterval}-th counted emission is sampled; {@code 0} means "never sample". */
    private transient int tracingMetricsInterval;

    /** Receives the time spent inside the wrapped output for each sampled emission. */
    private transient SumAndCount taskLatency;

    /** Receives the time between the end of the previous sampled emission and the start of the current one. */
    private transient SumAndCount sourceLatency;

    /** Counter incremented once for every record, watermark and latency marker passing through the output wrapper. */
    private transient Counter numElementsReceived;

    /**
     * Creates the operator around the given source function and marks it as the head of a chain.
     *
     * @param sourceFunction the user function to execute
     */
    public StreamSource(SRC sourceFunction) {
        super(sourceFunction);
        this.chainingStrategy = ChainingStrategy.HEAD;
    }

    /**
     * Runs the source, emitting into this operator's own {@code output}.
     *
     * @param lockingObject lock object forwarded to the source context
     * @param streamStatusMaintainer stream status maintainer forwarded to the source context
     * @throws Exception anything thrown by the user function or while setting up / closing the context
     */
    public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        this.run(lockingObject, streamStatusMaintainer, this.output);
    }

    /**
     * Runs the source, emitting into the given collector.
     *
     * <p>Steps: read the time characteristic, initialise tracing metrics, start the latency marker
     * timer (if latency tracking is enabled), build the source context, run the user function and
     * finally emit {@link Watermark#MAX_WATERMARK} unless the operator was canceled or stopped.
     * The source context is always closed, and the latency marker timer is always cancelled, even
     * if building the context, running the function or closing the context throws.
     *
     * @param lockingObject lock object forwarded to the source context
     * @param streamStatusMaintainer stream status maintainer forwarded to the source context
     * @param collector output that receives records, watermarks and latency markers
     * @throws Exception anything thrown by the user function or while setting up / closing the context
     */
    public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> collector) throws Exception {
        TimeCharacteristic timeCharacteristic = this.getOperatorConfig().getTimeCharacteristic();
        this.initTracingMetrics();

        // Note: the emitter writes to the raw collector, so timer-driven latency markers bypass
        // the counting/tracing wrapper that the source context uses.
        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (this.getExecutionConfig().isLatencyTrackingEnabled()) {
            latencyEmitter = new LatencyMarksEmitter<OUT>(this.getProcessingTimeService(), collector, this.getExecutionConfig().getLatencyTrackingInterval(), this.getOperatorID(), this.getRuntimeContext().getIndexOfThisSubtask());
        }

        // The outer try/finally owns the latency timer: it has already been scheduled above, so it
        // must be cancelled on every exit path, including failures before the context exists and
        // failures while closing the context.
        try {
            long watermarkInterval = this.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
            this.ctx = this.getSourceContext(timeCharacteristic, this.getProcessingTimeService(), lockingObject, streamStatusMaintainer, collector, watermarkInterval);
            try {
                this.userFunction.run(this.ctx);
                if (!this.isCanceledOrStopped()) {
                    this.ctx.emitWatermark(Watermark.MAX_WATERMARK);
                }
            }
            finally {
                this.ctx.close();
            }
        }
        finally {
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }

    /**
     * Reads the tracing-metrics settings from the execution config and lazily creates the metric
     * objects that have not been set yet (they may have been injected through the testing overload
     * of {@code getSourceContext}).
     */
    private void initTracingMetrics() {
        this.enableTracingMetrics = this.getRuntimeContext().getExecutionConfig().isTracingMetricsEnabled();
        if (this.enableTracingMetrics) {
            if (this.taskLatency == null) {
                this.taskLatency = new SumAndCount("taskLatency", (MetricGroup)((OperatorMetricGroup)this.getRuntimeContext().getMetricGroup()).parent());
            }
            if (this.sourceLatency == null) {
                this.sourceLatency = new SumAndCount("sourceLatency", (MetricGroup)((OperatorMetricGroup)this.getRuntimeContext().getMetricGroup()).parent());
            }
            this.tracingMetricsInterval = this.getRuntimeContext().getExecutionConfig().getTracingMetricsInterval();
        }
        if (this.numElementsReceived == null) {
            this.numElementsReceived = ((OperatorMetricGroup)this.getRuntimeContext().getMetricGroup()).parent().getIOMetricGroup().getNumRecordsReceived();
        }
    }

    /**
     * Testing hook: overwrites this operator's tracing-metrics fields with the supplied values and
     * then builds the source context exactly like {@code run} does.
     *
     * @param timeCharacteristic time characteristic used to select the context implementation
     * @param processingTimeService processing time service forwarded to the context
     * @param lockingObject lock object forwarded to the context
     * @param streamStatusMaintainer stream status maintainer forwarded to the context
     * @param collector output to wrap
     * @param enableTracingMetrics whether sampled latency metrics are recorded
     * @param tracingMetricsInterval sample every N-th counted emission; {@code 0} disables sampling
     * @param taskLatency metric receiving the duration of sampled downstream calls
     * @param sourceLatency metric receiving the gap between sampled emissions
     * @param numElementsReceived counter incremented for every emission
     * @param watermarkInterval watermark interval forwarded to the context
     * @return the source context writing into the wrapped collector
     */
    @VisibleForTesting
    protected SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> collector, boolean enableTracingMetrics, int tracingMetricsInterval, SumAndCount taskLatency, SumAndCount sourceLatency, Counter numElementsReceived, long watermarkInterval) {
        this.enableTracingMetrics = enableTracingMetrics;
        this.tracingMetricsInterval = tracingMetricsInterval;
        this.taskLatency = taskLatency;
        this.sourceLatency = sourceLatency;
        this.numElementsReceived = numElementsReceived;
        return this.getSourceContext(timeCharacteristic, processingTimeService, lockingObject, streamStatusMaintainer, collector, watermarkInterval);
    }

    /**
     * Builds the source context on top of the counting/tracing output wrapper, using the
     * tracing-metrics state currently stored in this operator's fields.
     */
    private SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> collector, long watermarkInterval) {
        Output<StreamRecord<OUT>> tracedOutput = this.getOutputWithTaskLatency(collector, this.enableTracingMetrics, this.tracingMetricsInterval, this.taskLatency, this.sourceLatency, this.numElementsReceived);
        // NOTE(review): the trailing -1L is presumably the "idle timeout disabled" sentinel of
        // StreamSourceContexts.getSourceContext - confirm against that class.
        return StreamSourceContexts.getSourceContext(timeCharacteristic, processingTimeService, lockingObject, streamStatusMaintainer, tracedOutput, watermarkInterval, -1L);
    }

    /**
     * Wraps {@code collector} in an {@link Output} that counts every emission on
     * {@code numElementsReceived} and, when tracing is enabled, samples latency for every
     * {@code tracingMetricsInterval}-th counted emission.
     *
     * <p>For a sampled emission, {@code sourceLatency} is updated with the nanoseconds elapsed since
     * the previous sampled emission finished (skipped for the first sample), and
     * {@code taskLatency} with the nanoseconds spent inside the delegate call. Neither metric is
     * updated if the delegate call throws.
     *
     * <p>A {@code tracingMetricsInterval} of {@code 0} never samples; previously it caused an
     * {@link ArithmeticException} on every emission when tracing was enabled.
     */
    private Output<StreamRecord<OUT>> getOutputWithTaskLatency(final Output<StreamRecord<OUT>> collector, final boolean enableTracingMetrics, final int tracingMetricsInterval, final SumAndCount taskLatency, final SumAndCount sourceLatency, final Counter numElementsReceived) {
        return new Output<StreamRecord<OUT>>(){
            /** {@code System.nanoTime()} taken right after the last sampled emission; 0 until the first sample. */
            private long lastEmitTime = 0L;

            @Override
            public void emitWatermark(Watermark mark) {
                if (this.countAndCheckSampled()) {
                    long start = this.beginSample();
                    collector.emitWatermark(mark);
                    this.endSample(start);
                } else {
                    collector.emitWatermark(mark);
                }
            }

            @Override
            public void emitLatencyMarker(LatencyMarker latencyMarker) {
                if (this.countAndCheckSampled()) {
                    long start = this.beginSample();
                    collector.emitLatencyMarker(latencyMarker);
                    this.endSample(start);
                } else {
                    collector.emitLatencyMarker(latencyMarker);
                }
            }

            @Override
            public void collect(StreamRecord<OUT> record) {
                if (this.countAndCheckSampled()) {
                    long start = this.beginSample();
                    collector.collect(record);
                    this.endSample(start);
                } else {
                    collector.collect(record);
                }
            }

            @Override
            public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                if (this.countAndCheckSampled()) {
                    long start = this.beginSample();
                    collector.collect(outputTag, record);
                    this.endSample(start);
                } else {
                    collector.collect(outputTag, record);
                }
            }

            @Override
            public void close() {
                collector.close();
            }

            /** Counts the emission, then reports whether this emission should be timed. */
            private boolean countAndCheckSampled() {
                numElementsReceived.inc();
                // The != 0 guard must precede the remainder: integer % 0 throws ArithmeticException.
                return enableTracingMetrics
                        && tracingMetricsInterval != 0
                        && numElementsReceived.getCount() % (long)tracingMetricsInterval == 0L;
            }

            /** Records the gap since the previous sampled emission and returns the start timestamp. */
            private long beginSample() {
                long start = System.nanoTime();
                if (this.lastEmitTime > 0L) {
                    sourceLatency.update(start - this.lastEmitTime);
                }
                return start;
            }

            /** Records how long the delegate call took and remembers when it finished. */
            private void endSample(long start) {
                this.lastEmitTime = System.nanoTime();
                taskLatency.update(this.lastEmitTime - start);
            }
        };
    }

    /**
     * Marks the operator as canceled, cancels the user function and closes the source context if
     * one has been created (it is still {@code null} when {@code run} has not built it yet).
     */
    public void cancel() {
        this.markCanceledOrStopped();
        this.userFunction.cancel();
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    /** Flags the operator as canceled or stopped, which suppresses the final {@link Watermark#MAX_WATERMARK}. */
    protected void markCanceledOrStopped() {
        this.canceledOrStopped = true;
    }

    /**
     * Tells whether {@link #markCanceledOrStopped()} has been called.
     *
     * @return {@code true} once the operator has been marked as canceled or stopped
     */
    public boolean isCanceledOrStopped() {
        return this.canceledOrStopped;
    }

    /**
     * Periodically emits {@link LatencyMarker}s into an output by means of a fixed-rate timer
     * registered with the {@link ProcessingTimeService}.
     */
    private static class LatencyMarksEmitter<OUT> {
        /** Handle of the scheduled timer; cancelled in {@link #close()}. */
        private final ScheduledFuture<?> latencyMarkTimer;

        /**
         * Registers the timer immediately; it is scheduled with {@code 0L} and
         * {@code latencyTrackingInterval} as the two timing arguments of
         * {@code scheduleAtFixedRate}.
         *
         * @param processingTimeService service used to schedule the timer
         * @param output output that receives the latency markers
         * @param latencyTrackingInterval interval passed to the scheduler
         * @param operatorId operator id stamped onto every marker
         * @param subtaskIndex subtask index stamped onto every marker
         */
        public LatencyMarksEmitter(ProcessingTimeService processingTimeService, final Output<StreamRecord<OUT>> output, long latencyTrackingInterval, final OperatorID operatorId, final int subtaskIndex) {
            this.latencyMarkTimer = processingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback(){

                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    try {
                        output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                    }
                    catch (Throwable t) {
                        // Deliberate best-effort: a failed marker is logged and does not fail the timer callback.
                        AbstractStreamOperator.LOG.warn("Error while emitting latency marker.", t);
                    }
                }
            }, 0L, latencyTrackingInterval);
        }

        /** Cancels the timer, interrupting a callback that is currently running. */
        public void close() {
            this.latencyMarkTimer.cancel(true);
        }
    }
}

