package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionV2.class */
public class InputFormatSourceFunctionV2<OUT> extends RichParallelSourceFunctionV2<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(InputFormatSourceFunctionV2.class);
    private static final long serialVersionUID = 1;
    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;
    private InputFormat<OUT, InputSplit> format;
    private transient StreamingRuntimeContext context;
    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;
    private transient Counter completedSplitsCounter;
    private transient OUT reusableElement;
    private transient SourceRecord<OUT> sourceRecord;
    private transient boolean isObjectReuse;
    private transient boolean hasMoreData;
    private transient long afterOpen;

    public InputFormatSourceFunctionV2(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInformation) {
        this.format = inputFormat;
        this.typeInfo = typeInformation;
    }

    public void open(Configuration configuration) throws Exception {
        this.context = getRuntimeContext();
        if (this.format instanceof RichInputFormat) {
            this.format.setRuntimeContext(this.context);
        }
        this.format.configure(configuration);
        this.provider = this.context.getInputSplitProvider();
        this.serializer = this.typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        this.splitIterator = getInputSplits();
        this.completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
        if (this.format instanceof RichInputFormat) {
            this.format.openInputFormat();
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.hasMoreData = this.splitIterator.hasNext();
        LOG.info("get input split from splitProvider, elapsed time: " + (System.currentTimeMillis() - currentTimeMillis));
        if (this.hasMoreData) {
            long currentTimeMillis2 = System.currentTimeMillis();
            this.format.open(this.splitIterator.next());
            this.afterOpen = System.currentTimeMillis();
            LOG.info("open the format, elapsed time: " + (this.afterOpen - currentTimeMillis2));
        }
        this.isObjectReuse = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled();
        this.sourceRecord = new SourceRecord<>();
        this.reusableElement = (OUT) this.serializer.createInstance();
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunctionV2
    public boolean isFinished() {
        return !this.hasMoreData;
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunctionV2
    public SourceRecord<OUT> next() throws Exception {
        while (this.hasMoreData) {
            if (this.format.reachedEnd()) {
                requestNextSplit();
            } else {
                OUT out = (OUT) this.format.nextRecord(this.reusableElement);
                if (out != null) {
                    this.reusableElement = out;
                    this.sourceRecord.setRecord(out);
                    return this.sourceRecord;
                }
                this.completedSplitsCounter.inc();
                requestNextSplit();
            }
        }
        return null;
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunctionV2
    public void cancel() {
    }

    private void requestNextSplit() throws IOException {
        long currentTimeMillis = System.currentTimeMillis();
        LOG.info("do a split, cost: " + (currentTimeMillis - this.afterOpen));
        this.format.close();
        long currentTimeMillis2 = System.currentTimeMillis();
        LOG.info("close a split, cost: " + (currentTimeMillis2 - currentTimeMillis));
        if (!this.splitIterator.hasNext()) {
            LOG.info("get input split from splitProvider, elapsed time: " + (System.currentTimeMillis() - currentTimeMillis2));
            this.hasMoreData = false;
        } else {
            long currentTimeMillis3 = System.currentTimeMillis();
            LOG.info("get input split from splitProvider, elapsed time: " + (currentTimeMillis3 - currentTimeMillis2));
            this.format.open(this.splitIterator.next());
            this.afterOpen = System.currentTimeMillis();
            LOG.info("open the format, elapsed time: " + (this.afterOpen - currentTimeMillis3));
        }
    }

    public void close() throws Exception {
        if (this.format != null) {
            this.format.close();
            if (this.format instanceof RichInputFormat) {
                this.format.closeInputFormat();
            }
            this.format = null;
        }
    }

    public InputFormat<OUT, InputSplit> getFormat() {
        return this.format;
    }

    private Iterator<InputSplit> getInputSplits() {
        return new Iterator<InputSplit>() { // from class: org.apache.flink.streaming.api.functions.source.InputFormatSourceFunctionV2.1
            private InputSplit nextSplit;
            private boolean exhausted;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                try {
                    InputSplit nextInputSplit = InputFormatSourceFunctionV2.this.provider.getNextInputSplit(InputFormatSourceFunctionV2.this.context.getOperatorID(), InputFormatSourceFunctionV2.this.context.getUserCodeClassLoader());
                    if (nextInputSplit != null) {
                        this.nextSplit = nextInputSplit;
                        return true;
                    }
                    this.exhausted = true;
                    return false;
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                InputSplit inputSplit = this.nextSplit;
                this.nextSplit = null;
                return inputSplit;
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
