/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.util.Preconditions;

/**
 * Limits how many remote partition requests may be outstanding at the same time across a set of
 * {@link SingleInputGate}s.
 *
 * <p>Gates for which {@link SingleInputGate#isPartitionRequestRestricted()} returns {@code false}
 * bypass the manager entirely: all of their channels are requested immediately. For restricted
 * gates only {@link RemoteInputChannel}s consume quota; every other channel type is requested
 * right away.
 *
 * <p>Once the expected number of gates has been registered, {@code maxConcurrentPartitionRequests}
 * is split as evenly as possible between the restricted gates (the "reserved" quota). Quota a gate
 * does not need is moved to a shared pool ({@code availableRequestQuota}). Remote channels that
 * cannot obtain a slot are queued per gate and are requested later, when a running channel
 * finishes and frees its slot.
 */
public class PartitionRequestManager {
    private final Object lock = new Object();
    /** Restricted gates, rotated on every slot hand-over so that pending gates are served in turn. */
    private final LinkedList<SingleInputGate> inputGates;
    /** Remote channels per gate that are still waiting for a request slot. */
    @GuardedBy("lock")
    private final Map<SingleInputGate, LinkedList<RemoteInputChannel>> pendingPartitionRequests;
    /** Slots set aside for a gate that it is not currently using. */
    @GuardedBy("lock")
    private final Map<SingleInputGate, Integer> reservedPartitionRequestQuota;
    /** Slots currently occupied by requested remote channels of a gate. */
    @GuardedBy("lock")
    private final Map<SingleInputGate, Integer> currentUsedPartitionRequestQuota;
    private final int numInputs;
    private final int maxConcurrentPartitionRequests;
    private final boolean randomRequestsPerConnection;
    /** Shared pool of slots that any restricted gate may take. */
    @GuardedBy("lock")
    private int availableRequestQuota;
    private int numRegisteredInputGates;

    /**
     * Creates a manager that shuffles the channels of each connection before requesting them.
     *
     * @param maxConcurrentPartitionRequests total number of request slots; must be at least
     *     {@code numInputs}
     * @param numInputs number of input gates that will be registered; must be positive
     */
    public PartitionRequestManager(int maxConcurrentPartitionRequests, int numInputs) {
        this(maxConcurrentPartitionRequests, numInputs, true);
    }

    /**
     * @param maxConcurrentPartitionRequests total number of request slots; must be at least
     *     {@code numInputs}
     * @param numInputs number of input gates that will be registered; must be positive
     * @param randomRequestsPerConnection whether the channels that share a remote address are
     *     shuffled among themselves before being interleaved with other addresses
     * @throws IllegalArgumentException if one of the size constraints is violated
     */
    public PartitionRequestManager(int maxConcurrentPartitionRequests, int numInputs, boolean randomRequestsPerConnection) {
        Preconditions.checkArgument(numInputs > 0);
        Preconditions.checkArgument(maxConcurrentPartitionRequests >= numInputs);
        this.maxConcurrentPartitionRequests = maxConcurrentPartitionRequests;
        this.numInputs = numInputs;
        this.randomRequestsPerConnection = randomRequestsPerConnection;
        this.inputGates = new LinkedList<>();
        this.reservedPartitionRequestQuota = new HashMap<>();
        this.currentUsedPartitionRequestQuota = new HashMap<>();
        this.pendingPartitionRequests = new HashMap<>();
        this.availableRequestQuota = 0;
        this.numRegisteredInputGates = 0;
    }

    /**
     * Registers an input gate. When the last expected gate arrives and at least one registered
     * gate is restricted, the gate order is shuffled and the quota is distributed.
     *
     * <p>NOTE(review): this method updates {@code inputGates} and (indirectly)
     * {@code reservedPartitionRequestQuota} without holding {@code lock}; presumably every gate is
     * registered before any partition is requested - confirm against the callers.
     *
     * @param inputGate the gate to register
     * @throws IllegalArgumentException if more than {@code numInputs} gates are registered
     */
    void registerSingleInputGate(SingleInputGate inputGate) {
        Preconditions.checkArgument(
            numRegisteredInputGates < numInputs,
            "Too many input gate registrations, input size: " + numInputs);
        if (inputGate.isPartitionRequestRestricted()) {
            inputGates.add(inputGate);
        }
        ++numRegisteredInputGates;
        if (numRegisteredInputGates == numInputs && !inputGates.isEmpty()) {
            Collections.shuffle(inputGates);
            distributePartitionRequestQuotasFairly();
        }
    }

