package org.apache.flink.streaming.runtime.partitioner;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.partition.PartitionBacklogChangeListener;
import org.apache.flink.streaming.runtime.partitioner.util.DoubleLinkedList;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/partitioner/DynamicRebalancePartitioner.class */
public class DynamicRebalancePartitioner<T> extends StreamPartitioner<T> implements PartitionBacklogChangeListener {
    private static final long serialVersionUID = 1;
    private final int threshold;
    private final int interval;
    private final DoubleLinkedList.Node<Integer>[] subpartitionIndexNodes;
    private final Bucket[] buckets;
    private int minNonEmptyBucket;
    private DoubleLinkedList.Node<Integer> lastChosenSubpartition;
    private int[] returnArray = {-1};
    private final Object lock = new Object();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/partitioner/DynamicRebalancePartitioner$Bucket.class */
    public static class Bucket {
        DoubleLinkedList<Integer> subpartitionIndexList;

        private Bucket() {
            this.subpartitionIndexList = new DoubleLinkedList<>();
        }
    }

    public DynamicRebalancePartitioner(int i, int i2, int i3) {
        this.threshold = i;
        this.interval = i2;
        int i4 = (i / i2) + (i % i2 == 0 ? 0 : 1) + 1;
        this.buckets = new Bucket[i4];
        for (int i5 = 0; i5 < i4; i5++) {
            this.buckets[i5] = new Bucket();
        }
        this.subpartitionIndexNodes = new DoubleLinkedList.Node[i3];
        for (int i6 = 0; i6 < i3; i6++) {
            this.subpartitionIndexNodes[i6] = new DoubleLinkedList.Node<>(Integer.valueOf(i6));
            this.buckets[0].subpartitionIndexList.appendNode(this.subpartitionIndexNodes[i6]);
        }
    }

    public int[] selectChannels(StreamRecord<T> streamRecord, int i) {
        synchronized (this.lock) {
            if (this.lastChosenSubpartition == null || this.lastChosenSubpartition.getNext() == null) {
                this.lastChosenSubpartition = this.buckets[this.minNonEmptyBucket].subpartitionIndexList.getHead();
                Preconditions.checkState(this.lastChosenSubpartition != null, "The bucket {} is marked as the minimum non-empty one, but it is empty", new Object[]{Integer.valueOf(this.minNonEmptyBucket)});
            } else {
                this.lastChosenSubpartition = this.lastChosenSubpartition.getNext();
            }
            this.returnArray[0] = this.lastChosenSubpartition.getElement().intValue();
        }
        return this.returnArray;
    }

    @Override // org.apache.flink.streaming.runtime.partitioner.StreamPartitioner
    public StreamPartitioner<T> copy() {
        return this;
    }

    public String toString() {
        return "DYNAMIC_REBALANCE";
    }

    public void notifyBacklogIncreaseOne(int i, int i2) {
        Preconditions.checkArgument(i2 >= 1, "The backlog of the notifier with the increased backlog should be at least one");
        int bucketIndex = getBucketIndex(i2 - 1);
        int bucketIndex2 = getBucketIndex(i2);
        if (bucketIndex2 != bucketIndex) {
            Preconditions.checkState(bucketIndex2 == bucketIndex + 1, "The subpartition node should only be moved between adjacent buckets");
            DoubleLinkedList.Node<Integer> node = this.subpartitionIndexNodes[i];
            synchronized (this.lock) {
                this.buckets[bucketIndex].subpartitionIndexList.removeNode(node);
                this.buckets[bucketIndex2].subpartitionIndexList.appendNode(node);
                if (this.buckets[bucketIndex].subpartitionIndexList.isEmpty() && this.minNonEmptyBucket == bucketIndex) {
                    this.minNonEmptyBucket = bucketIndex2;
                    this.lastChosenSubpartition = null;
                } else if (this.lastChosenSubpartition == node) {
                    this.lastChosenSubpartition = null;
                }
            }
        }
    }

    public void notifyBacklogDecreaseOne(int i, int i2) {
        Preconditions.checkArgument(i2 >= 0, "The backlog of each subpartition should be at least 0");
        int bucketIndex = getBucketIndex(i2 + 1);
        int bucketIndex2 = getBucketIndex(i2);
        if (bucketIndex2 != bucketIndex) {
            Preconditions.checkState(bucketIndex2 == bucketIndex - 1, "The subpartition node should only be moved between adjacent buckets");
            DoubleLinkedList.Node<Integer> node = this.subpartitionIndexNodes[i];
            synchronized (this.lock) {
                this.buckets[bucketIndex].subpartitionIndexList.removeNode(node);
                this.buckets[bucketIndex2].subpartitionIndexList.appendNode(node);
                if (this.minNonEmptyBucket == bucketIndex) {
                    this.minNonEmptyBucket = bucketIndex2;
                    this.lastChosenSubpartition = null;
                } else if (this.lastChosenSubpartition == node) {
                    this.lastChosenSubpartition = null;
                }
            }
        }
    }

    private int getBucketIndex(int i) {
        return i >= this.threshold ? this.buckets.length - 1 : i / this.interval;
    }

    @VisibleForTesting
    int getMinNonEmptyBucket() {
        int i;
        synchronized (this.lock) {
            i = this.minNonEmptyBucket;
        }
        return i;
    }

    @VisibleForTesting
    DoubleLinkedList<Integer> getSubpartitionIndexList(int i) {
        return this.buckets[i].subpartitionIndexList;
    }
}
