package com.alibaba.blink.streaming.connectors.common.reader;

import com.alibaba.blink.streaming.connectors.common.source.WatermarkProvider;
import java.io.IOException;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.dropwizard.metrics.SimpleHistogram;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.LockGetReleaseWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/reader/ParallelReader.class */
public class ParallelReader<OUT, CURSOR extends Serializable> implements WatermarkProvider, StoppableFunction {
    private static final String SPLIT_PIPE_LEN_CONFIG = "yarn.app.blink..source.buffer-len";
    private static int split_pipe_len;
    private long watermarkInterval;
    private RuntimeContext context;
    private Configuration config;
    private ParallelReader<OUT, CURSOR>.DelayGauge delayMetric;
    private ParallelReader<OUT, CURSOR>.DelayGauge dataFetchedDelayGauge;
    private ParallelReader<OUT, CURSOR>.DelayGauge noDataDelayGauge;
    private Counter outputCounter;
    private Meter tpsMetric;
    private Histogram latencyMetric;
    private static final Logger LOG = LoggerFactory.getLogger(ParallelReader.class);
    private static long STOP_WAITING = 5;
    private static long FAILED_RETRY_INTERVAL = 1000;
    private static long IDLE_INTERVAL = 10;
    private ExecutorService readerPool = Executors.newCachedThreadPool();
    private BlockingQueue<ReaderRunner<OUT, CURSOR>> readerRunners = new LinkedBlockingQueue();
    private WatermarkEmitter<OUT> watermarkEmitter = null;
    private volatile boolean stop = false;
    private volatile boolean exitAfterReadFinished = false;
    private transient Map<InputSplit, Tuple2<CURSOR, MonotonyIncreaseProgress>> exitedReadRunnerSplitCursor = new HashMap();

    /* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/reader/ParallelReader$DelayGauge.class */
    public class DelayGauge implements Gauge<Long> {
        private final ConcurrentHashMap<Integer, Long> delayStats = new ConcurrentHashMap<>();
        private BlockingQueue<ReaderRunner<OUT, CURSOR>> readerRunners;
        private DelayKind delayKind;

        public DelayGauge(BlockingQueue<ReaderRunner<OUT, CURSOR>> blockingQueue, DelayKind delayKind) {
            this.readerRunners = blockingQueue;
            this.delayKind = delayKind;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m70getValue() {
            if (null != this.readerRunners) {
                Iterator it = this.readerRunners.iterator();
                while (it.hasNext()) {
                    ReaderRunner readerRunner = (ReaderRunner) it.next();
                    switch (this.delayKind) {
                        case DELAY:
                            if (readerRunner.getDelay() > 0) {
                                this.delayStats.put(Integer.valueOf(readerRunner.getSplit().getSplitNumber()), Long.valueOf(System.currentTimeMillis() - readerRunner.getDelay()));
                                break;
                            } else {
                                break;
                            }
                        case FETCHED_DELAY:
                            if (readerRunner.getFetchedDelay() > 0) {
                                this.delayStats.put(Integer.valueOf(readerRunner.getSplit().getSplitNumber()), Long.valueOf(readerRunner.getFetchedDelay()));
                                break;
                            } else {
                                break;
                            }
                        case NO_DATA_DELAY:
                            if (readerRunner.getDelay() > 0 && readerRunner.getFetchedDelay() > 0) {
                                long currentTimeMillis = (System.currentTimeMillis() - readerRunner.getDelay()) - readerRunner.getFetchedDelay();
                                if (currentTimeMillis > 10000) {
                                    this.delayStats.put(Integer.valueOf(readerRunner.getSplit().getSplitNumber()), Long.valueOf(currentTimeMillis));
                                    break;
                                } else {
                                    this.delayStats.put(Integer.valueOf(readerRunner.getSplit().getSplitNumber()), 0L);
                                    break;
                                }
                            }
                            break;
                    }
                }
            }
            while (true) {
                try {
                    long j = 0;
                    for (Map.Entry<Integer, Long> entry : this.delayStats.entrySet()) {
                        if (j < entry.getValue().longValue()) {
                            j = entry.getValue().longValue();
                        }
                    }
                    return Long.valueOf(j);
                } catch (ConcurrentModificationException e) {
                    ParallelReader.LOG.debug("Unable to report delay statistics", e);
                }
            }
        }
    }

