package org.apache.flink.api.common.io;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.Public;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Public
/* loaded from: input_file:org/apache/flink/api/common/io/LocatableInputSplitAssigner.class */
public final class LocatableInputSplitAssigner implements InputSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
    private final BlockingQueue<AssigningInputSplit> unassigned;
    private Map<String, BlockingQueue<AssigningInputSplit>> splitHostMap;
    private AtomicInteger localAssignments;
    private AtomicInteger remoteAssignments;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/api/common/io/LocatableInputSplitAssigner$AssigningInputSplit.class */
    public static class AssigningInputSplit {
        private final LocatableInputSplit split;
        public volatile boolean isAssigned = false;

        public AssigningInputSplit(LocatableInputSplit locatableInputSplit) {
            this.split = locatableInputSplit;
        }

        public String[] getHostNames() {
            return this.split.getHostnames();
        }

        public LocatableInputSplit getSplit() {
            return this.split;
        }

        public boolean isAssigned() {
            return this.isAssigned;
        }

        public void setAssigned() {
            this.isAssigned = true;
        }
    }

    public LocatableInputSplitAssigner(Collection<LocatableInputSplit> collection) {
        this((LocatableInputSplit[]) collection.toArray(new LocatableInputSplit[collection.size()]));
    }

    public LocatableInputSplitAssigner(LocatableInputSplit[] locatableInputSplitArr) {
        String str;
        this.unassigned = new LinkedBlockingQueue();
        this.splitHostMap = new HashMap();
        this.localAssignments = new AtomicInteger();
        this.remoteAssignments = new AtomicInteger();
        int i = 0;
        for (LocatableInputSplit locatableInputSplit : locatableInputSplitArr) {
            AssigningInputSplit assigningInputSplit = new AssigningInputSplit(locatableInputSplit);
            this.unassigned.add(assigningInputSplit);
            i = i > assigningInputSplit.getHostNames().length ? i : assigningInputSplit.getHostNames().length;
        }
        for (int i2 = 0; i2 < i; i2++) {
            for (AssigningInputSplit assigningInputSplit2 : this.unassigned) {
                if (assigningInputSplit2.getHostNames().length > i2 && (str = assigningInputSplit2.getHostNames()[i2]) != null) {
                    this.splitHostMap.computeIfAbsent(NetUtils.getHostnameFromFQDN(str.toLowerCase()), str2 -> {
                        return new LinkedBlockingQueue();
                    }).add(assigningInputSplit2);
                }
            }
        }
    }

    @Override // org.apache.flink.core.io.InputSplitAssigner
    public LocatableInputSplit getNextInputSplit(String str, int i) {
        LocatableInputSplit takeUnAssignedSplit;
        if (str != null) {
            str = str.toLowerCase(Locale.US);
            BlockingQueue<AssigningInputSplit> blockingQueue = this.splitHostMap.get(str);
            if (blockingQueue != null && (takeUnAssignedSplit = takeUnAssignedSplit(blockingQueue)) != null) {
                this.localAssignments.incrementAndGet();
                LOG.info("Assigning local split to host " + str);
                return takeUnAssignedSplit;
            }
        }
        LocatableInputSplit takeUnAssignedSplit2 = takeUnAssignedSplit(this.unassigned);
        if (takeUnAssignedSplit2 != null) {
            this.remoteAssignments.incrementAndGet();
            LOG.info("Assigning remote split to host " + str);
        }
        return takeUnAssignedSplit2;
    }

    @Override // org.apache.flink.core.io.InputSplitAssigner
    public void inputSplitsAssigned(int i, List<InputSplit> list) {
        for (InputSplit inputSplit : list) {
            boolean z = false;
            Iterator it = this.unassigned.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                AssigningInputSplit assigningInputSplit = (AssigningInputSplit) it.next();
                if (assigningInputSplit.getSplit().equals(inputSplit)) {
                    this.unassigned.remove(assigningInputSplit);
                    assigningInputSplit.setAssigned();
                    z = true;
                    break;
                }
            }
            if (!z) {
                throw new FlinkRuntimeException("InputSplit not found for " + inputSplit.getSplitNumber());
            }
        }
    }

    public int getNumberOfLocalAssignments() {
        return this.localAssignments.get();
    }

    public int getNumberOfRemoteAssignments() {
        return this.remoteAssignments.get();
    }

    private LocatableInputSplit takeUnAssignedSplit(BlockingQueue<AssigningInputSplit> blockingQueue) {
        AssigningInputSplit poll = blockingQueue.poll();
        while (true) {
            AssigningInputSplit assigningInputSplit = poll;
            if (assigningInputSplit == null) {
                return null;
            }
            if (!assigningInputSplit.isAssigned()) {
                synchronized (assigningInputSplit) {
                    if (!assigningInputSplit.isAssigned()) {
                        assigningInputSplit.setAssigned();
                        return assigningInputSplit.getSplit();
                    }
                }
            }
            poll = blockingQueue.poll();
        }
    }
}
