package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManagerUtility;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.InputFetcher;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/InputGateFetcher.class */
/**
 * {@link InputFetcher} that reads from a single {@link InputGate}.
 *
 * <p>Buffers polled through the {@link SelectedReadingBarrierHandler} are handed to a
 * per-channel {@link RecordDeserializer}; every fully deserialized {@link StreamElement}
 * (record, watermark, latency marker or stream status) is forwarded to the
 * {@link InputProcessor} together with the index of the channel it was read from.
 *
 * <p>The fetcher registers itself as listener on the input gate in its constructor and relays
 * "gate non-empty" notifications to the registered
 * {@link InputFetcher.InputFetcherAvailableListener}, if any.
 *
 * @param <IN> type of the record values read from the gate
 */
class InputGateFetcher<IN> implements InputFetcher, InputGateListener {
    private static final Logger LOG = LoggerFactory.getLogger(InputGateFetcher.class);

    /** Gate the buffers and events are read from. */
    private final InputGate inputGate;

    /** One deserializer per input channel of the gate, indexed by the fetcher-local channel index. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;

    /** Delegate the deserializers write the current stream element into. */
    private final DeserializationDelegate<StreamElement> deserializationDelegate;

    /** Source of the next buffer or event for {@link #inputGate}. */
    private final SelectedReadingBarrierHandler barrierHandler;

    /** Receiver of all deserialized stream elements and of the end-of-input signal. */
    private final InputProcessor inputProcessor;

    /** Lock held while the end of input is signalled to the processor. */
    private final Object checkpointLock;

    /** Offset subtracted from the channel index reported by the gate to obtain the local index. */
    private final int basedChannelCount;

    private final InputSelector.InputSelection inputSelection;

    /**
     * Value used to seed the deserialization delegate before each read. It is replaced by the
     * value of every processed record, so it is non-null after the first record regardless of
     * whether object reuse was requested.
     */
    private IN reusedObject;

    /** Deserializer that still holds unconsumed data, or {@code null} if a new buffer is needed. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;

    /** Local index of the channel the current buffer was read from. */
    private int currentChannelIndex;

    private boolean isFinished = false;

    private InputFetcher.InputFetcherAvailableListener listener;

    /**
     * Creates the fetcher and registers it as listener on {@code inputGate}.
     *
     * @param inputSelection the input selection this fetcher reports via {@link #getInputSelection()}
     * @param inputGate gate to read from
     * @param typeSerializer serializer for the record values
     * @param selectedReadingBarrierHandler handler used to poll the next buffer or event
     * @param ioManager provides the spilling directories for the record deserializers
     * @param inputProcessor receiver of the deserialized stream elements
     * @param checkpointLock lock held while signalling the end of input
     * @param basedChannelCount offset subtracted from the gate's channel index
     * @param objectReuse whether records are deserialized into a reused instance
     * @param configuration configuration used to create the record deserializers
     */
    public InputGateFetcher(InputSelector.InputSelection inputSelection, InputGate inputGate, TypeSerializer<IN> typeSerializer, SelectedReadingBarrierHandler selectedReadingBarrierHandler, IOManager ioManager, InputProcessor inputProcessor, Object checkpointLock, int basedChannelCount, boolean objectReuse, Configuration configuration) {
        this.inputSelection = Preconditions.checkNotNull(inputSelection);
        this.inputGate = Preconditions.checkNotNull(inputGate);
        this.barrierHandler = Preconditions.checkNotNull(selectedReadingBarrierHandler);
        this.inputProcessor = Preconditions.checkNotNull(inputProcessor);
        this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
        this.basedChannelCount = basedChannelCount;
        this.recordDeserializers = new SerializerManagerUtility(configuration).createRecordDeserializers(inputGate.getAllInputChannels(), ioManager.getSpillingDirectoriesPaths());
        if (objectReuse) {
            this.reusedObject = typeSerializer.createInstance();
            this.deserializationDelegate = new ReusingDeserializationDelegate<StreamElement>(new StreamElementSerializer<IN>(typeSerializer));
        } else {
            this.reusedObject = null;
            this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(new StreamElementSerializer<IN>(typeSerializer));
        }
        // Kept as the last statement so that all fields are assigned before the gate can call back.
        this.inputGate.registerListener(this);
    }

    /** Returns {@code true} once the input gate has finished and the end of input was signalled. */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public boolean isFinished() {
        return this.isFinished;
    }

    /**
     * Returns {@code true} if a partially consumed buffer is pending or the gate reports more
     * available data.
     */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public boolean moreAvailable() {
        return this.currentRecordDeserializer != null || this.inputGate.moreAvailable();
    }

    /** Requests the partitions of the input gate. */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public void setup() throws Exception {
        this.inputGate.requestPartitions();
    }

    /**
     * Reads until one complete stream element is available and passes it to the input processor.
     *
     * @return {@code true} if an element was processed, {@code false} if no further data could
     *     be obtained (nothing to poll right now, or the gate is finished)
     * @throws RuntimeException if the element is of an unknown kind
     */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public boolean fetchAndProcess() throws Exception {
        RecordDeserializer.DeserializationResult result;
        do {
            result = getNextResult();
            if (result == null) {
                return false;
            }
            // A record may span several buffers; keep reading until it is complete.
        } while (!result.isFullRecord());

        StreamElement element = this.deserializationDelegate.getInstance();
        if (element.isRecord()) {
            StreamRecord<IN> record = element.asRecord();
            // Remember the value so that the next read can be seeded with it.
            this.reusedObject = record.getValue();
            this.inputProcessor.processRecord(record, this.currentChannelIndex);
        } else if (element.isWatermark()) {
            this.inputProcessor.processWatermark(element.asWatermark(), this.currentChannelIndex);
        } else if (element.isLatencyMarker()) {
            this.inputProcessor.processLatencyMarker(element.asLatencyMarker(), this.currentChannelIndex);
        } else if (element.isStreamStatus()) {
            this.inputProcessor.processStreamStatus(element.asStreamStatus(), this.currentChannelIndex);
        } else {
            throw new RuntimeException("Unknown stream element " + element);
        }
        return true;
    }