    /* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/reader/ParallelReader$DelayKind.class */
    /** Selects which per-split statistic a {@link DelayGauge} records and reports. */
    public enum DelayKind {
        /** Current wall-clock milliseconds minus the runner's {@code getDelay()} value. */
        DELAY,
        /** The runner's {@code getFetchedDelay()} value as-is. */
        FETCHED_DELAY,
        /** {@code DELAY} minus {@code getFetchedDelay()}, reported as zero unless it exceeds 10000. */
        NO_DATA_DELAY
    }

    /* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/reader/ParallelReader$Progress.class */
    /**
     * Serializable snapshot mapping each input split to its read cursor and monotony progress.
     *
     * @param <CURSOR> serializable position type of the underlying record reader
     */
    public static class Progress<CURSOR extends Serializable> implements Serializable {
        private Map<InputSplit, Tuple2<CURSOR, MonotonyIncreaseProgress>> splitProgress = new HashMap<>();

        /**
         * Records the position of one split, replacing any earlier entry for the same split.
         * A {@code null} cursor is ignored, leaving the snapshot unchanged.
         *
         * @param inputSplit split the position belongs to
         * @param cursor read position within the split, or {@code null} for "nothing to record"
         * @param monotonyIncreaseProgress progress value stored alongside the cursor
         */
        public void addProgress(InputSplit inputSplit, CURSOR cursor, MonotonyIncreaseProgress monotonyIncreaseProgress) {
            if (cursor == null) {
                return;
            }
            Tuple2<CURSOR, MonotonyIncreaseProgress> position = Tuple2.of(cursor, monotonyIncreaseProgress);
            this.splitProgress.put(inputSplit, position);
        }

