/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external;

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionView;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionViewScheduler;

public class CreditBasedSubpartitionViewScheduler
implements ExternalBlockSubpartitionViewScheduler {
    @VisibleForTesting
    protected final Map<ResultPartitionID, ResultPartitionNode> resultPartitionNodeMap = new HashMap<ResultPartitionID, ResultPartitionNode>(16);
    @VisibleForTesting
    protected final ArrayDeque<ExternalBlockSubpartitionView> cacheList = new ArrayDeque(16);

    @Override
    public void addToSchedule(ExternalBlockSubpartitionView subpartitionView) {
        ResultPartitionNode node = this.resultPartitionNodeMap.get(subpartitionView.getResultPartitionID());
        if (node == null) {
            node = new ResultPartitionNode(subpartitionView.getResultPartitionID());
            this.resultPartitionNodeMap.put(subpartitionView.getResultPartitionID(), node);
        }
        node.subpartitionViews.offer(subpartitionView);
    }

    @Override
    public ExternalBlockSubpartitionView schedule() {
        while (!this.cacheList.isEmpty() || !this.resultPartitionNodeMap.isEmpty()) {
            if (!this.cacheList.isEmpty()) {
                return this.cacheList.poll();
            }
            if (this.resultPartitionNodeMap.isEmpty()) {
                return null;
            }
            long currentTimestamp = System.currentTimeMillis();
            ResultPartitionNode nodeWithMaxCredits = null;
            long maxCredit = 0L;
            Iterator<Map.Entry<ResultPartitionID, ResultPartitionNode>> iterator = this.resultPartitionNodeMap.entrySet().iterator();
            while (iterator.hasNext()) {
                ResultPartitionNode currentNode = iterator.next().getValue();
                currentNode.updateTotalCredits(currentTimestamp);
                if (currentNode.totalCredits < 1L) {
                    iterator.remove();
                    continue;
                }
                if (currentNode.totalCredits <= maxCredit) continue;
                maxCredit = currentNode.totalCredits;
                nodeWithMaxCredits = currentNode;
            }
            if (nodeWithMaxCredits == null) continue;
            for (int i = nodeWithMaxCredits.subpartitionViews.size(); i > 0; --i) {
                this.cacheList.offer(nodeWithMaxCredits.subpartitionViews.poll());
            }
            nodeWithMaxCredits.timestamp = currentTimestamp;
        }
        return null;
    }

    private static class SimpleLocalityBasedSubpartitionViewComparator
    implements Comparator<ExternalBlockSubpartitionView> {
        private SimpleLocalityBasedSubpartitionViewComparator() {
        }

        @Override
        public int compare(ExternalBlockSubpartitionView o1, ExternalBlockSubpartitionView o2) {
            return o1.getSubpartitionIndex() - o2.getSubpartitionIndex();
        }
    }

    private static class ResultPartitionNode {
        long totalCredits = 0L;
        final PriorityQueue<ExternalBlockSubpartitionView> subpartitionViews;
        final ResultPartitionID resultPartitionID;
        long timestamp;

        ResultPartitionNode(ResultPartitionID resultPartitionID) {
            this.resultPartitionID = resultPartitionID;
            this.subpartitionViews = new PriorityQueue<ExternalBlockSubpartitionView>(16, new SimpleLocalityBasedSubpartitionViewComparator());
            this.timestamp = System.currentTimeMillis();
        }

        void updateTotalCredits(long currentTimestamp) {
            long newTotalCredits = 0L;
            for (ExternalBlockSubpartitionView subpartitionView : this.subpartitionViews) {
                newTotalCredits += (long)subpartitionView.getCreditUnsafe();
            }
            this.totalCredits = newTotalCredits * (1L + (currentTimestamp - this.timestamp) / 10000L);
        }
    }
}