    /**
     * Issues the initial partition requests of a gate.
     *
     * <p>For a restricted gate the remote channels are requested in the order produced by
     * {@link #shuffleRemoteInputChannels(List)}: first from the gate's reserved quota, then from
     * the shared pool; the rest is queued as pending. Unused reserved quota is moved to the shared
     * pool, except that a gate which requested no remote channel at all keeps one reserved slot.
     *
     * @param inputGate the gate whose channels should be requested
     */
    void requestPartitions(SingleInputGate inputGate) throws IOException, InterruptedException {
        Map<IntermediateResultPartitionID, InputChannel> inputChannels = inputGate.getInputChannels();
        int consumedSubpartitionIndex = inputGate.getConsumedSubpartitionIndex();
        if (!inputGate.isPartitionRequestRestricted()) {
            for (InputChannel inputChannel : inputChannels.values()) {
                internalRequestSubpartition(inputGate, inputChannel, consumedSubpartitionIndex);
            }
            return;
        }

        List<InputChannel> nonRemoteInputChannels = new ArrayList<>(inputChannels.size());
        List<RemoteInputChannel> remoteInputChannels = new ArrayList<>(inputChannels.size());
        for (InputChannel inputChannel : inputChannels.values()) {
            if (inputChannel instanceof RemoteInputChannel) {
                remoteInputChannels.add((RemoteInputChannel) inputChannel);
            } else {
                nonRemoteInputChannels.add(inputChannel);
            }
        }
        shuffleRemoteInputChannels(remoteInputChannels);

        synchronized (lock) {
            // Non-remote channels never consume quota.
            for (InputChannel inputChannel : nonRemoteInputChannels) {
                internalRequestSubpartition(inputGate, inputChannel, consumedSubpartitionIndex);
            }

            LinkedList<RemoteInputChannel> pendingChannelList = new LinkedList<>();
            int remainingQuota = reservedPartitionRequestQuota.get(inputGate);
            int numberOfUsedQuota = 0;
            for (RemoteInputChannel inputChannel : remoteInputChannels) {
                if (remainingQuota > 0) {
                    internalRequestSubpartition(inputGate, inputChannel, consumedSubpartitionIndex);
                    --remainingQuota;
                    ++numberOfUsedQuota;
                } else if (availableRequestQuota > 0) {
                    internalRequestSubpartition(inputGate, inputChannel, consumedSubpartitionIndex);
                    --availableRequestQuota;
                    ++numberOfUsedQuota;
                } else {
                    pendingChannelList.addLast(inputChannel);
                }
            }
            if (!pendingChannelList.isEmpty()) {
                pendingPartitionRequests.put(inputGate, pendingChannelList);
            }

            if (numberOfUsedQuota == 0) {
                // Nothing was requested: keep a single reserved slot, release the others.
                availableRequestQuota += remainingQuota - 1;
                reservedPartitionRequestQuota.put(inputGate, 1);
            } else {
                availableRequestQuota += remainingQuota;
                reservedPartitionRequestQuota.put(inputGate, 0);
            }
            currentUsedPartitionRequestQuota.put(inputGate, numberOfUsedQuota);
        }
    }

