package com.alibaba.blink.streaming.connectors.common.source;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/source/InputFormatAdapterSource.class */
public class InputFormatAdapterSource<OUT> extends RichParallelSourceFunction<OUT> implements InputFormatFunction {
    private static final int SPLIT_PIPE_LEN = 1024;
    private static final long IDLE_INTERVAL = 100;
    private static final long FAILED_RETRY_INTERVAL = 1000;
    private static final long STOP_WAITING = 30;
    private static final transient Logger LOG = LoggerFactory.getLogger(InputFormatAdapterSource.class);
    private InputFormat<OUT, InputSplit> inputFormat;
    private transient InputSplitProvider provider;
    private transient List<SplitReader<OUT>> splitReaders;
    private transient ExecutorService readerPool;
    private transient boolean closed = false;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/source/InputFormatAdapterSource$SplitReader.class */
    public static class SplitReader<OUT> implements Runnable {
        private InputFormat<OUT, InputSplit> inputFormat;
        private BlockingQueue<OUT> splitPipe;
        private volatile boolean stopped = false;
        private volatile boolean failed = false;
        private volatile Throwable cause;
        private InputSplit split;

        public SplitReader(InputFormat<OUT, InputSplit> inputFormat, BlockingQueue<OUT> blockingQueue, InputSplit inputSplit) {
            this.inputFormat = inputFormat;
            this.splitPipe = blockingQueue;
            this.split = inputSplit;
        }

        public Throwable getCause() {
            return this.cause;
        }

        public InputSplit getSplit() {
            return this.split;
        }

        public BlockingQueue<OUT> getSplitPipe() {
            return this.splitPipe;
        }

        public void stop() {
            this.stopped = true;
        }

        public boolean isStopped() {
            return this.stopped;
        }

        public boolean isFailed() {
            return this.failed;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Object obj = null;
                boolean z = false;
                while (!this.stopped && !this.inputFormat.reachedEnd()) {
                    try {
                        try {
                            obj = this.inputFormat.nextRecord(obj);
                            if (obj != null) {
                                z = false;
                                this.splitPipe.put(obj);
                            } else {
                                if (z) {
                                    Thread.sleep(InputFormatAdapterSource.IDLE_INTERVAL);
                                }
                                z = true;
                            }
                        } catch (IOException e) {
                            InputFormatAdapterSource.LOG.warn("IOException occurred", e);
                            Thread.sleep(InputFormatAdapterSource.FAILED_RETRY_INTERVAL);
                        }
                    } catch (Exception e2) {
                        this.failed = true;
                        this.cause = e2;
                        InputFormatAdapterSource.LOG.error("Split reader failed cause: ", e2);
                        try {
                            this.inputFormat.close();
                        } catch (IOException e3) {
                            this.failed = true;
                            this.cause = e3;
                            InputFormatAdapterSource.LOG.error("Split reader failed cause: ", e3);
                        }
                        this.stopped = true;
                        return;
                    }
                }
                try {
                    this.inputFormat.close();
                } catch (IOException e4) {
                    this.failed = true;
                    this.cause = e4;
                    InputFormatAdapterSource.LOG.error("Split reader failed cause: ", e4);
                }
                this.stopped = true;
            } catch (Throwable th) {
                try {
                    this.inputFormat.close();
                } catch (IOException e5) {
                    this.failed = true;
                    this.cause = e5;
                    InputFormatAdapterSource.LOG.error("Split reader failed cause: ", e5);
                }
                this.stopped = true;
                throw th;
            }
        }
    }

    public InputFormatAdapterSource(InputFormat<OUT, InputSplit> inputFormat) {
        this.inputFormat = inputFormat;
    }

    public void open(Configuration configuration) throws IOException, ClassNotFoundException {
        InputFormat<OUT, InputSplit> inputFormat;
        this.readerPool = Executors.newCachedThreadPool();
        this.splitReaders = new LinkedList();
        this.provider = getRuntimeContext().getInputSplitProvider();
        boolean z = true;
        this.inputFormat.configure(configuration);
        Iterator<InputSplit> inputSplits = getInputSplits();
        while (inputSplits.hasNext()) {
            InputSplit next = inputSplits.next();
            if (z) {
                inputFormat = this.inputFormat;
                z = false;
            } else {
                inputFormat = (InputFormat) SourceUtils.cloneObject(this.inputFormat);
            }
            inputFormat.open(next);
            this.splitReaders.add(new SplitReader<>(inputFormat, new ArrayBlockingQueue(SPLIT_PIPE_LEN), next));
        }
        for (SplitReader<OUT> splitReader : this.splitReaders) {
            this.readerPool.submit(splitReader);
            LOG.info(String.format("Start split reader for split[%d][%s]", Integer.valueOf(splitReader.getSplit().getSplitNumber()), splitReader.getSplit().toString()));
        }
    }

    public void close() {
        try {
            if (this.closed) {
                return;
            }
            Iterator<SplitReader<OUT>> it = this.splitReaders.iterator();
            while (it.hasNext()) {
                it.next().stop();
            }
            this.readerPool.shutdown();
            this.readerPool.awaitTermination(STOP_WAITING, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.warn(e.toString());
        } finally {
            this.closed = true;
            LOG.info("Stopped all split reader");
        }
    }

    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        while (!this.splitReaders.isEmpty()) {
            boolean z = true;
            Iterator<SplitReader<OUT>> it = this.splitReaders.iterator();
            while (it.hasNext()) {
                SplitReader<OUT> next = it.next();
                if (next.isFailed()) {
                    LOG.error(String.format("SplitReader for split[%d][%s] failed, cause: %s", Integer.valueOf(next.getSplit().getSplitNumber()), next.getSplit().toString(), next.getCause()));
                    throw new RuntimeException(next.getCause());
                }
                if (next.isStopped()) {
                    LOG.info(String.format("SplitReader for split[%d][%s] finished", Integer.valueOf(next.getSplit().getSplitNumber()), next.getSplit().toString()));
                    it.remove();
                } else {
                    OUT poll = next.getSplitPipe().poll();
                    if (poll != null) {
                        sourceContext.collect(poll);
                        z = false;
                    }
                }
            }
            if (z) {
                Thread.sleep(IDLE_INTERVAL);
            }
        }
    }

    public void cancel() {
        close();
    }

    @Override // com.alibaba.blink.streaming.connectors.common.source.InputFormatFunction
    public InputFormat getFormat() {
        return this.inputFormat;
    }

    private Iterator<InputSplit> getInputSplits() {
        return new Iterator<InputSplit>() { // from class: com.alibaba.blink.streaming.connectors.common.source.InputFormatAdapterSource.1
            private InputSplit nextSplit;
            private boolean exhausted;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                try {
                    InputSplit nextInputSplit = InputFormatAdapterSource.this.provider.getNextInputSplit(InputFormatAdapterSource.this.getRuntimeContext().getOperatorContext().getOperatorID(), InputFormatAdapterSource.this.getRuntimeContext().getUserCodeClassLoader());
                    if (nextInputSplit != null) {
                        this.nextSplit = nextInputSplit;
                        return true;
                    }
                    this.exhausted = true;
                    return false;
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                InputSplit inputSplit = this.nextSplit;
                this.nextSplit = null;
                return inputSplit;
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
