/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManagerUtility;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.InputFetcher;
import org.apache.flink.streaming.runtime.io.InputProcessor;
import org.apache.flink.streaming.runtime.io.SelectedReadingBarrierHandler;
import org.apache.flink.streaming.runtime.streamrecord.ReusingRecordValueDeserializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputFetcher} that reads buffers and events from a single {@link InputGate},
 * deserializes the buffers into {@link StreamElement}s and forwards every element to an
 * {@link InputProcessor}.
 *
 * <p>One {@link RecordDeserializer} is kept per input channel. At most one of them is
 * "current" at any time; it is drained record by record before the next buffer is polled
 * from the gate through the {@link SelectedReadingBarrierHandler}.
 *
 * <p>The fetcher registers itself as listener on the input gate and relays the gate's
 * "non empty" notification to the registered
 * {@link InputFetcher.InputFetcherAvailableListener}, if any.
 *
 * @param <IN> type of the record values carried by the stream elements
 */
class InputGateFetcher<IN> implements InputFetcher, InputGateListener {
    private static final Logger LOG = LoggerFactory.getLogger(InputGateFetcher.class);

    /** Gate the buffers and events are read from. */
    private final InputGate inputGate;

    /** One deserializer per input channel, indexed by the channel index local to this fetcher. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;

    /** Delegate that receives each deserialized element. */
    private final DeserializationDelegate<StreamElement> deserializationDelegate;

    /** Handler through which the gate is polled. */
    private final SelectedReadingBarrierHandler barrierHandler;

    /** Consumer of the deserialized stream elements. */
    private final InputProcessor inputProcessor;

    /** Lock held while the processor is told that the input ended. */
    private final Object checkpointLock;

    /** Offset subtracted from the channel index reported by the gate to get the local index. */
    private final int basedChannelCount;

    private final InputSelector.InputSelection inputSelection;

    /** Deserializer that still holds an unconsumed buffer, or {@code null} if there is none. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;

    /** Local index of the channel the most recent buffer was read from. */
    private int currentChannelIndex;

    private boolean isFinished = false;

    /** Listener notified when the gate reports available data; {@code null} until registered. */
    private InputFetcher.InputFetcherAvailableListener listener;

    /**
     * Creates the fetcher, builds one record deserializer per input channel of the gate and
     * registers the new instance as listener on the gate.
     *
     * @param inputSelection input selection reported by {@link #getInputSelection()}; not null
     * @param inputGate gate to read from; not null
     * @param serializer serializer for the record values
     * @param barrierHandler handler used to poll the gate; not null
     * @param ioManager supplies the spilling directories handed to the deserializers
     * @param inputProcessor receiver of the deserialized elements; not null
     * @param checkpointLock lock held while signalling the end of input; not null
     * @param basedChannelCount offset subtracted from gate channel indexes
     * @param objectReuse selects the reusing ({@code true}) or non-reusing delegate
     * @param taskManagerConfig configuration passed to the {@link SerializerManagerUtility}
     */
    public InputGateFetcher(InputSelector.InputSelection inputSelection, InputGate inputGate, TypeSerializer<IN> serializer, SelectedReadingBarrierHandler barrierHandler, IOManager ioManager, InputProcessor inputProcessor, Object checkpointLock, int basedChannelCount, boolean objectReuse, Configuration taskManagerConfig) {
        this.inputSelection = Preconditions.checkNotNull(inputSelection);
        this.inputGate = Preconditions.checkNotNull(inputGate);
        this.barrierHandler = Preconditions.checkNotNull(barrierHandler);
        this.inputProcessor = Preconditions.checkNotNull(inputProcessor);
        this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
        this.basedChannelCount = basedChannelCount;

        SerializerManagerUtility serializerManagerUtility = new SerializerManagerUtility(taskManagerConfig);
        this.recordDeserializers = serializerManagerUtility.createRecordDeserializers(inputGate.getAllInputChannels(), ioManager.getSpillingDirectoriesPaths());

        StreamElementSerializer<IN> elementSerializer = new StreamElementSerializer<IN>(serializer);
        if (objectReuse) {
            this.deserializationDelegate = new ReusingRecordValueDeserializationDelegate<IN>(elementSerializer);
        } else {
            this.deserializationDelegate = new NonReusingDeserializationDelegate<>(elementSerializer);
        }

        // Registered last so that every field is assigned before the gate can call back.
        this.inputGate.registerListener(this);
    }

    @Override
    public boolean isFinished() {
        return this.isFinished;
    }

    /**
     * Reports whether a partially consumed buffer is pending or the gate reports more data.
     */
    @Override
    public boolean moreAvailable() {
        return this.currentRecordDeserializer != null || this.inputGate.moreAvailable();
    }

    /** Requests the partitions of the underlying input gate. */
    @Override
    public void setup() throws Exception {
        this.inputGate.requestPartitions();
    }