    /**
     * Recycles the request slot of a finished remote channel.
     *
     * <p>The slot is reused in this order: (1) a pending channel of the same gate; (2) if the gate
     * still holds other slots or is completely finished, a pending channel of another gate
     * (gates are visited round-robin) or, when nothing is pending, the shared pool; (3) otherwise
     * - it was the gate's last used slot - it becomes the gate's reserved slot again.
     * When the gate has received all end-of-partition events its bookkeeping is removed.
     *
     * @param inputGate the gate owning the finished channel
     * @param channel the finished channel; ignored unless it is a {@link RemoteInputChannel} of a
     *     restricted gate
     * @param hasReceivedAllEndOfPartitionEvents whether the whole gate is finished
     * @throws IllegalStateException if the gate's quota bookkeeping is inconsistent
     */
    void onInputChannelFinish(SingleInputGate inputGate, InputChannel channel, boolean hasReceivedAllEndOfPartitionEvents) throws IOException, InterruptedException {
        if (!inputGate.isPartitionRequestRestricted() || !(channel instanceof RemoteInputChannel)) {
            return;
        }
        int consumedSubpartitionIndex = inputGate.getConsumedSubpartitionIndex();
        synchronized (lock) {
            RemoteInputChannel pendingChannel = getPendingRemoteChannel(inputGate);
            if (pendingChannel != null) {
                // Same gate, same slot: the used-quota counter does not change.
                internalRequestSubpartition(inputGate, pendingChannel, consumedSubpartitionIndex);
                return;
            }

            int currentUsedQuota = currentUsedPartitionRequestQuota.get(inputGate);
            currentUsedPartitionRequestQuota.put(inputGate, currentUsedQuota - 1);
            if (currentUsedQuota > 1 || hasReceivedAllEndOfPartitionEvents) {
                if (pendingPartitionRequests.isEmpty()) {
                    ++availableRequestQuota;
                } else {
                    int numGates = inputGates.size();
                    for (int i = 0; i < numGates; ++i) {
                        // Rotate so that the next hand-over starts with a different gate.
                        SingleInputGate currentInputGate = inputGates.pollFirst();
                        inputGates.addLast(currentInputGate);
                        pendingChannel = getPendingRemoteChannel(currentInputGate);
                        if (pendingChannel == null) {
                            continue;
                        }
                        // The channel belongs to currentInputGate, so it has to be requested through
                        // that gate and with that gate's subpartition index - not with those of the
                        // gate whose channel has just finished.
                        internalRequestSubpartition(
                            currentInputGate,
                            pendingChannel,
                            currentInputGate.getConsumedSubpartitionIndex());
                        currentUsedPartitionRequestQuota.put(
                            currentInputGate,
                            currentUsedPartitionRequestQuota.get(currentInputGate) + 1);
                        break;
                    }
                }
            } else if (currentUsedQuota == 1) {
                int reservedQuota = reservedPartitionRequestQuota.get(inputGate);
                Preconditions.checkState(
                    reservedQuota == 0,
                    "The reserved quota must be 0, but actual is " + reservedQuota);
                reservedPartitionRequestQuota.put(inputGate, reservedQuota + 1);
            } else {
                throw new IllegalStateException("The current used quota should be never less than 1, but the actual value is " + currentUsedQuota);
            }

            if (hasReceivedAllEndOfPartitionEvents) {
                inputGates.remove(inputGate);
                pendingPartitionRequests.remove(inputGate);
                reservedPartitionRequestQuota.remove(inputGate);
                currentUsedPartitionRequestQuota.remove(inputGate);
            }
        }
    }

    /**
     * Requests a single channel that was updated after the initial requests were issued.
     *
     * <p>Unrestricted gates and non-remote channels are requested immediately. A remote channel of
     * a restricted gate takes the gate's reserved slot if there is one, otherwise a slot from the
     * shared pool, and is queued as pending if neither is available.
     *
     * @param inputGate the gate owning the channel
     * @param inputChannel the channel to request
     * @throws IllegalStateException if the gate holds more than one reserved slot
     */
    void updateInputChannel(SingleInputGate inputGate, InputChannel inputChannel) throws IOException, InterruptedException {
        int consumedSubpartitionIndex = inputGate.getConsumedSubpartitionIndex();
        if (!inputGate.isPartitionRequestRestricted() || !(inputChannel instanceof RemoteInputChannel)) {
            internalRequestSubpartition(inputGate, inputChannel, consumedSubpartitionIndex);
            return;
        }
        synchronized (lock) {
            int currentUsedQuota = currentUsedPartitionRequestQuota.get(inputGate);
            int reservedQuota = reservedPartitionRequestQuota.get(inputGate);
            if (reservedQuota > 0) {
                Preconditions.checkState(
                    reservedQuota == 1,
                    "The reserved quota must be 1, but actual is " + reservedQuota);
                internalRequestSubpartition(inputGate, inputChannel, consumedSubpartitionIndex);
                reservedPartitionRequestQuota.put(inputGate, reservedQuota - 1);
                currentUsedPartitionRequestQuota.put(inputGate, currentUsedQuota + 1);
            } else if (availableRequestQuota > 0) {
                internalRequestSubpartition(inputGate, inputChannel, consumedSubpartitionIndex);
                --availableRequestQuota;
                currentUsedPartitionRequestQuota.put(inputGate, currentUsedQuota + 1);
            } else {
                addPendingRemoteChannel(inputGate, (RemoteInputChannel) inputChannel);
            }
        }
    }

