package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateUtil;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.LockAndCondition;
import org.apache.flink.util.LockGetReleaseWrapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamArbitraryInputProcessor.class */
public class StreamArbitraryInputProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamArbitraryInputProcessor.class);
    private Map<InputGate, StreamProcessorWrapper<?>> inputGateProcessors = new HashMap();
    private TreeMap<Integer, InputGate> channelIdToGate = new TreeMap<>();
    private volatile boolean isStopped = false;
    private IOManager ioManager;
    private InputGate inputGate;
    private final LockAndCondition lock;
    private final int maxBlockingRequestsInFlight;
    private final TaskMetricGroup taskMetricGroup;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamArbitraryInputProcessor$FirstInputProcessor.class */
    public class FirstInputProcessor<IN> extends StreamProcessorWrapper<IN> {
        TwoInputStreamOperator<IN, ?, ?> headOperator;

        public FirstInputProcessor(InputGate inputGate, TypeSerializer<IN> typeSerializer, TwoInputStreamOperator<IN, ?, ?> twoInputStreamOperator) {
            super(inputGate, twoInputStreamOperator, typeSerializer);
            this.headOperator = twoInputStreamOperator;
        }

        @Override // org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor.StreamProcessorWrapper
        public void processRecords(StreamRecord<IN> streamRecord) throws Exception {
            this.headOperator.setKeyContextElement1(streamRecord);
            this.headOperator.processElement1(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor.StreamProcessorWrapper
        public void endInput() throws Exception {
            this.headOperator.endInput1();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamArbitraryInputProcessor$OneInputProcessor.class */
    public class OneInputProcessor<IN> extends StreamProcessorWrapper<IN> {
        OneInputStreamOperator<IN, ?> headOperator;

        public OneInputProcessor(InputGate inputGate, OneInputStreamOperator<IN, ?> oneInputStreamOperator) {
            super(inputGate, oneInputStreamOperator);
            this.headOperator = oneInputStreamOperator;
        }

        public OneInputProcessor(InputGate inputGate, TypeSerializer<IN> typeSerializer, OneInputStreamOperator<IN, ?> oneInputStreamOperator) {
            super(inputGate, oneInputStreamOperator, typeSerializer);
            this.headOperator = oneInputStreamOperator;
        }

        @Override // org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor.StreamProcessorWrapper
        public void processRecords(StreamRecord<IN> streamRecord) throws Exception {
            this.headOperator.setKeyContextElement1(streamRecord);
            this.headOperator.processElement(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor.StreamProcessorWrapper
        public void endInput() throws Exception {
            this.headOperator.endInput();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamArbitraryInputProcessor$SecondInputProcessor.class */
    public class SecondInputProcessor<IN> extends StreamProcessorWrapper<IN> {
        TwoInputStreamOperator<?, IN, ?> headOperator;

        public SecondInputProcessor(InputGate inputGate, TypeSerializer<IN> typeSerializer, TwoInputStreamOperator<?, IN, ?> twoInputStreamOperator) {
            super(inputGate, twoInputStreamOperator, typeSerializer);
            this.headOperator = twoInputStreamOperator;
        }

        @Override // org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor.StreamProcessorWrapper
        public void processRecords(StreamRecord<IN> streamRecord) throws Exception {
            this.headOperator.setKeyContextElement2(streamRecord);
            this.headOperator.processElement2(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor.StreamProcessorWrapper
        public void endInput() throws Exception {
            this.headOperator.endInput2();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamArbitraryInputProcessor$StreamProcessorWrapper.class */
    public abstract class StreamProcessorWrapper<IN> {
        private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
        private final DeserializationDelegate<StreamElement> deserializationDelegate;
        private final InputGate inputGate;
        private final StreamOperator headOperator;
        private Counter numRecordsIn;

        public void processElements(StreamRecord<IN> streamRecord) throws Exception {
            if (this.numRecordsIn == null) {
                this.numRecordsIn = this.headOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
            }
            this.numRecordsIn.inc();
            StreamArbitraryInputProcessor.this.taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
            processRecords(streamRecord);
        }

        public abstract void processRecords(StreamRecord<IN> streamRecord) throws Exception;

        public abstract void endInput() throws Exception;

        public StreamProcessorWrapper(InputGate inputGate, StreamOperator streamOperator) {
            this.recordDeserializers = null;
            this.deserializationDelegate = null;
            this.inputGate = inputGate;
            this.headOperator = streamOperator;
        }

        public StreamProcessorWrapper(InputGate inputGate, StreamOperator streamOperator, TypeSerializer<IN> typeSerializer) {
            this.inputGate = inputGate;
            this.headOperator = streamOperator;
            this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamElementSerializer(typeSerializer));
            this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
            for (int i = 0; i < this.recordDeserializers.length; i++) {
                this.recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer(StreamArbitraryInputProcessor.this.ioManager.getSpillingDirectoriesPaths());
            }
        }

        public void processBuffer(BufferOrEvent bufferOrEvent) throws Exception {
            RecordDeserializer.DeserializationResult deserializationResult;
            if (bufferOrEvent.isEvent() && (bufferOrEvent.getEvent() instanceof EndOfPartitionEvent)) {
                StreamArbitraryInputProcessor.LOG.info("receive event:" + bufferOrEvent.getEvent());
                if (this.inputGate.isFinished()) {
                    StreamArbitraryInputProcessor.LOG.info("Input gate {} is finished", this.inputGate);
                    endInput();
                    return;
                }
                return;
            }
            RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer = this.recordDeserializers[bufferOrEvent.getChannelIndex()];
            recordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
            RecordDeserializer.DeserializationResult nextRecord = recordDeserializer.getNextRecord(this.deserializationDelegate);
            while (true) {
                deserializationResult = nextRecord;
                if (!deserializationResult.isFullRecord() || StreamArbitraryInputProcessor.this.isStopped) {
                    break;
                }
                processElements(((StreamElement) this.deserializationDelegate.getInstance()).asRecord());
                nextRecord = recordDeserializer.getNextRecord(this.deserializationDelegate);
            }
            if (deserializationResult.isBufferConsumed()) {
                recordDeserializer.getCurrentBuffer().recycleBuffer();
            }
        }

        public void cleanup() {
            if (this.recordDeserializers == null) {
                return;
            }
            for (RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer : this.recordDeserializers) {
                Buffer currentBuffer = recordDeserializer.getCurrentBuffer();
                if (currentBuffer != null && !currentBuffer.isRecycled()) {
                    currentBuffer.recycleBuffer();
                }
                recordDeserializer.clear();
            }
        }
    }

    public StreamArbitraryInputProcessor(IOManager iOManager, TaskMetricGroup taskMetricGroup, LockAndCondition lockAndCondition, int i) {
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.lock = (LockAndCondition) Preconditions.checkNotNull(lockAndCondition);
        this.maxBlockingRequestsInFlight = i;
        this.taskMetricGroup = (TaskMetricGroup) Preconditions.checkNotNull(taskMetricGroup);
    }

    public void process() throws Exception {
        InputGate[] inputGateArr = (InputGate[]) this.inputGateProcessors.keySet().toArray(new InputGate[0]);
        this.inputGate = InputGateUtil.createInputGateWithPriority(this.maxBlockingRequestsInFlight, inputGateArr);
        int i = 0;
        for (InputGate inputGate : inputGateArr) {
            this.channelIdToGate.put(Integer.valueOf(i), inputGate);
            i += inputGate.getNumberOfInputChannels();
        }
        while (!this.isStopped && !this.inputGate.isFinished()) {
            BufferOrEvent nextBufferOrEvent = this.inputGate.getNextBufferOrEvent();
            if (nextBufferOrEvent == null) {
                LOG.info("No data in input gates");
            } else {
                Map.Entry<Integer, InputGate> floorEntry = this.channelIdToGate.floorEntry(Integer.valueOf(nextBufferOrEvent.getChannelIndex()));
                nextBufferOrEvent.setChannelIndex(nextBufferOrEvent.getChannelIndex() - floorEntry.getKey().intValue());
                InputGate value = floorEntry.getValue();
                LockGetReleaseWrapper lockGetReleaseWrapper = new LockGetReleaseWrapper(this.lock.getLock());
                Throwable th = null;
                try {
                    try {
                        this.inputGateProcessors.get(value).processBuffer(nextBufferOrEvent);
                        if (lockGetReleaseWrapper != null) {
                            if (0 != 0) {
                                try {
                                    lockGetReleaseWrapper.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                lockGetReleaseWrapper.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (lockGetReleaseWrapper != null) {
                        if (th != null) {
                            try {
                                lockGetReleaseWrapper.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            lockGetReleaseWrapper.close();
                        }
                    }
                    throw th4;
                }
            }
        }
    }

    public void stop() {
        this.isStopped = true;
    }

    public void cleanup() throws IOException {
        this.inputGateProcessors.values().forEach((v0) -> {
            v0.cleanup();
        });
    }

    public <IN> void bindSourceInputGate(InputGate inputGate, OneInputStreamOperator<IN, ?> oneInputStreamOperator) {
        this.inputGateProcessors.put(inputGate, new OneInputProcessor(inputGate, oneInputStreamOperator));
    }

    public <IN> void bindInputGateToOneInputOperator(InputGate inputGate, OneInputStreamOperator<IN, ?> oneInputStreamOperator, TypeSerializer<IN> typeSerializer) {
        this.inputGateProcessors.put(inputGate, new OneInputProcessor(inputGate, typeSerializer, oneInputStreamOperator));
    }

    public <IN> void bindFirstInputGateToTwoInputOperator(InputGate inputGate, TwoInputStreamOperator<IN, ?, ?> twoInputStreamOperator, TypeSerializer<IN> typeSerializer) {
        this.inputGateProcessors.put(inputGate, new FirstInputProcessor(inputGate, typeSerializer, twoInputStreamOperator));
    }

    public <IN> void bindSecondInputGateToTwoInputOperator(InputGate inputGate, TwoInputStreamOperator<?, IN, ?> twoInputStreamOperator, TypeSerializer<IN> typeSerializer) {
        this.inputGateProcessors.put(inputGate, new SecondInputProcessor(inputGate, typeSerializer, twoInputStreamOperator));
    }
}