    /**
     * Deserializes until one complete stream element is available and dispatches it to the
     * input processor together with the local index of the channel it was read from.
     *
     * @return {@code true} if an element was processed, {@code false} if no further
     *     deserialization result could be obtained
     * @throws RuntimeException if the element is neither a record, watermark, latency marker
     *     nor stream status
     */
    @Override
    public boolean fetchAndProcess() throws Exception {
        RecordDeserializer.DeserializationResult result;
        do {
            result = this.getNextResult();
            if (result == null) {
                return false;
            }
        } while (!result.isFullRecord());

        StreamElement streamElement = this.deserializationDelegate.getInstance();
        if (streamElement.isRecord()) {
            this.inputProcessor.processRecord(streamElement.asRecord(), this.currentChannelIndex);
        } else if (streamElement.isWatermark()) {
            this.inputProcessor.processWatermark(streamElement.asWatermark(), this.currentChannelIndex);
        } else if (streamElement.isLatencyMarker()) {
            this.inputProcessor.processLatencyMarker(streamElement.asLatencyMarker(), this.currentChannelIndex);
        } else if (streamElement.isStreamStatus()) {
            this.inputProcessor.processStreamStatus(streamElement.asStreamStatus(), this.currentChannelIndex);
        } else {
            throw new RuntimeException("Unknown stream element " + streamElement);
        }
        return true;
    }

    /**
     * Produces the next deserialization result, polling a new buffer first when no
     * deserializer is currently active.
     *
     * @return the result of the deserialization attempt, or {@code null} if no deserializer
     *     is active and {@link #getNextBufferOrEvent()} returned {@code null}
     */
    RecordDeserializer.DeserializationResult getNextResult() throws Exception {
        // A successful poll installs the deserializer of the buffer's channel as current.
        if (this.currentRecordDeserializer == null && this.getNextBufferOrEvent() == null) {
            return null;
        }
        return this.readFromCurrentDeserializer();
    }

    /**
     * Reads the next record from the current deserializer. When that consumed the whole
     * buffer, the buffer is recycled and the deserializer is no longer current.
     */
    private RecordDeserializer.DeserializationResult readFromCurrentDeserializer() throws Exception {
        RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
        if (result.isBufferConsumed()) {
            this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
            this.currentRecordDeserializer = null;
        }
        return result;
    }

    /**
     * Polls the gate until a buffer arrives, the gate finishes, or nothing more can be polled.
     *
     * <p>Events must be {@link EndOfPartitionEvent}s. They are skipped unless the gate is
     * finished afterwards; in that case the input processor is told that the input ended and
     * is released, both under the checkpoint lock, and this fetcher is marked as finished.
     * For a buffer, the deserializer of its channel becomes current and receives the buffer.
     *
     * @return the polled buffer, or {@code null} if the gate finished or the poll returned
     *     nothing
     * @throws IllegalArgumentException presumably, if an event other than
     *     {@link EndOfPartitionEvent} is polled (raised by {@code Preconditions.checkArgument})
     */
    @VisibleForTesting
    BufferOrEvent getNextBufferOrEvent() throws Exception {
        BufferOrEvent bufferOrEvent;
        while ((bufferOrEvent = this.barrierHandler.pollNext(this.inputGate)) != null) {
            if (!bufferOrEvent.isEvent()) {
                // Channel indexes from the gate are shifted by basedChannelCount.
                this.currentChannelIndex = bufferOrEvent.getChannelIndex() - this.basedChannelCount;
                this.currentRecordDeserializer = this.recordDeserializers[this.currentChannelIndex];
                this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                return bufferOrEvent;
            }

            Preconditions.checkArgument(bufferOrEvent.getEvent() instanceof EndOfPartitionEvent);
            LOG.debug("receive event:{}", bufferOrEvent.getEvent());
            if (!this.inputGate.isFinished()) {
                continue;
            }

            LOG.info("Input gate {} is finished", this.inputGate);
            // NOTE(review): the decompiler reported rewriting the try/monitor structure of
            // this synchronized block; confirm against the original source if available.
            synchronized (this.checkpointLock) {
                this.inputProcessor.endInput();
                this.inputProcessor.release();
            }
            this.isFinished = true;
            return null;
        }
        return null;
    }

    @VisibleForTesting
    RecordDeserializer<DeserializationDelegate<StreamElement>> getCurrentRecordDeserializer() {
        return this.currentRecordDeserializer;
    }

    @VisibleForTesting
    void setCurrentRecordDeserializer(RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer) {
        this.currentRecordDeserializer = recordDeserializer;
    }

    @VisibleForTesting
    int getCurrentChannelIndex() {
        return this.currentChannelIndex;
    }

    @VisibleForTesting
    DeserializationDelegate<StreamElement> getDeserializationDelegate() {
        return this.deserializationDelegate;
    }

    public InputGate getInputGate() {
        return this.inputGate;
    }

    /**
     * Recycles every buffer still held by a deserializer (unless already recycled) and
     * clears all deserializers.
     */
    @Override
    public void cleanup() {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            Buffer buffer = deserializer.getCurrentBuffer();
            if (buffer != null && !buffer.isRecycled()) {
                buffer.recycleBuffer();
            }
            deserializer.clear();
        }
    }

    /** Does nothing. */
    @Override
    public void cancel() {
    }

    @Override
    public InputSelector.InputSelection getInputSelection() {
        return this.inputSelection;
    }

    /**
     * Registers the listener to notify when the gate reports available data. Only one
     * listener may ever be registered; a second registration fails the state check.
     */
    @Override
    public void registerAvailableListener(InputFetcher.InputFetcherAvailableListener listener) {
        Preconditions.checkState(this.listener == null);
        this.listener = listener;
    }

    /**
     * Relays a "gate non empty" notification to the registered listener, if there is one.
     * Presumably the {@link InputGateListener} callback; its declaration is not visible here.
     *
     * @param inputGate the notifying gate; must be the gate this fetcher reads from
     */
    public void notifyInputGateNonEmpty(InputGate inputGate) {
        Preconditions.checkArgument(this.inputGate == inputGate);
        if (this.listener != null) {
            this.listener.notifyInputFetcherAvailable(this);
        }
    }
}

