package com.alibaba.blink.streaming.connectors.common.reader;

import com.alibaba.blink.streaming.connectors.common.source.AbstractParallelSourceBase;
import java.io.IOException;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.runtime.metrics.SimpleHistogram;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.LockGetReleaseWrapper;

/* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/reader/SequenceReader.class */
/**
 * Reads input splits one after another on the calling thread and forwards the records of each
 * split to a {@link SourceFunction.SourceContext}.
 *
 * <p>Splits are requested from the {@link InputSplitProvider}; for every split a fresh
 * {@code RecordReader} is created through the owning source function, opened, drained and closed
 * before the next split is processed.
 *
 * <p>Three metrics are registered on the source function's metric group: the counter
 * {@code tps_counter}, the meter {@code tps} (a {@link MeterView} over that counter with a
 * time span argument of 60) and the histogram {@code partitionLatency}.
 *
 * @param <T> type of the records emitted to the source context
 */
public class SequenceReader<T> implements StoppableFunction {
    private final InputSplitProvider inputSplitProvider;
    private final AbstractParallelSourceBase<T, ?> sourceFunction;
    private final Configuration config;

    /** Set by {@link #stop()}; volatile so that the thread executing {@link #run} sees the update. */
    private volatile boolean isStop = false;

    private final Counter outputCounter;
    private final Meter tpsMetric;
    private final Histogram latencyMetric;

    /**
     * Creates a reader bound to the given source function and registers its metrics.
     *
     * @param abstractParallelSourceBase source function that supplies the runtime context and
     *     creates the per-split record readers
     * @param inputSplitProvider provider the splits are requested from
     * @param configuration configuration handed to {@code createReader} for every split
     */
    public SequenceReader(AbstractParallelSourceBase<T, ?> abstractParallelSourceBase, InputSplitProvider inputSplitProvider, Configuration configuration) {
        this.sourceFunction = abstractParallelSourceBase;
        this.inputSplitProvider = inputSplitProvider;
        this.config = configuration;
        RuntimeContext runtimeContext = abstractParallelSourceBase.getRuntimeContext();
        this.outputCounter = runtimeContext.getMetricGroup().counter("tps_counter", new SimpleCounter());
        this.tpsMetric = runtimeContext.getMetricGroup().meter("tps", new MeterView(this.outputCounter, 60));
        this.latencyMetric = runtimeContext.getMetricGroup().histogram("partitionLatency", new SimpleHistogram());
    }

    /**
     * Processes splits until the provider returns {@code null} or {@link #stop()} has been called.
     *
     * <p>For every record returned by the reader the time elapsed since the previous emission
     * (or since the reader was opened) is recorded in the latency histogram. Records for which
     * {@code isHeartBeat()} returns {@code true} are not emitted and do not reset that timer.
     * All other records are emitted while the fair checkpoint lock of the source context is held.
     *
     * @param sourceContext context the records are emitted to
     * @throws InputSplitProviderException if requesting a split fails
     * @throws IOException declared for the reader operations
     * @throws InterruptedException declared for the reader operations
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws InputSplitProviderException, IOException, InterruptedException {
        InputSplit currentSplit = fetchNextSplit();
        while (!this.isStop && currentSplit != null) {
            RecordReader<T, ?> reader = this.sourceFunction.createReader(this.config);
            try {
                reader.open(currentSplit, this.sourceFunction.getRuntimeContext());
                long lastEmitNanos = System.nanoTime();
                while (!this.isStop && reader.next()) {
                    this.latencyMetric.update(System.nanoTime() - lastEmitNanos);
                    if (reader.isHeartBeat()) {
                        // Heartbeat records are not forwarded downstream.
                        continue;
                    }
                    // try-with-resources guarantees the lock wrapper is closed even when
                    // markEvent() or collect() throws; previously it was closed only on success.
                    try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(sourceContext.getFairCheckpointLock().getLock())) {
                        this.tpsMetric.markEvent();
                        sourceContext.collect(reader.getMessage());
                    }
                    lastEmitNanos = System.nanoTime();
                }
                // The next split is requested before the current reader is closed.
                currentSplit = fetchNextSplit();
            } finally {
                reader.close();
            }
        }
    }

    /**
     * Requests a cooperative stop: {@link #run} checks the flag before each split and before each
     * record and returns once it observes it.
     */
    public void stop() {
        this.isStop = true;
    }

    /** Requests the next split for this operator from the provider; {@code null} ends {@link #run}. */
    private InputSplit fetchNextSplit() throws InputSplitProviderException {
        RuntimeContext runtimeContext = this.sourceFunction.getRuntimeContext();
        return this.inputSplitProvider.getNextInputSplit(runtimeContext.getOperatorContext().getOperatorID(), runtimeContext.getUserCodeClassLoader());
    }
}
