package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.class */
/**
 * Presents several {@link InputGate}s as one logical input gate.
 *
 * <p>Channel indices of the sub gates are mapped into one contiguous index space: each sub gate
 * is assigned an offset equal to the sum of the channel counts of the gates before it, and the
 * channel index of every returned {@link BufferOrEvent} is shifted by that offset.
 *
 * <p>The union registers itself as listener on every sub gate. Gates that announced data are kept
 * in a FIFO queue ({@code inputGatesWithData}); the companion set
 * ({@code enqueuedInputGatesWithData}) prevents a gate from being queued twice. Both collections
 * are guarded by the monitor of {@code inputGatesWithData}.
 */
public class UnionInputGate implements InputGate, InputGateListener {
    /** The unioned sub gates, in the order given to the constructor. */
    private final InputGate[] inputGates;

    /** Sub gates that have not yet been observed as finished by the read path. */
    private final Set<InputGate> inputGatesWithRemainingData;

    /** Sum of the channel counts of all sub gates. */
    private final int totalNumberOfInputChannels;

    /** Index offset that is added to channel indices reported by the respective sub gate. */
    private final Map<InputGate, Integer> inputGateToIndexOffsetMap;

    /** Set once {@link #requestPartitions()} has been forwarded to every sub gate. */
    private boolean requestedPartitionsFlag;

    /** FIFO queue of sub gates that announced data; its monitor also guards the set below. */
    private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>();

    /** Mirror of the queue content, used for cheap "already enqueued" checks. */
    private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>();

    /** Listeners to notify when this union turns non-empty; guarded by its own monitor. */
    private final List<InputGateListener> inputGateListeners = new ArrayList<>();

    /** Result holder: the sub gate that was read, what it returned, and whether more gates are queued. */
    public static class InputGateWithData {
        private final InputGate inputGate;
        private final BufferOrEvent bufferOrEvent;
        private final boolean moreInputGatesAvailable;

        InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean z) {
            this.inputGate = (InputGate) Preconditions.checkNotNull(inputGate);
            this.bufferOrEvent = (BufferOrEvent) Preconditions.checkNotNull(bufferOrEvent);
            this.moreInputGatesAvailable = z;
        }
    }

    /**
     * Creates a union over the given gates and registers this union as listener on each of them.
     *
     * @param inputGateArr the gates to union; must contain at least two non-null gates
     */
    public UnionInputGate(InputGate... inputGateArr) {
        this.inputGates = (InputGate[]) Preconditions.checkNotNull(inputGateArr);
        Preconditions.checkArgument(inputGateArr.length > 1, "Union input gate should union at least two input gates.");
        this.inputGateToIndexOffsetMap = Maps.newHashMapWithExpectedSize(inputGateArr.length);
        this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGateArr.length);
        int channelOffset = 0;
        for (InputGate inputGate : inputGateArr) {
            this.inputGateToIndexOffsetMap.put(Preconditions.checkNotNull(inputGate), Integer.valueOf(channelOffset));
            this.inputGatesWithRemainingData.add(inputGate);
            channelOffset += inputGate.getNumberOfInputChannels();
            inputGate.registerListener(this);
        }
        this.totalNumberOfInputChannels = channelOffset;
    }

    /** Returns the total number of input channels across all sub gates. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public int getNumberOfInputChannels() {
        return this.totalNumberOfInputChannels;
    }

    /**
     * Returns the channels of all sub gates in one array, each gate's channels placed at the
     * gate's index offset.
     */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public InputChannel[] getAllInputChannels() {
        InputChannel[] allChannels = new InputChannel[this.totalNumberOfInputChannels];
        for (Map.Entry<InputGate, Integer> entry : this.inputGateToIndexOffsetMap.entrySet()) {
            InputChannel[] gateChannels = entry.getKey().getAllInputChannels();
            System.arraycopy(gateChannels, 0, allChannels, entry.getValue().intValue(), gateChannels.length);
        }
        return allChannels;
    }

    /** Returns {@code true} only if every sub gate reports itself as finished. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public boolean isFinished() {
        for (InputGate inputGate : this.inputGates) {
            if (!inputGate.isFinished()) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether at least one sub gate is currently queued as having data. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public boolean moreAvailable() {
        synchronized (this.inputGatesWithData) {
            return !this.inputGatesWithData.isEmpty();
        }
    }

    /** Forwards the partition request to every sub gate; later calls are no-ops. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void requestPartitions() throws IOException, InterruptedException {
        if (this.requestedPartitionsFlag) {
            return;
        }
        for (InputGate inputGate : this.inputGates) {
            inputGate.requestPartitions();
        }
        this.requestedPartitionsFlag = true;
    }

    /** Blocking read from whichever sub gate has data next. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
        return getNextBufferOrEvent(Optional.empty(), true);
    }

    /** Non-blocking read from whichever sub gate has data next; empty if none is queued. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
        return getNextBufferOrEvent(Optional.empty(), false);
    }

    /**
     * Non-blocking read from one specific sub gate.
     *
     * @param inputGate the sub gate to read from; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public Optional<BufferOrEvent> pollNextBufferOrEvent(InputGate inputGate) throws IOException, InterruptedException {
        // Same explicit check as the blocking variant below, so both fail with the same message.
        Preconditions.checkNotNull(inputGate, "subInputGate is null");
        return getNextBufferOrEvent(Optional.of(inputGate), false);
    }

    /**
     * Blocking read from one specific sub gate.
     *
     * @param inputGate the sub gate to read from; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public Optional<BufferOrEvent> getNextBufferOrEvent(InputGate inputGate) throws IOException, InterruptedException {
        Preconditions.checkNotNull(inputGate, "subInputGate is null");
        return getNextBufferOrEvent(Optional.of(inputGate), true);
    }

    /**
     * Shared read path behind the four public read methods.
     *
     * @param subInputGate the sub gate to read from, or empty to read from any queued gate
     * @param blocking whether to wait for data instead of returning empty
     * @return the next element with its channel index shifted into the union's index space, or
     *     empty if the requested gate(s) have no remaining data or a non-blocking read found none
     */
    private Optional<BufferOrEvent> getNextBufferOrEvent(Optional<InputGate> subInputGate, boolean blocking) throws IOException, InterruptedException {
        if (subInputGate.isPresent()) {
            if (!this.inputGatesWithRemainingData.contains(subInputGate.get())) {
                return Optional.empty();
            }
        } else if (this.inputGatesWithRemainingData.isEmpty()) {
            return Optional.empty();
        }
        requestPartitions();
        Optional<InputGateWithData> next = waitAndGetNextInputGate(subInputGate, blocking);
        if (!blocking && !next.isPresent()) {
            return Optional.empty();
        }
        InputGateWithData gateWithData = next.get();
        InputGate inputGate = gateWithData.inputGate;
        BufferOrEvent bufferOrEvent = gateWithData.bufferOrEvent;
        if (bufferOrEvent.moreAvailable()) {
            // The gate still holds data: put it back so that a later read picks it up again.
            queueInputGate(inputGate);
        }
        if (bufferOrEvent.isEvent() && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) {
            Preconditions.checkState(!bufferOrEvent.moreAvailable());
            if (!this.inputGatesWithRemainingData.remove(inputGate)) {
                throw new IllegalStateException("Couldn't find input gate in set of remaining input gates.");
            }
        }
        bufferOrEvent.setChannelIndex(this.inputGateToIndexOffsetMap.get(inputGate).intValue() + bufferOrEvent.getChannelIndex());
        bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || gateWithData.moreInputGatesAvailable);
        return Optional.of(bufferOrEvent);
    }

    /**
     * Obtains the next element together with the sub gate it came from.
     *
     * <p>With a specific sub gate, that gate is read directly (blocking or polling). Otherwise
     * the head of the queue of gates with data is taken and polled; if the poll comes back empty
     * the next queued gate is tried.
     *
     * @param subInputGate the sub gate to read from, or empty to read from any queued gate
     * @param blocking whether to wait for data instead of returning empty
     * @throws IOException if a blocking read of a specific sub gate yields nothing
     */
    private Optional<InputGateWithData> waitAndGetNextInputGate(Optional<InputGate> subInputGate, boolean blocking) throws IOException, InterruptedException {
        if (subInputGate.isPresent()) {
            InputGate inputGate = subInputGate.get();
            Optional<BufferOrEvent> element = blocking ? inputGate.getNextBufferOrEvent() : inputGate.pollNextBufferOrEvent();
            if (!element.isPresent()) {
                if (blocking) {
                    throw new IOException("Couldn't read a finished input gate.");
                }
                return Optional.empty();
            }
            boolean moreGatesQueued;
            synchronized (this.inputGatesWithData) {
                // Only when this gate is the sole queued gate is it taken out of the queue here.
                if (this.enqueuedInputGatesWithData.size() == 1 && this.enqueuedInputGatesWithData.contains(inputGate)) {
                    this.inputGatesWithData.remove(inputGate);
                    this.enqueuedInputGatesWithData.remove(inputGate);
                }
                moreGatesQueued = !this.enqueuedInputGatesWithData.isEmpty();
            }
            return Optional.of(new InputGateWithData(inputGate, element.get(), moreGatesQueued));
        }
        while (true) {
            InputGate inputGate;
            boolean moreGatesQueued;
            synchronized (this.inputGatesWithData) {
                while (this.inputGatesWithData.isEmpty()) {
                    if (!blocking) {
                        return Optional.empty();
                    }
                    this.inputGatesWithData.wait();
                }
                inputGate = this.inputGatesWithData.remove();
                this.enqueuedInputGatesWithData.remove(inputGate);
                moreGatesQueued = !this.enqueuedInputGatesWithData.isEmpty();
            }
            // Poll outside the monitor: the sub gate may do I/O or take its own locks, and
            // threads announcing new data via queueInputGate must not be stalled meanwhile.
            Optional<BufferOrEvent> element = inputGate.pollNextBufferOrEvent();
            if (element.isPresent()) {
                return Optional.of(new InputGateWithData(inputGate, element.get(), moreGatesQueued));
            }
            // The queue entry was stale (gate had no data after all); try the next one.
        }
    }

    /** Forwards the task event to every sub gate. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        for (InputGate inputGate : this.inputGates) {
            inputGate.sendTaskEvent(taskEvent);
        }
    }

    /** Adds a listener that is notified when this union transitions from empty to non-empty. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void registerListener(InputGateListener inputGateListener) {
        synchronized (this.inputGateListeners) {
            this.inputGateListeners.add(inputGateListener);
        }
    }

    /**
     * Returns the page size shared by all sub gates.
     *
     * @throws IllegalStateException if the sub gates report different page sizes
     */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public int getPageSize() {
        int pageSize = -1;
        for (InputGate inputGate : this.inputGates) {
            if (pageSize == -1) {
                pageSize = inputGate.getPageSize();
            } else if (inputGate.getPageSize() != pageSize) {
                throw new IllegalStateException("Found input gates with different page sizes.");
            }
        }
        return pageSize;
    }

    /** Returns the number of unioned sub gates. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public int getSubInputGateCount() {
        return this.inputGates.length;
    }

    /** Returns the sub gate at position {@code i} in constructor order. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public InputGate getSubInputGate(int i) {
        return this.inputGates[i];
    }

    /** Listener callback from a sub gate: records the gate as having data. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGateListener
    public void notifyInputGateNonEmpty(InputGate inputGate) {
        queueInputGate((InputGate) Preconditions.checkNotNull(inputGate));
    }

    /**
     * Appends the gate to the queue of gates with data unless it is already queued. If the queue
     * was empty before, blocked readers are woken up and the registered listeners are notified.
     */
    private void queueInputGate(InputGate inputGate) {
        final boolean wasEmpty;
        synchronized (this.inputGatesWithData) {
            if (this.enqueuedInputGatesWithData.contains(inputGate)) {
                return;
            }
            wasEmpty = this.inputGatesWithData.isEmpty();
            this.inputGatesWithData.add(inputGate);
            this.enqueuedInputGatesWithData.add(inputGate);
            if (wasEmpty) {
                this.inputGatesWithData.notifyAll();
            }
        }
        if (wasEmpty) {
            // Listener callbacks run after the queue monitor is released so that foreign code
            // is never executed while readers and producers are locked out of the queue.
            synchronized (this.inputGateListeners) {
                for (InputGateListener listener : this.inputGateListeners) {
                    listener.notifyInputGateNonEmpty(this);
                }
            }
        }
    }
}