    /**
     * Performs one deserialization step on the current buffer, fetching a new buffer first if
     * none is pending. A fully consumed buffer is recycled and the current deserializer is reset.
     *
     * @return the result of the deserialization step, or {@code null} if no buffer could be
     *     obtained
     */
    RecordDeserializer.DeserializationResult getNextResult() throws Exception {
        // Only poll the gate when no partially consumed buffer is pending.
        if (this.currentRecordDeserializer == null && getNextBufferOrEvent() == null) {
            return null;
        }
        if (this.reusedObject != null) {
            // Always seed the delegate with a record wrapper before reading the next element.
            this.deserializationDelegate.setInstance(new StreamRecord<IN>(this.reusedObject));
        }
        RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
        if (result.isBufferConsumed()) {
            this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
            this.currentRecordDeserializer = null;
        }
        return result;
    }

    /**
     * Polls the barrier handler until a buffer arrives, nothing is available, or the gate is
     * finished.
     *
     * <p>A buffer is installed in the deserializer of its channel, which becomes the current
     * deserializer. The only accepted event is {@link EndOfPartitionEvent}. When the gate is
     * finished, the input processor is told about the end of input and released while holding
     * the checkpoint lock, and this fetcher is marked as finished.
     *
     * @return the buffer that was installed, or {@code null} if nothing is available or the gate
     *     is finished
     * @throws IllegalArgumentException if an event other than {@link EndOfPartitionEvent} is polled
     */
    @VisibleForTesting
    BufferOrEvent getNextBufferOrEvent() throws Exception {
        do {
            BufferOrEvent next = this.barrierHandler.pollNext(this.inputGate);
            if (next == null) {
                return null;
            }
            if (!next.isEvent()) {
                this.currentChannelIndex = next.getChannelIndex() - this.basedChannelCount;
                this.currentRecordDeserializer = this.recordDeserializers[this.currentChannelIndex];
                this.currentRecordDeserializer.setNextBuffer(next.getBuffer());
                return next;
            }
            Preconditions.checkArgument(next.getEvent() instanceof EndOfPartitionEvent, "Unexpected event: %s", next.getEvent());
            // Parameterized so that the event is only rendered when debug logging is enabled.
            LOG.debug("receive event:{}", next.getEvent());
        } while (!this.inputGate.isFinished());

        LOG.info("Input gate {} is finished", this.inputGate);
        synchronized (this.checkpointLock) {
            this.inputProcessor.endInput();
            this.inputProcessor.release();
        }
        this.isFinished = true;
        return null;
    }

    @VisibleForTesting
    RecordDeserializer<DeserializationDelegate<StreamElement>> getCurrentRecordDeserializer() {
        return this.currentRecordDeserializer;
    }

    @VisibleForTesting
    void setCurrentRecordDeserializer(RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer) {
        this.currentRecordDeserializer = recordDeserializer;
    }

    @VisibleForTesting
    int getCurrentChannelIndex() {
        return this.currentChannelIndex;
    }

    @VisibleForTesting
    DeserializationDelegate<StreamElement> getDeserializationDelegate() {
        return this.deserializationDelegate;
    }

    @VisibleForTesting
    IN getReusedObject() {
        return this.reusedObject;
    }

    /** Returns the input gate this fetcher reads from. */
    public InputGate getInputGate() {
        return this.inputGate;
    }

    /** Recycles any buffer still held by a record deserializer and clears every deserializer. */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public void cleanup() {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            Buffer pending = deserializer.getCurrentBuffer();
            if (pending != null && !pending.isRecycled()) {
                pending.recycleBuffer();
            }
            deserializer.clear();
        }
    }

    /** No-op: this fetcher has nothing to cancel. */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public void cancel() {
    }

    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public InputSelector.InputSelection getInputSelection() {
        return this.inputSelection;
    }

    /**
     * Registers the listener that is told when the input gate becomes non-empty.
     *
     * @throws IllegalStateException if a listener has already been registered
     */
    @Override // org.apache.flink.streaming.runtime.io.InputFetcher
    public void registerAvailableListener(InputFetcher.InputFetcherAvailableListener inputFetcherAvailableListener) {
        Preconditions.checkState(this.listener == null);
        this.listener = inputFetcherAvailableListener;
    }

    /**
     * Relays a "gate non-empty" notification to the registered listener, if there is one.
     * NOTE(review): presumably the {@link InputGateListener} callback; the interface is not
     * visible here, so {@code @Override} was not added - confirm and annotate.
     *
     * @param inputGate the gate that became non-empty; must be the gate of this fetcher
     * @throws IllegalArgumentException if {@code inputGate} is not the gate of this fetcher
     */
    public void notifyInputGateNonEmpty(InputGate inputGate) {
        Preconditions.checkArgument(this.inputGate == inputGate);
        if (this.listener != null) {
            this.listener.notifyInputFetcherAvailable(this);
        }
    }
}