        /**
         * @return the live backing map (not a copy) of split to cursor/progress pairs
         */
        public Map<InputSplit, Tuple2<CURSOR, MonotonyIncreaseProgress>> getProgress() {
            return this.splitProgress;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/reader/ParallelReader$ReaderRunner.class */
    /**
     * Drives one {@link RecordReader} over one input split and buffers what it reads.
     *
     * <p>{@link #run()} is the producer: it reads records and offers them to a bounded deque.
     * {@link #pollRecord()} is the consumer side. Each queued tuple carries the record (f0), the
     * timestamp to emit it with (f1), and the watermark (f2), cursor (f3) and monotony progress
     * (f4) that become this runner's published state once the record has been polled.
     *
     * @param <OUT> record type produced by the reader
     * @param <CURSOR> serializable position type of the reader
     */
    public static class ReaderRunner<OUT, CURSOR extends Serializable> implements Runnable, WatermarkProvider, Comparable<ReaderRunner<OUT, CURSOR>> {
        private final RuntimeContext runtimeContext;
        private final RecordReader<OUT, CURSOR> recordReader;
        private final Histogram latencyMetric;
        private final InputSplit split;
        private final AtomicBoolean isBlocked;
        private final Semaphore stateSignal;
        private final BlockingDeque<Tuple5<OUT, Long, Long, CURSOR, MonotonyIncreaseProgress>> splitPipe = new LinkedBlockingDeque<>(ParallelReader.split_pipe_len);
        private volatile CURSOR progress;
        private volatile Throwable cause;
        private volatile Thread currentThread;
        private volatile long watermark = Long.MIN_VALUE;
        private volatile boolean stop = false;
        private volatile boolean stopped = false;
        private volatile boolean finished = false;
        private volatile MonotonyIncreaseProgress monotonyIncreaseProgress = MonotonyIncreaseProgress.EMPTY;

        /**
         * @param recordReader reader to drive
         * @param inputSplit split handed to the reader on open
         * @param cursor position to seek to after opening, or {@code null} to start without seeking
         * @param histogram receives the duration of every {@code next()} call in nanoseconds
         * @param runtimeContext context handed to the reader on open
         * @param atomicBoolean flag checked before each read; {@code null} means "never blocked"
         * @param semaphore acquired while the flag is set; {@code null} is replaced by a
         *     semaphore without permits
         */
        public ReaderRunner(RecordReader<OUT, CURSOR> recordReader, InputSplit inputSplit, CURSOR cursor, Histogram histogram, RuntimeContext runtimeContext, AtomicBoolean atomicBoolean, Semaphore semaphore) {
            this.recordReader = recordReader;
            this.split = inputSplit;
            this.progress = cursor;
            this.runtimeContext = runtimeContext;
            this.latencyMetric = histogram;
            // Both arguments are optional for callers; both are dereferenced unconditionally
            // later on, so substitute inert defaults instead of failing with an NPE.
            this.isBlocked = atomicBoolean != null ? atomicBoolean : new AtomicBoolean(false);
            this.stateSignal = semaphore != null ? semaphore : new Semaphore(0);
        }

        /** @return the split this runner reads */
        public InputSplit getSplit() {
            return this.split;
        }

        /**
         * Opens the reader, optionally seeks to the initial cursor, then reads until the split is
         * finished or {@link #stop()} is called. The reader is closed exactly once on every exit
         * path, and {@link #isStopped()} turns true only after any failure cause has been stored.
         */
        @Override // java.lang.Runnable
        public void run() {
            // Recorded first so getStackTrace() can also diagnose a thread stuck in open()/seek().
            this.currentThread = Thread.currentThread();
            try {
                this.recordReader.open(this.split, this.runtimeContext);
                if (this.progress != null) {
                    ParallelReader.LOG.info("init progress not null, seek {}", this.progress);
                    this.recordReader.seek(this.progress);
                }
                while (!this.stop && !this.finished) {
                    if (this.isBlocked.get()) {
                        this.stateSignal.acquire();
                    }
                    long startNanos = System.nanoTime();
                    this.finished = !this.recordReader.next();
                    this.latencyMetric.update(System.nanoTime() - startNanos);
                    if (this.finished) {
                        ParallelReader.LOG.info("Finishing Split {}.", this.split);
                    } else if (this.recordReader.isHeartBeat()) {
                        updateWatermarkAndProgress(this.recordReader.getWatermark(), this.recordReader.getProgress(), this.recordReader.getMonotonyIncreaseProgress());
                    } else {
                        put(this.recordReader.getMessage());
                    }
                }
            } catch (InterruptedException e) {
                // An interrupt that follows a stop request is the expected shutdown path,
                // so it is only treated as a failure when nobody asked us to stop.
                if (!this.stop) {
                    this.cause = e;
                }
                ParallelReader.LOG.info("Split reader " + this.split + " is interrupted.", e);
            } catch (Throwable th) {
                this.cause = th;
                ParallelReader.LOG.error("Split reader " + this.split + " is failed cause: ", th);
            } finally {
                closeReader();
                this.stopped = true;
                ParallelReader.LOG.info("Split reader {} thread will exit.", this.split);
            }
        }

        /** Closes the reader; a close failure becomes the cause only if none is recorded yet. */
        private void closeReader() {
            try {
                if (this.recordReader != null) {
                    ParallelReader.LOG.info("Closing split {} reader runner.", this.split);
                    this.recordReader.close();
                }
            } catch (Throwable th) {
                ParallelReader.LOG.error("Exception caught in closing record reader", th);
                if (this.cause == null) {
                    this.cause = th;
                }
            }
        }

        /**
         * Queues a record together with the reader's current watermark, cursor and progress,
         * retrying while the pipe is full. The record is dropped if a stop is requested first.
         *
         * @param out record to queue
         * @throws IOException if the reader fails to report its watermark or position
         * @throws InterruptedException if interrupted while waiting for space in the pipe
         */
        protected void put(OUT out) throws IOException, InterruptedException {
            Tuple5<OUT, Long, Long, CURSOR, MonotonyIncreaseProgress> entry = new Tuple5<>(out, Long.valueOf(this.recordReader.getWatermark()), Long.valueOf(this.recordReader.getWatermark()), this.recordReader.getProgress(), this.recordReader.getMonotonyIncreaseProgress());
            boolean queued = false;
            while (!this.stop && !queued) {
                queued = this.splitPipe.offer(entry, ParallelReader.IDLE_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }

        /** Requests the read loop to end and interrupts the reader if it is {@link Interruptible}. */
        public void stop() {
            this.stop = true;
            if (this.recordReader instanceof Interruptible) {
                ((Interruptible) this.recordReader).interrupt();
            }
        }

        /** @return {@code true} once {@link #run()} has closed the reader and is about to return */
        public boolean isStopped() {
            return this.stopped;
        }

        /**
         * @return {@code true} if a stop was requested, or the split is finished and every
         *     buffered record has been polled
         */
        public boolean isExhausted() {
            return this.stop || (this.finished && this.splitPipe.isEmpty());
        }

        /** @return the failure recorded by {@link #run()}, or {@code null} if none */
        public Throwable getCause() {
            return this.cause;
        }

        /**
         * Applies a heartbeat. While records are still buffered the values are attached to the
         * newest buffered tuple, so they are published only after that record has been polled;
         * with an empty pipe they are published immediately.
         *
         * @param j watermark reported by the reader
         * @param cursor reader position to publish
         * @param monotonyIncreaseProgress progress value to publish
         */
        public synchronized void updateWatermarkAndProgress(long j, CURSOR cursor, MonotonyIncreaseProgress monotonyIncreaseProgress) {
            Tuple5<OUT, Long, Long, CURSOR, MonotonyIncreaseProgress> newest = this.splitPipe.peekLast();
            if (newest != null) {
                newest.f2 = Long.valueOf(j);
                newest.f3 = cursor;
                newest.f4 = monotonyIncreaseProgress;
            } else {
                this.watermark = j;
                this.progress = cursor;
                this.monotonyIncreaseProgress = monotonyIncreaseProgress;
            }
        }

        /**
         * Removes the oldest buffered record and publishes the watermark, cursor and progress
         * stored with it.
         *
         * @return the record and its timestamp, or {@code null} if nothing is buffered
         */
        public synchronized Tuple2<OUT, Long> pollRecord() {
            Tuple5<OUT, Long, Long, CURSOR, MonotonyIncreaseProgress> oldest = this.splitPipe.poll();
            if (oldest == null) {
                return null;
            }
            this.watermark = oldest.f2.longValue();
            this.progress = oldest.f3;
            this.monotonyIncreaseProgress = oldest.f4;
            return new Tuple2<>(oldest.f0, oldest.f1);
        }

        /**
         * @return the cursor published by the last polled record or heartbeat; before either,
         *     the cursor given to the constructor
         * @throws IOException declared for callers; not thrown by this implementation
         */
        public synchronized CURSOR getProgress() throws IOException {
            return this.progress;
        }

        /**
         * @return the monotony progress published by the last polled record or heartbeat
         * @throws IOException declared for callers; not thrown by this implementation
         */
        public synchronized MonotonyIncreaseProgress getMonotonyIncreaseProgress() throws IOException {
            return this.monotonyIncreaseProgress;
        }

        /** @return the published watermark; {@code Long.MIN_VALUE} until one is published */
        @Override // com.alibaba.blink.streaming.connectors.common.source.WatermarkProvider
        public synchronized long getWatermark() {
            return this.watermark;
        }

        /** Orders runners by their published watermark, lowest first. */
        @Override // java.lang.Comparable
        public int compareTo(ReaderRunner<OUT, CURSOR> readerRunner) {
            return Long.compare(getWatermark(), readerRunner.getWatermark());
        }

        /**
         * @return the current stack of the thread executing {@link #run()}, one frame per line,
         *     or a placeholder text when no such thread is alive
         */
        public String getStackTrace() {
            Thread thread = this.currentThread;
            if (thread == null || !thread.isAlive()) {
                return "No stack trace, maybe thread already exited.";
            }
            StringBuilder sb = new StringBuilder();
            for (StackTraceElement frame : thread.getStackTrace()) {
                sb.append(frame).append('\n');
            }
            return sb.toString();
        }

        /** @return the underlying reader's {@code getDelay()} value */
        public long getDelay() {
            return this.recordReader.getDelay();
        }

        /** @return the underlying reader's {@code getFetchedDelay()} value */
        public long getFetchedDelay() {
            return this.recordReader.getFetchedDelay();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/reader/ParallelReader$WatermarkEmitter.class */
    /**
     * Periodic task that polls a {@link ParallelReader} for its watermark and forwards it to the
     * source context whenever the value differs from the previously forwarded one.
     *
     * @param <OUT> record type of the source context
     */
    public static class WatermarkEmitter<OUT> implements Runnable {
        private volatile boolean stopped = false;
        private final ParallelReader provider;
        private final SourceFunction.SourceContext<OUT> ctx;
        private final long watermarkInterval;

        /**
         * @param parallelReader reader whose watermark is polled
         * @param j pause between two polls, passed to {@code Thread.sleep}
         * @param sourceContext context that receives the watermarks
         */
        public WatermarkEmitter(ParallelReader parallelReader, long j, SourceFunction.SourceContext<OUT> sourceContext) {
            this.provider = parallelReader;
            this.ctx = sourceContext;
            this.watermarkInterval = j;
        }

        /**
         * Polls until {@link #stop()} is called or the thread is interrupted while sleeping.
         * The comparison baseline starts at zero, so an initial watermark of zero is not emitted.
         */
        @Override // java.lang.Runnable
        public void run() {
            long lastEmitted = 0;
            while (!this.stopped) {
                Watermark candidate = new Watermark(this.provider.getWatermark());
                long timestamp = candidate.getTimestamp();
                if (timestamp != lastEmitted) {
                    lastEmitted = timestamp;
                    this.ctx.emitWatermark(candidate);
                }
                try {
                    Thread.sleep(this.watermarkInterval);
                } catch (InterruptedException e) {
                    // Interruption ends the emitter.
                    return;
                }
            }
        }

        /** Makes {@link #run()} return after its current sleep. */
        public void stop() {
            this.stopped = true;
        }
    }

    public ParallelReader(RuntimeContext runtimeContext, Configuration configuration, long j) {
        this.context = runtimeContext;
        this.config = configuration;
        this.watermarkInterval = j;
        split_pipe_len = configuration.getInteger(SPLIT_PIPE_LEN_CONFIG, 10);
        this.delayMetric = (DelayGauge) runtimeContext.getMetricGroup().gauge("delay", new DelayGauge(this.readerRunners, DelayKind.DELAY));
        this.dataFetchedDelayGauge = (DelayGauge) runtimeContext.getMetricGroup().gauge("fetched_delay", new DelayGauge(this.readerRunners, DelayKind.FETCHED_DELAY));
        this.noDataDelayGauge = (DelayGauge) runtimeContext.getMetricGroup().gauge("no_data_delay", new DelayGauge(this.readerRunners, DelayKind.NO_DATA_DELAY));
        this.outputCounter = runtimeContext.getMetricGroup().counter("tps_counter", new SimpleCounter());
        this.tpsMetric = runtimeContext.getMetricGroup().meter("tps", new MeterView(this.outputCounter, 60));
        this.latencyMetric = runtimeContext.getMetricGroup().histogram("partitionLatency", new SimpleHistogram());
    }

    /**
     * Sets the flag that {@code runImpl} checks in its final idle loop: when {@code true},
     * {@code runImpl} returns once all readers are done instead of idling until stopped.
     *
     * @param z new value of the flag
     * @return this reader, for call chaining
     */
    public ParallelReader<OUT, CURSOR> setExitAfterReadFinished(boolean z) {
        this.exitAfterReadFinished = z;
        return this;
    }

    /**
     * Queues a reader for the given split, delegating to the five-argument overload with
     * {@code null} for both the blocked flag and the semaphore.
     *
     * @param recordReader reader to drive
     * @param inputSplit split the reader is opened on
     * @param cursor position to resume from, or {@code null}
     * @throws IOException declared for callers; the delegation itself declares no checked exception
     */
    public void addRecordReader(RecordReader<OUT, CURSOR> recordReader, InputSplit inputSplit, CURSOR cursor) throws IOException {
        addRecordReader(recordReader, inputSplit, cursor, null, null);
    }

    /**
     * Wraps the reader in a {@link ReaderRunner} sharing this instance's latency histogram and
     * runtime context, and appends it to the runner queue. The runner is only queued here;
     * {@code submitReaders()} is what hands queued runners to the thread pool.
     *
     * @param recordReader reader to drive
     * @param inputSplit split the reader is opened on
     * @param cursor position to resume from, or {@code null}
     * @param atomicBoolean blocked flag passed through to the runner
     * @param semaphore signal passed through to the runner
     */
    public void addRecordReader(RecordReader<OUT, CURSOR> recordReader, InputSplit inputSplit, CURSOR cursor, AtomicBoolean atomicBoolean, Semaphore semaphore) {
        ReaderRunner<OUT, CURSOR> runner = new ReaderRunner<>(recordReader, inputSplit, cursor, this.latencyMetric, this.context, atomicBoolean, semaphore);
        this.readerRunners.add(runner);
    }

    /**
     * Runs the source: submits all queued readers, starts the watermark emitter and forwards
     * records until reading ends. {@link #close()} is invoked exactly once on every exit path.
     *
     * <p>An {@link InterruptedException} is logged and treated as a normal end of the run;
     * the thread's interrupt flag is not restored. Any other failure is logged and rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param sourceContext context that receives records and watermarks
     * @throws Exception declared for callers; failures surface as {@link RuntimeException}
     */
    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        try {
            submitReaders();
            runWatermarkEmitter(sourceContext);
            runImpl(sourceContext);
        } catch (InterruptedException e) {
            LOG.error("ParallelReader was interrupted: ", e);
        } catch (Throwable th) {
            LOG.error("ParallelReader caught exception: ", th);
            throw new RuntimeException(th);
        } finally {
            // A single finally instead of one close() per branch: a failing close() is no
            // longer followed by a second close() attempt.
            close();
        }
    }

    /**
     * Starts the periodic watermark emitter on the reader pool. Does nothing when the
     * configured watermark interval is zero or negative.
     *
     * @param sourceContext context the emitter forwards watermarks to
     */
    private void runWatermarkEmitter(SourceFunction.SourceContext<OUT> sourceContext) {
        if (this.watermarkInterval <= 0) {
            return;
        }
        this.watermarkEmitter = new WatermarkEmitter<>(this, this.watermarkInterval, sourceContext);
        this.readerPool.submit(this.watermarkEmitter);
    }

    /**
     * Sets the stop flag that {@code runImpl} checks in its loops. This method only sets the
     * flag; stopping the reader runners and the thread pool is done by {@code close()}.
     */
    public void stop() {
        this.stop = true;
    }

    /**
     * Stops all reader runners and the watermark emitter, waits a grace period, then shuts the
     * thread pool down and waits for it to terminate.
     *
     * <p>If the pool does not terminate within the waiting time, or anything fails before
     * termination was confirmed, the stack of every runner that is still running is logged
     * where possible and the JVM is terminated with exit code 1.
     */
    public void close() {
        boolean terminated = false;
        try {
            for (ReaderRunner<OUT, CURSOR> runner : this.readerRunners) {
                runner.stop();
            }
            if (this.watermarkEmitter != null) {
                this.watermarkEmitter.stop();
            }
            try {
                // Grace period for runners to notice the stop flag before being interrupted.
                Thread.sleep(100 * IDLE_INTERVAL);
            } catch (InterruptedException e) {
                // The interrupt flag is deliberately not restored here: awaitTermination below
                // would then fail immediately and trigger the process exit.
                LOG.warn("Waiting for reader stopping is interrupted.", e);
            }
            this.readerPool.shutdownNow();
            terminated = this.readerPool.awaitTermination(STOP_WAITING, TimeUnit.SECONDS);
            if (!terminated) {
                for (ReaderRunner<OUT, CURSOR> runner : this.readerRunners) {
                    if (!runner.isStopped()) {
                        LOG.info("Can not stop reader for split {}, it is stuck in method: \n {}.", runner.getSplit(), runner.getStackTrace());
                    }
                }
            }
        } catch (Throwable th) {
            // Same message text as before, now with the stack trace attached.
            LOG.warn(th.toString(), th);
        } finally {
            if (!terminated) {
                LOG.error("Shut down reader pool failed, exit process!");
                System.exit(1);
            }
            LOG.info("Stopped all split reader.");
        }
    }

    /**
     * Main forwarding loop. Repeatedly visits every queued runner and, per runner:
     * waits on its signal while it is flagged as blocked; fails if the runner stopped with a
     * cause; retires it (remembering its final cursor) once exhausted; otherwise emits at most
     * one buffered record while holding the checkpoint lock. A full round without any emitted
     * record is followed by a short sleep.
     *
     * <p>When no runner is left, or a stop was requested, the context is marked idle and the
     * method waits until stopped, unless exit-after-finish was requested.
     *
     * @param sourceContext context that receives the records
     * @throws Exception if a runner failed, or the thread is interrupted while waiting
     */
    protected void runImpl(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        while (!this.stop && !this.readerRunners.isEmpty()) {
            boolean idleRound = true;
            Iterator<ReaderRunner<OUT, CURSOR>> runners = this.readerRunners.iterator();
            while (runners.hasNext()) {
                ReaderRunner<OUT, CURSOR> runner = runners.next();
                if (runner.isBlocked.get()) {
                    runner.stateSignal.acquire();
                }
                if (runner.isStopped() && runner.getCause() != null) {
                    LOG.error(String.format("SplitReader for split[%d][%s] failed, cause: %s", Integer.valueOf(runner.getSplit().getSplitNumber()), runner.getSplit().toString(), runner.getCause()));
                    throw new RuntimeException(runner.getCause());
                }
                if (runner.isExhausted()) {
                    LOG.info(String.format("SplitReader for split[%d][%s] finished", Integer.valueOf(runner.getSplit().getSplitNumber()), runner.getSplit().toString()));
                    // Keep the final position so getProgress() still reports this split.
                    this.exitedReadRunnerSplitCursor.put(runner.getSplit(), Tuple2.of(runner.getProgress(), runner.getMonotonyIncreaseProgress()));
                    runners.remove();
                    continue;
                }
                // The wrapper is held for the duration of the block and closed on exit.
                try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(sourceContext.getFairCheckpointLock().getLock())) {
                    Tuple2<OUT, Long> record = runner.pollRecord();
                    if (record != null) {
                        sourceContext.collectWithTimestamp(record.f0, record.f1.longValue());
                        idleRound = false;
                        this.tpsMetric.markEvent();
                    }
                }
            }
            if (idleRound) {
                Thread.sleep(IDLE_INTERVAL);
            }
        }
        sourceContext.markAsTemporarilyIdle();
        LOG.info(String.format("This subTask [%d]/[%d] has finished, idle...", Integer.valueOf(this.context.getIndexOfThisSubtask()), Integer.valueOf(this.context.getNumberOfParallelSubtasks())));
        while (!this.stop && !this.exitAfterReadFinished) {
            Thread.sleep(1000L);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Builds a snapshot of the read position of every split: first the runners still queued,
     * then the splits whose runners were already retired. A retired split's stored position
     * overwrites an entry for the same split added from a queued runner. Splits whose cursor
     * is {@code null} are omitted.
     *
     * @return a new snapshot object
     * @throws IOException if a runner fails to report its position
     */
    public Progress<CURSOR> getProgress() throws IOException {
        Progress<CURSOR> snapshot = new Progress<>();
        for (ReaderRunner<OUT, CURSOR> runner : this.readerRunners) {
            snapshot.addProgress(runner.getSplit(), runner.getProgress(), runner.getMonotonyIncreaseProgress());
        }
        for (Map.Entry<InputSplit, Tuple2<CURSOR, MonotonyIncreaseProgress>> exited : this.exitedReadRunnerSplitCursor.entrySet()) {
            Tuple2<CURSOR, MonotonyIncreaseProgress> position = exited.getValue();
            snapshot.addProgress(exited.getKey(), position.f0, position.f1);
        }
        return snapshot;
    }

    /** Submits every queued runner to the reader pool and logs the split it was started for. */
    protected void submitReaders() {
        for (ReaderRunner<OUT, CURSOR> runner : this.readerRunners) {
            this.readerPool.submit(runner);
            InputSplit split = runner.getSplit();
            LOG.info(String.format("Start split reader for split[%d][%s]", Integer.valueOf(split.getSplitNumber()), split.toString()));
        }
    }

    /**
     * @return the smallest watermark among the runners currently queued, or
     *     {@code Long.MAX_VALUE} when the queue is empty
     */
    @Override // com.alibaba.blink.streaming.connectors.common.source.WatermarkProvider
    public long getWatermark() {
        long lowest = Long.MAX_VALUE;
        for (ReaderRunner<OUT, CURSOR> runner : this.readerRunners) {
            lowest = Math.min(lowest, runner.getWatermark());
        }
        return lowest;
    }
}