    /**
     * Splits {@code maxConcurrentPartitionRequests} between the restricted gates. Every gate gets
     * the integer average; the division remainder is handed out one slot each to the first gates
     * in the current (shuffled) order.
     */
    private void distributePartitionRequestQuotasFairly() {
        int numGates = inputGates.size();
        int averageQuota = maxConcurrentPartitionRequests / numGates;
        int remainingQuota = maxConcurrentPartitionRequests % numGates;
        int index = 0;
        for (SingleInputGate inputGate : inputGates) {
            int quota = index < remainingQuota ? averageQuota + 1 : averageQuota;
            reservedPartitionRequestQuota.put(inputGate, quota);
            ++index;
        }
    }

    /**
     * Removes and returns the oldest pending channel of the gate, dropping the gate's queue once
     * it is empty.
     *
     * @return the next pending channel, or {@code null} if the gate has none
     */
    private RemoteInputChannel getPendingRemoteChannel(SingleInputGate inputGate) {
        assert Thread.holdsLock(lock);
        LinkedList<RemoteInputChannel> channelList = pendingPartitionRequests.get(inputGate);
        if (channelList == null) {
            return null;
        }
        RemoteInputChannel remoteInputChannel = channelList.pollFirst();
        if (channelList.isEmpty()) {
            pendingPartitionRequests.remove(inputGate);
        }
        return remoteInputChannel;
    }

    /** Appends the channel to the gate's pending queue, creating the queue if necessary. */
    private void addPendingRemoteChannel(SingleInputGate inputGate, RemoteInputChannel inputChannel) {
        assert Thread.holdsLock(lock);
        LinkedList<RemoteInputChannel> channelList = pendingPartitionRequests.get(inputGate);
        if (channelList == null) {
            channelList = new LinkedList<>();
            pendingPartitionRequests.put(inputGate, channelList);
        }
        channelList.addLast(inputChannel);
    }

    /**
     * Requests the subpartition of one channel; for remote channels the gate is first asked to
     * assign exclusive segments to the channel.
     */
    private void internalRequestSubpartition(SingleInputGate inputGate, InputChannel inputChannel, int consumedSubpartitionIndex) throws IOException, InterruptedException {
        if (inputChannel instanceof RemoteInputChannel) {
            inputGate.assignExclusiveSegments(inputChannel);
        }
        inputChannel.requestSubpartition(consumedSubpartitionIndex);
    }

    /**
     * Reorders the given list in place so that channels of different remote addresses are
     * interleaved: the channels are grouped by address, the groups are shuffled, and then one
     * channel is taken from each group in turn until all groups are drained. The channels inside a
     * group are shuffled as well if {@code randomRequestsPerConnection} is set.
     *
     * @param remoteInputChannels the channels to reorder; must be a modifiable list
     * @throws IllegalStateException if the number of channels changed during the reordering
     */
    @VisibleForTesting
    protected void shuffleRemoteInputChannels(List<RemoteInputChannel> remoteInputChannels) {
        int initialSize = remoteInputChannels.size();

        Map<InetSocketAddress, List<RemoteInputChannel>> channelsByAddress = new HashMap<>();
        for (RemoteInputChannel inputChannel : remoteInputChannels) {
            InetSocketAddress socketAddress = inputChannel.getConnectionId().getAddress();
            channelsByAddress.computeIfAbsent(socketAddress, key -> new ArrayList<>()).add(inputChannel);
        }

        List<List<RemoteInputChannel>> channelGroups = new ArrayList<>(channelsByAddress.size());
        for (List<RemoteInputChannel> channelsPerAddress : channelsByAddress.values()) {
            if (randomRequestsPerConnection) {
                Collections.shuffle(channelsPerAddress);
            }
            channelGroups.add(channelsPerAddress);
        }
        Collections.shuffle(channelGroups);

        remoteInputChannels.clear();
        int groupIndex = 0;
        while (!channelGroups.isEmpty()) {
            groupIndex %= channelGroups.size();
            List<RemoteInputChannel> group = channelGroups.get(groupIndex);
            remoteInputChannels.add(group.remove(group.size() - 1));
            if (group.isEmpty()) {
                // Removal by index; the following group slides into this position, so the index
                // must not advance.
                channelGroups.remove(groupIndex);
            } else {
                ++groupIndex;
            }
        }
        Preconditions.checkState(initialSize == remoteInputChannels.size());
    }
}

