package org.apache.flink.runtime.io.network.partition.external;

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/CreditBasedSubpartitionViewScheduler.class */
public class CreditBasedSubpartitionViewScheduler implements ExternalBlockSubpartitionViewScheduler {

    @VisibleForTesting
    protected final Map<ResultPartitionID, ResultPartitionNode> resultPartitionNodeMap = new HashMap(16);

    @VisibleForTesting
    protected final ArrayDeque<ExternalBlockSubpartitionView> cacheList = new ArrayDeque<>(16);

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/CreditBasedSubpartitionViewScheduler$ResultPartitionNode.class */
    private static class ResultPartitionNode {
        final ResultPartitionID resultPartitionID;
        long totalCredits = 0;
        final PriorityQueue<ExternalBlockSubpartitionView> subpartitionViews = new PriorityQueue<>(16, new SimpleLocalityBasedSubpartitionViewComparator());
        long timestamp = System.currentTimeMillis();

        ResultPartitionNode(ResultPartitionID resultPartitionID) {
            this.resultPartitionID = resultPartitionID;
        }

        void updateTotalCredits(long j) {
            long j2 = 0;
            while (this.subpartitionViews.iterator().hasNext()) {
                j2 += r0.next().getCreditUnsafe();
            }
            this.totalCredits = j2 * (1 + ((j - this.timestamp) / 10000));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/CreditBasedSubpartitionViewScheduler$SimpleLocalityBasedSubpartitionViewComparator.class */
    private static class SimpleLocalityBasedSubpartitionViewComparator implements Comparator<ExternalBlockSubpartitionView> {
        private SimpleLocalityBasedSubpartitionViewComparator() {
        }

        @Override // java.util.Comparator
        public int compare(ExternalBlockSubpartitionView externalBlockSubpartitionView, ExternalBlockSubpartitionView externalBlockSubpartitionView2) {
            return externalBlockSubpartitionView.getSubpartitionIndex() - externalBlockSubpartitionView2.getSubpartitionIndex();
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionViewScheduler
    public void addToSchedule(ExternalBlockSubpartitionView externalBlockSubpartitionView) {
        ResultPartitionNode resultPartitionNode = this.resultPartitionNodeMap.get(externalBlockSubpartitionView.getResultPartitionID());
        if (resultPartitionNode == null) {
            resultPartitionNode = new ResultPartitionNode(externalBlockSubpartitionView.getResultPartitionID());
            this.resultPartitionNodeMap.put(externalBlockSubpartitionView.getResultPartitionID(), resultPartitionNode);
        }
        resultPartitionNode.subpartitionViews.offer(externalBlockSubpartitionView);
    }

    @Override // org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionViewScheduler
    public ExternalBlockSubpartitionView schedule() {
        while (true) {
            if (this.cacheList.isEmpty() && this.resultPartitionNodeMap.isEmpty()) {
                return null;
            }
            if (!this.cacheList.isEmpty()) {
                return this.cacheList.poll();
            }
            if (this.resultPartitionNodeMap.isEmpty()) {
                return null;
            }
            long currentTimeMillis = System.currentTimeMillis();
            ResultPartitionNode resultPartitionNode = null;
            long j = 0;
            Iterator<Map.Entry<ResultPartitionID, ResultPartitionNode>> it = this.resultPartitionNodeMap.entrySet().iterator();
            while (it.hasNext()) {
                ResultPartitionNode value = it.next().getValue();
                value.updateTotalCredits(currentTimeMillis);
                if (value.totalCredits < 1) {
                    it.remove();
                } else if (value.totalCredits > j) {
                    j = value.totalCredits;
                    resultPartitionNode = value;
                }
            }
            if (resultPartitionNode != null) {
                for (int size = resultPartitionNode.subpartitionViews.size(); size > 0; size--) {
                    this.cacheList.offer(resultPartitionNode.subpartitionViews.poll());
                }
                resultPartitionNode.timestamp = currentTimeMillis;
            }
        }
    }
}
