package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.state2.ListState;
import org.apache.flink.api.common.state2.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.LockAndCondition;
import org.apache.flink.util.LockGetReleaseWrapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.class */
public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG;
    private FileInputFormat<OUT> format;
    private TypeSerializer<OUT> serializer;
    private transient LockAndCondition checkpointLock;
    private transient ContinuousFileReaderOperator<OUT>.SplitReader<OUT> reader;
    private transient SourceFunction.SourceContext<OUT> readerContext;
    private transient ListState<TimestampedFileInputSplit> inputSplitState;
    private transient Meter parserTpsMetrics;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator$SplitReader.class */
    private class SplitReader<OT> extends Thread {
        private volatile boolean shouldClose;
        private volatile boolean isRunning;
        private final FileInputFormat<OT> format;
        private final TypeSerializer<OT> serializer;
        private final LockAndCondition checkpointLock;
        private final SourceFunction.SourceContext<OT> readerContext;
        private final Queue<TimestampedFileInputSplit> pendingSplits;
        private TimestampedFileInputSplit currentSplit;
        private volatile boolean isSplitOpen;

        private SplitReader(FileInputFormat<OT> fileInputFormat, TypeSerializer<OT> typeSerializer, SourceFunction.SourceContext<OT> sourceContext, LockAndCondition lockAndCondition, List<TimestampedFileInputSplit> list) {
            this.format = (FileInputFormat) Preconditions.checkNotNull(fileInputFormat, "Unspecified FileInputFormat.");
            this.serializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer, "Unspecified Serializer.");
            this.readerContext = (SourceFunction.SourceContext) Preconditions.checkNotNull(sourceContext, "Unspecified Reader Context.");
            this.checkpointLock = (LockAndCondition) Preconditions.checkNotNull(lockAndCondition, "Unspecified checkpoint lock.");
            this.shouldClose = false;
            this.isRunning = true;
            this.pendingSplits = new PriorityQueue();
            if (list != null) {
                this.pendingSplits.addAll(list);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addSplit(TimestampedFileInputSplit timestampedFileInputSplit) {
            Preconditions.checkNotNull(timestampedFileInputSplit, "Cannot insert a null value in the pending splits queue.");
            LockGetReleaseWrapper lockGetReleaseWrapper = new LockGetReleaseWrapper(this.checkpointLock.getLock());
            Throwable th = null;
            try {
                try {
                    this.pendingSplits.add(timestampedFileInputSplit);
                    if (lockGetReleaseWrapper != null) {
                        if (0 == 0) {
                            lockGetReleaseWrapper.close();
                            return;
                        }
                        try {
                            lockGetReleaseWrapper.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (lockGetReleaseWrapper != null) {
                    if (th != null) {
                        try {
                            lockGetReleaseWrapper.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        lockGetReleaseWrapper.close();
                    }
                }
                throw th4;
            }
        }

        public boolean isRunning() {
            return this.isRunning;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            LockGetReleaseWrapper lockGetReleaseWrapper;
            Throwable th;
            LockGetReleaseWrapper lockGetReleaseWrapper2;
            Throwable th2;
            try {
                try {
                    Counter counter = ContinuousFileReaderOperator.this.getMetricGroup().counter("numSplitsProcessed");
                    this.format.openInputFormat();
                    while (this.isRunning) {
                        LockGetReleaseWrapper lockGetReleaseWrapper3 = new LockGetReleaseWrapper(this.checkpointLock.getLock());
                        Throwable th3 = null;
                        try {
                            try {
                                try {
                                    try {
                                        try {
                                            if (this.currentSplit == null) {
                                                this.currentSplit = this.pendingSplits.poll();
                                                if (this.currentSplit == null) {
                                                    if (this.shouldClose) {
                                                        this.isRunning = false;
                                                    } else {
                                                        this.checkpointLock.getCondition().await(50L, TimeUnit.MILLISECONDS);
                                                    }
                                                    if (lockGetReleaseWrapper3 != null) {
                                                        if (0 != 0) {
                                                            try {
                                                                lockGetReleaseWrapper3.close();
                                                            } catch (Throwable th4) {
                                                                th3.addSuppressed(th4);
                                                            }
                                                        } else {
                                                            lockGetReleaseWrapper3.close();
                                                        }
                                                    }
                                                }
                                            }
                                            this.format.close();
                                            this.isSplitOpen = false;
                                            this.currentSplit = null;
                                            if (lockGetReleaseWrapper2 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        lockGetReleaseWrapper2.close();
                                                    } catch (Throwable th5) {
                                                        th2.addSuppressed(th5);
                                                    }
                                                } else {
                                                    lockGetReleaseWrapper2.close();
                                                }
                                            }
                                        } catch (Throwable th6) {
                                            th2 = th6;
                                            throw th6;
                                        }
                                    } finally {
                                    }
                                    Object createInstance = this.serializer.createInstance();
                                    while (true) {
                                        if (this.format.reachedEnd()) {
                                            break;
                                        }
                                        LockGetReleaseWrapper lockGetReleaseWrapper4 = new LockGetReleaseWrapper(this.checkpointLock.getLock());
                                        Throwable th7 = null;
                                        try {
                                            try {
                                                createInstance = this.format.nextRecord(createInstance);
                                                if (createInstance != null) {
                                                    this.readerContext.collect(createInstance);
                                                    if (ContinuousFileReaderOperator.this.parserTpsMetrics != null) {
                                                        ContinuousFileReaderOperator.this.parserTpsMetrics.markEvent();
                                                    }
                                                    if (lockGetReleaseWrapper4 != null) {
                                                        if (0 != 0) {
                                                            try {
                                                                lockGetReleaseWrapper4.close();
                                                            } catch (Throwable th8) {
                                                                th7.addSuppressed(th8);
                                                            }
                                                        } else {
                                                            lockGetReleaseWrapper4.close();
                                                        }
                                                    }
                                                } else if (lockGetReleaseWrapper4 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            lockGetReleaseWrapper4.close();
                                                        } catch (Throwable th9) {
                                                            th7.addSuppressed(th9);
                                                        }
                                                    } else {
                                                        lockGetReleaseWrapper4.close();
                                                    }
                                                }
                                            } catch (Throwable th10) {
                                                throw th10;
                                            }
                                        } catch (Throwable th11) {
                                            th7 = th11;
                                            throw th11;
                                        }
                                    }
                                    counter.inc();
                                    lockGetReleaseWrapper2 = new LockGetReleaseWrapper(this.checkpointLock.getLock());
                                    th2 = null;
                                } catch (Throwable th12) {
                                    LockGetReleaseWrapper lockGetReleaseWrapper5 = new LockGetReleaseWrapper(this.checkpointLock.getLock());
                                    Throwable th13 = null;
                                    try {
                                        try {
                                            this.format.close();
                                            this.isSplitOpen = false;
                                            this.currentSplit = null;
                                            if (lockGetReleaseWrapper5 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        lockGetReleaseWrapper5.close();
                                                    } catch (Throwable th14) {
                                                        th13.addSuppressed(th14);
                                                    }
                                                } else {
                                                    lockGetReleaseWrapper5.close();
                                                }
                                            }
                                            throw th12;
                                        } catch (Throwable th15) {
                                            th13 = th15;
                                            throw th15;
                                        }
                                    } finally {
                                        if (lockGetReleaseWrapper5 != null) {
                                            if (th13 != null) {
                                                try {
                                                    lockGetReleaseWrapper5.close();
                                                } catch (Throwable th16) {
                                                    th13.addSuppressed(th16);
                                                }
                                            } else {
                                                lockGetReleaseWrapper5.close();
                                            }
                                        }
                                    }
                                }
                                if (!(this.format instanceof CheckpointableInputFormat) || this.currentSplit.getSplitState() == null) {
                                    this.format.open(this.currentSplit);
                                } else {
                                    this.format.reopen(this.currentSplit, this.currentSplit.getSplitState());
                                }
                                this.currentSplit.resetSplitState();
                                this.isSplitOpen = true;
                                if (lockGetReleaseWrapper3 != null) {
                                    if (0 != 0) {
                                        try {
                                            lockGetReleaseWrapper3.close();
                                        } catch (Throwable th17) {
                                            th3.addSuppressed(th17);
                                        }
                                    } else {
                                        lockGetReleaseWrapper3.close();
                                    }
                                }
                                ContinuousFileReaderOperator.LOG.debug("Reading split: " + this.currentSplit);
                            } finally {
                                if (lockGetReleaseWrapper3 != null) {
                                    if (th3 != null) {
                                        try {
                                            lockGetReleaseWrapper3.close();
                                        } catch (Throwable th18) {
                                            th3.addSuppressed(th18);
                                        }
                                    } else {
                                        lockGetReleaseWrapper3.close();
                                    }
                                }
                            }
                        } catch (Throwable th19) {
                            th3 = th19;
                            throw th19;
                        }
                    }
                    lockGetReleaseWrapper = new LockGetReleaseWrapper(this.checkpointLock.getLock());
                    th = null;
                } catch (Throwable th20) {
                    LockGetReleaseWrapper lockGetReleaseWrapper6 = new LockGetReleaseWrapper(this.checkpointLock.getLock());
                    Throwable th21 = null;
                    try {
                        try {
                            ContinuousFileReaderOperator.LOG.debug("Reader terminated, and exiting...");
                            try {
                                this.format.closeInputFormat();
                            } catch (IOException e) {
                                ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e.getMessage(), e);
                            }
                            this.isSplitOpen = false;
                            this.currentSplit = null;
                            this.isRunning = false;
                            this.checkpointLock.getCondition().signalAll();
                            if (lockGetReleaseWrapper6 != null) {
                                if (0 != 0) {
                                    try {
                                        lockGetReleaseWrapper6.close();
                                    } catch (Throwable th22) {
                                        th21.addSuppressed(th22);
                                    }
                                } else {
                                    lockGetReleaseWrapper6.close();
                                }
                            }
                            throw th20;
                        } catch (Throwable th23) {
                            th21 = th23;
                            throw th23;
                        }
                    } finally {
                    }
                }
                try {
                    try {
                        ContinuousFileReaderOperator.LOG.debug("Reader terminated, and exiting...");
                        try {
                            this.format.closeInputFormat();
                        } catch (IOException e2) {
                            ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e2.getMessage(), e2);
                        }
                        this.isSplitOpen = false;
                        this.currentSplit = null;
                        this.isRunning = false;
                        this.checkpointLock.getCondition().signalAll();
                        if (lockGetReleaseWrapper != null) {
                            if (0 == 0) {
                                lockGetReleaseWrapper.close();
                                return;
                            }
                            try {
                                lockGetReleaseWrapper.close();
                            } catch (Throwable th24) {
                                th.addSuppressed(th24);
                            }
                        }
                    } catch (Throwable th25) {
                        th = th25;
                        throw th25;
                    }
                } finally {
                }
            } catch (Throwable th26) {
                ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception when processing split: " + this.currentSplit, th26);
                LockGetReleaseWrapper lockGetReleaseWrapper7 = new LockGetReleaseWrapper(this.checkpointLock.getLock());
                Throwable th27 = null;
                try {
                    ContinuousFileReaderOperator.LOG.debug("Reader terminated, and exiting...");
                    try {
                        this.format.closeInputFormat();
                    } catch (IOException e3) {
                        ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e3.getMessage(), e3);
                    }
                    this.isSplitOpen = false;
                    this.currentSplit = null;
                    this.isRunning = false;
                    this.checkpointLock.getCondition().signalAll();
                    if (lockGetReleaseWrapper7 != null) {
                        if (0 == 0) {
                            lockGetReleaseWrapper7.close();
                            return;
                        }
                        try {
                            lockGetReleaseWrapper7.close();
                        } catch (Throwable th28) {
                            th27.addSuppressed(th28);
                        }
                    }
                } catch (Throwable th29) {
                    if (lockGetReleaseWrapper7 != null) {
                        if (0 != 0) {
                            try {
                                lockGetReleaseWrapper7.close();
                            } catch (Throwable th30) {
                                th27.addSuppressed(th30);
                            }
                        } else {
                            lockGetReleaseWrapper7.close();
                        }
                    }
                    throw th29;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<TimestampedFileInputSplit> getReaderState() throws IOException {
            ArrayList arrayList = new ArrayList(this.pendingSplits.size());
            if (this.currentSplit != null) {
                if ((this.format instanceof CheckpointableInputFormat) && this.isSplitOpen) {
                    this.currentSplit.setSplitState(this.format.getCurrentState());
                }
                arrayList.add(this.currentSplit);
            }
            arrayList.addAll(this.pendingSplits);
            return arrayList;
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void close() {
            this.shouldClose = true;
        }
    }

    public ContinuousFileReaderOperator(FileInputFormat<OUT> fileInputFormat) {
        this.format = (FileInputFormat) Preconditions.checkNotNull(fileInputFormat);
    }

    @Override // org.apache.flink.streaming.api.operators.OutputTypeConfigurable
    public void setOutputType(TypeInformation<OUT> typeInformation, ExecutionConfig executionConfig) {
        this.serializer = typeInformation.createSerializer(executionConfig);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        Preconditions.checkState(this.reader == null, "The reader is already initialized.");
        Preconditions.checkState(this.serializer != null, "The serializer has not been set. Probably the setOutputType() was not called. Please report it.");
        this.format.setRuntimeContext(getRuntimeContext());
        this.format.configure(new Configuration());
        this.checkpointLock = getContainingTask().getCheckpointLock();
        this.readerContext = StreamSourceContexts.getSourceContext(getContainingTask().getConfiguration().getTimeCharacteristic(), getProcessingTimeService(), this.checkpointLock, getContainingTask().getStreamStatusMaintainer(), this.output, getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), -1L);
        this.parserTpsMetrics = getRuntimeContext().getMetricGroup().meter("parserTps", new MeterView(getRuntimeContext().getMetricGroup().counter("parserTps_counter", new SimpleCounter()), 30));
        this.inputSplitState = getRuntimeContext().getPartitionedState(new ListStateDescriptor("input-splits", TimestampedFileInputSplit.class));
        List list = (List) this.inputSplitState.get();
        LOG.info("Starting reader with restored splits: {}.", list);
        this.reader = new SplitReader<>(this.format, this.serializer, this.readerContext, this.checkpointLock, list);
        this.reader.start();
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processElement(StreamRecord<TimestampedFileInputSplit> streamRecord) throws Exception {
        this.reader.addSplit(streamRecord.getValue());
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void dispose() throws Exception {
        super.dispose();
        this.reader.cancel();
        try {
            this.reader.join(200L);
        } catch (InterruptedException e) {
        }
        while (this.reader.isAlive()) {
            StringBuilder sb = new StringBuilder();
            for (StackTraceElement stackTraceElement : this.reader.getStackTrace()) {
                sb.append(stackTraceElement).append('\n');
            }
            LOG.warn("The reader is stuck in method:\n {}", sb.toString());
            this.reader.interrupt();
            try {
                this.reader.join(50L);
            } catch (InterruptedException e2) {
            }
        }
        this.reader = null;
        this.readerContext = null;
        this.format = null;
        this.serializer = null;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        if (!$assertionsDisabled && !this.checkpointLock.getLock().isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        while (this.reader != null && this.reader.isAlive() && this.reader.isRunning()) {
            this.reader.close();
            this.checkpointLock.getCondition().await();
        }
        if (this.readerContext != null) {
            this.readerContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.readerContext.close();
        }
        this.output.close();
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void endInput() throws Exception {
        if (!$assertionsDisabled && !this.checkpointLock.getLock().isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        while (this.reader != null && this.reader.isAlive() && this.reader.isRunning()) {
            this.reader.close();
            this.checkpointLock.getCondition().await();
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator
    public void preSnapshot(long j) throws Exception {
        List readerState = this.reader.getReaderState();
        try {
            this.inputSplitState.clear();
            this.inputSplitState.addAll(readerState);
        } catch (Exception e) {
            throw new Exception("Could not add timestamped file input splits to to operator state backend of operator " + getOperatorName() + '.', e);
        }
    }

    static {
        $assertionsDisabled = !ContinuousFileReaderOperator.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
    }
}
