package org.apache.flink.runtime.schedule;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobType;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.event.ExecutionVertexFailoverEvent;
import org.apache.flink.runtime.event.ExecutionVertexStateChangedEvent;
import org.apache.flink.runtime.event.ResultPartitionConsumableEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.ControlType;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobControlEdge;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SchedulingMode;
import org.apache.flink.runtime.jobmaster.ExecutionSlotAllocator;
import org.apache.flink.runtime.jobmaster.GraphManager;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/schedule/ConcurrentGroupGraphManagerPlugin.class */
public class ConcurrentGroupGraphManagerPlugin implements GraphManagerPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentGroupGraphManagerPlugin.class);

    /** Every scheduling group currently registered with this plugin. */
    private Set<ConcurrentSchedulingGroup> concurrentSchedulingGroups = new HashSet<>();

    /** Lookup from an execution vertex to the scheduling group that contains it (insertion-ordered). */
    private Map<ExecutionVertexID, ConcurrentSchedulingGroup> executionToConcurrentSchedulingGroups = new LinkedHashMap<>();

    /**
     * Keyed by the source vertex of a START_ON_FINISH control edge; the value holds the concurrent
     * ancestors of that edge's target (filled by buildStartOnFinishRelation).
     */
    private Map<JobVertexID, Set<JobVertex>> predecessorToSuccessors = new HashMap<>();

    /**
     * Reverse relation: for a vertex, the ids of the job vertices that isReadyToSchedule requires
     * to be FINISHED before the vertex may be scheduled.
     */
    private Map<JobVertexID, Set<JobVertexID>> successorToPredecessors = new HashMap<>();

    /** START_ON_FINISH control edges skipped by buildStartOnFinishRelation because a circle was detected. */
    private Set<JobControlEdge> ignoredControlEdges = new HashSet<>();

    /** Answers whether the inputs of an execution vertex are ready (see isReadyToSchedule). */
    private VertexInputTracker inputTracker;
    private VertexScheduler scheduler;
    private JobGraph jobGraph;
    private ExecutionGraph executionGraph;
    private GraphManager graphManager;
    private ExecutionSlotAllocator executionSlotAllocator;

    /** Value of JobManagerOptions.ALLOW_GROUP_SPLIT, read in open(). */
    private boolean allowGroupSplit;

    /**
     * Value of JobManagerOptions.SLOT_REQUEST_LONG_TIMEOUT in milliseconds; scheduleGroup uses it
     * as the allocation timeout when the group has at most one execution vertex.
     */
    private Time allocationLongTimeout;

    /**
     * Stores the collaborators handed in by the framework, reads the group-split flag and the
     * long slot-request timeout from the scheduling configuration, and builds the initial
     * scheduling groups.
     *
     * @param vertexScheduler scheduler used to query vertex states and to schedule vertices
     * @param jobGraph the job graph whose vertices are grouped
     * @param schedulingConfig source of the configuration values read here
     * @param executionGraph execution graph providing the execution vertices
     * @param graphManager graph manager consulted before groups are scheduled
     * @param executionSlotAllocator allocator used to request slots for a group
     */
    @Override
    public void open(VertexScheduler vertexScheduler, JobGraph jobGraph, SchedulingConfig schedulingConfig, ExecutionGraph executionGraph, GraphManager graphManager, ExecutionSlotAllocator executionSlotAllocator) {
        // Plain collaborator references first.
        this.scheduler = vertexScheduler;
        this.jobGraph = jobGraph;
        this.executionGraph = executionGraph;
        this.graphManager = graphManager;
        this.executionSlotAllocator = executionSlotAllocator;

        // Derived state.
        this.inputTracker = new VertexInputTracker(jobGraph, vertexScheduler, schedulingConfig);
        this.allowGroupSplit = schedulingConfig.getConfiguration().getBoolean(JobManagerOptions.ALLOW_GROUP_SPLIT);
        long longTimeoutMillis = schedulingConfig.getConfiguration().getLong(JobManagerOptions.SLOT_REQUEST_LONG_TIMEOUT);
        this.allocationLongTimeout = Time.milliseconds(longTimeoutMillis);

        initConcurrentSchedulingGroups();
    }

    /**
     * Builds the initial scheduling groups.
     *
     * <p>For any job type other than INFINITE_STREAM the start-on-finish relation is computed and
     * the groups are derived from the topologically sorted vertices. For INFINITE_STREAM jobs a
     * single group containing every execution vertex is registered.
     */
    private void initConcurrentSchedulingGroups() {
        List<JobVertex> sortedVertices = this.jobGraph.getVerticesSortedTopologicallyFromSources();

        if (this.jobGraph.getJobType() == JobType.INFINITE_STREAM) {
            LOG.debug("All executions will be in one group for streaming job {}", this.jobGraph.getJobID());

            List<ExecutionVertex> allExecutionVertices = new ArrayList<>(this.executionGraph.getRegisteredExecutions().size());
            for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
                allExecutionVertices.add(executionVertex);
            }

            // The vertex group is constructed but its result is never read; the construction is
            // kept so that any effect of the constructor is preserved.
            List<ConcurrentJobVertexGroup> unusedVertexGroups = new ArrayList<>();
            unusedVertexGroups.add(new ConcurrentJobVertexGroup(sortedVertices, this.ignoredControlEdges));

            // NOTE(review): executionToConcurrentSchedulingGroups is not populated in this branch,
            // so scheduleInConcurrentGroup would throw for any vertex of a streaming job - confirm
            // that this path is never reached for INFINITE_STREAM jobs.
            this.concurrentSchedulingGroups.add(new ConcurrentSchedulingGroup(allExecutionVertices, false));
            return;
        }

        buildStartOnFinishRelation(this.jobGraph);
        buildConcurrentSchedulingGroups(sortedVertices, false);
    }

    /** Intentionally empty: this method releases nothing. */
    @Override // org.apache.flink.runtime.schedule.GraphManagerPlugin
    public void close() {
    }

    /**
     * Discards all grouping state (groups, vertex-to-group lookup, start-on-finish relations and
     * ignored control edges) and rebuilds the scheduling groups from scratch.
     */
    @Override
    public void reset() {
        // Start-on-finish bookkeeping.
        this.ignoredControlEdges.clear();
        this.successorToPredecessors.clear();
        this.predecessorToSuccessors.clear();

        // Group bookkeeping.
        this.executionToConcurrentSchedulingGroups.clear();
        this.concurrentSchedulingGroups.clear();

        initConcurrentSchedulingGroups();
    }

    /**
     * Schedules every group that reports no preceding group.
     *
     * <p>The loop runs over a copy of the group set, as the original code did; the live set is
     * modified by splitGroupAndContinueScheduling and buildConcurrentSchedulingGroups.
     */
    @Override
    public void onSchedulingStarted() {
        // Typed snapshot: a raw ArrayList would erase the element type to Object.
        List<ConcurrentSchedulingGroup> snapshot = new ArrayList<>(this.concurrentSchedulingGroups);
        for (ConcurrentSchedulingGroup group : snapshot) {
            if (!group.hasPrecedingGroup()) {
                checkAndScheduleGroup(group);
            }
        }
    }

    /**
     * Reacts to a result partition becoming consumable: every consumer execution vertex of that
     * partition which is ready to be scheduled is handed to scheduleInConcurrentGroup.
     *
     * @param resultPartitionConsumableEvent identifies the result and the partition number
     */
    @Override
    public void onResultPartitionConsumable(ResultPartitionConsumableEvent resultPartitionConsumableEvent) {
        Set<ExecutionVertexID> readyConsumers = new HashSet<>();
        for (Collection<ExecutionVertexID> consumerIds : this.jobGraph.getResultPartitionConsumerExecutionVertices(
                resultPartitionConsumableEvent.getResultID(), resultPartitionConsumableEvent.getPartitionNumber())) {
            for (ExecutionVertexID consumerId : consumerIds) {
                if (!isReadyToSchedule(consumerId)) {
                    continue;
                }
                readyConsumers.add(consumerId);
            }
        }
        scheduleInConcurrentGroup(readyConsumers);
    }

    /**
     * Reschedules vertices affected by a failover.
     *
     * <p>For INFINITE_STREAM jobs the affected vertices are passed straight to the scheduler.
     * Otherwise the groups of all affected vertices that are ready to schedule are collected and
     * each group is scheduled again, unless the graph manager caches it because it is
     * reconciling.
     *
     * @param executionVertexFailoverEvent carries the ids of the affected execution vertices
     * @throws RuntimeException if a ready vertex has no registered scheduling group; this mirrors
     *     the check in scheduleInConcurrentGroup instead of letting a null group reach
     *     scheduleGroup
     */
    @Override
    public void onExecutionVertexFailover(ExecutionVertexFailoverEvent executionVertexFailoverEvent) {
        if (this.jobGraph.getJobType() == JobType.INFINITE_STREAM) {
            this.scheduler.scheduleExecutionVertices(executionVertexFailoverEvent.getAffectedExecutionVertexIDs());
            return;
        }

        Set<ConcurrentSchedulingGroup> groupsToReschedule = new HashSet<>();
        for (ExecutionVertexID executionVertexID : executionVertexFailoverEvent.getAffectedExecutionVertexIDs()) {
            if (!isReadyToSchedule(executionVertexID)) {
                continue;
            }
            ConcurrentSchedulingGroup group = this.executionToConcurrentSchedulingGroups.get(executionVertexID);
            if (group == null) {
                // Fail fast with a diagnosable message rather than adding null to the set.
                throw new RuntimeException("Can not find a group for " + executionVertexID + ", this is logic error.");
            }
            groupsToReschedule.add(group);
        }

        for (ConcurrentSchedulingGroup group : groupsToReschedule) {
            if (!this.graphManager.cacheGroupIfReconciling(group)) {
                LOG.info("Group {} is scheduled again.", group);
                // scheduleGroup is called directly, bypassing the markScheduled() check in
                // checkAndScheduleGroup.
                scheduleGroup(group);
            }
        }
    }

    /**
     * Handles a state change of an execution vertex.
     *
     * <p>When the vertex switched to FINISHED and its whole job vertex is reported FINISHED, all
     * subtasks of the job vertices registered as its successors are checked and the ready ones are
     * collected. scheduleInConcurrentGroup is always invoked, possibly with an empty set.
     *
     * @param executionVertexStateChangedEvent the state change notification
     */
    @Override
    public synchronized void onExecutionVertexStateChanged(ExecutionVertexStateChangedEvent executionVertexStateChangedEvent) {
        Set<ExecutionVertexID> readyVertices = new HashSet<>();

        if (executionVertexStateChangedEvent.getNewExecutionState() == ExecutionState.FINISHED) {
            JobVertexID finishedJobVertexId = executionVertexStateChangedEvent.getExecutionVertexID().getJobVertexID();
            if (this.scheduler.getExecutionJobVertexStatus(finishedJobVertexId) == ExecutionState.FINISHED) {
                Set<JobVertex> successors = this.predecessorToSuccessors.get(finishedJobVertexId);
                if (successors != null) {
                    for (JobVertex successor : successors) {
                        for (int subtask = 0; subtask < successor.getParallelism(); subtask++) {
                            ExecutionVertexID candidate = new ExecutionVertexID(successor.getID(), subtask);
                            if (isReadyToSchedule(candidate)) {
                                readyVertices.add(candidate);
                            }
                        }
                    }
                }
            }
        }

        scheduleInConcurrentGroup(readyVertices);
    }

    /**
     * @return {@code true} for every job type except INFINITE_STREAM
     */
    @Override
    public boolean allowLazyDeployment() {
        JobType jobType = this.jobGraph.getJobType();
        return jobType != JobType.INFINITE_STREAM;
    }

    /**
     * Partitions the given job vertices into concurrent vertex groups, converts those into
     * scheduling groups and registers them in concurrentSchedulingGroups and
     * executionToConcurrentSchedulingGroups.
     *
     * <p>A vertex group is grown from each not yet visited vertex by following output edges whose
     * scheduling mode is CONCURRENT and which are not start-on-finish edges, restricted to
     * vertices contained in {@code list}; getAllConcurrentVertices extends it transitively.
     *
     * @param list the job vertices to group
     * @param z flag forwarded unchanged to buildSchedulingGroupsFromJobVertexGroup
     * @return the scheduling groups created by this call
     */
    private List<ConcurrentSchedulingGroup> buildConcurrentSchedulingGroups(List<JobVertex> list, boolean z) {
        List<ConcurrentJobVertexGroup> vertexGroups = new ArrayList<>();
        Set<JobVertex> visited = new HashSet<>();

        for (JobVertex seed : list) {
            if (!visited.add(seed)) {
                // Already absorbed into an earlier group.
                continue;
            }
            List<JobVertex> members = new ArrayList<>();
            members.add(seed);
            for (IntermediateDataSet producedDataSet : seed.getProducedDataSets()) {
                for (JobEdge edge : producedDataSet.getConsumers()) {
                    JobVertex target = edge.getTarget();
                    if (list.contains(target)
                            && !visited.contains(target)
                            && edge.getSchedulingMode() == SchedulingMode.CONCURRENT
                            && !isStartOnFinishedEdge(edge)) {
                        visited.add(target);
                        members.add(target);
                        members.addAll(getAllConcurrentVertices(target, list, visited));
                    }
                }
            }
            vertexGroups.add(new ConcurrentJobVertexGroup(members, this.ignoredControlEdges));
        }
        LOG.info("{} vertex group was built with {} vertices.", vertexGroups.size(), list.size());

        breakCircleDependencies(vertexGroups);
        for (ConcurrentJobVertexGroup vertexGroup : vertexGroups) {
            LOG.info("Concurrent vertex group has {} with preceding {}", vertexGroup.getVertices(), vertexGroup.hasPrecedingGroup());
        }

        List<ConcurrentSchedulingGroup> schedulingGroups = new ArrayList<>();
        for (ConcurrentJobVertexGroup vertexGroup : vertexGroups) {
            schedulingGroups.addAll(buildSchedulingGroupsFromJobVertexGroup(vertexGroup, z));
        }

        // Register the new groups and index them by execution vertex.
        this.concurrentSchedulingGroups.addAll(schedulingGroups);
        for (ConcurrentSchedulingGroup schedulingGroup : schedulingGroups) {
            for (ExecutionVertex executionVertex : schedulingGroup.getExecutionVertices()) {
                this.executionToConcurrentSchedulingGroups.put(executionVertex.getExecutionVertexID(), schedulingGroup);
            }
        }
        LOG.info("{} concurrent group was built with {} vertices for job {}.", schedulingGroups.size(), list.size(), this.jobGraph.getJobID());
        return schedulingGroups;
    }

    /**
     * Splits a scheduling group whose slot request was only partially fulfilled and continues
     * scheduling with the resulting smaller groups.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>The old group is removed from the registered groups.</li>
     *   <li>For each produced data set of each assigned vertex, the first consumer edge is
     *       inspected; if any task of its target is still CREATED, the edge becomes SEQUENTIAL
     *       and the corresponding result of the execution graph becomes BLOCKING.</li>
     *   <li>The start-on-finish relation is rebuilt and new groups are built for both lists.</li>
     *   <li>Groups of the assigned list are deployed directly, except single-vertex groups
     *       without an assigned resource, which go through scheduleGroup.</li>
     *   <li>Groups of the unassigned list are scheduled if they have no preceding group, or if
     *       at least one of their vertices is ready to schedule.</li>
     * </ol>
     *
     * @param list job vertices treated as assigned
     * @param list2 job vertices treated as unassigned
     * @param concurrentSchedulingGroup the group being split
     */
    private void splitGroupAndContinueScheduling(List<JobVertex> list, List<JobVertex> list2, ConcurrentSchedulingGroup concurrentSchedulingGroup) {
        LOG.info("Split scheduling group {} as resource is not enough, assigned {}, unassigned {}.", concurrentSchedulingGroup, list, list2);
        this.concurrentSchedulingGroups.remove(concurrentSchedulingGroup);

        Set<JobVertex> handled = new HashSet<>();
        for (JobVertex assignedVertex : list) {
            if (!handled.add(assignedVertex)) {
                continue;
            }
            for (int dataSetIndex = 0; dataSetIndex < assignedVertex.getProducedDataSets().size(); dataSetIndex++) {
                IntermediateDataSet producedDataSet = assignedVertex.getProducedDataSets().get(dataSetIndex);
                if (producedDataSet.getConsumers().isEmpty()) {
                    continue;
                }
                // Only the first consumer edge of the data set is considered.
                JobEdge firstConsumerEdge = producedDataSet.getConsumers().get(0);
                for (ExecutionVertex consumerTask : this.executionGraph.getJobVertex(firstConsumerEdge.getTarget().getID()).getTaskVertices()) {
                    if (consumerTask.getExecutionState() == ExecutionState.CREATED) {
                        firstConsumerEdge.setSchedulingMode(SchedulingMode.SEQUENTIAL);
                        this.executionGraph.getJobVertex(assignedVertex.getID()).getProducedDataSets()[dataSetIndex].setResultType(ResultPartitionType.BLOCKING);
                        break;
                    }
                }
            }
        }

        buildStartOnFinishRelation(this.jobGraph);
        List<ConcurrentSchedulingGroup> assignedGroups = buildConcurrentSchedulingGroups(list, true);
        List<ConcurrentSchedulingGroup> unassignedGroups = buildConcurrentSchedulingGroups(list2, false);

        for (ConcurrentSchedulingGroup assignedGroup : assignedGroups) {
            List<ExecutionVertex> members = assignedGroup.getExecutionVertices();
            boolean singleWithoutResource = members.size() == 1 && members.get(0).getCurrentAssignedResource() == null;
            if (singleWithoutResource) {
                scheduleGroup(assignedGroup);
                continue;
            }
            for (ExecutionVertex member : members) {
                try {
                    member.getCurrentExecutionAttempt().deploy();
                } catch (Exception e) {
                    LOG.info("Fail to deploy execution {}", member, e);
                    member.getCurrentExecutionAttempt().fail(e);
                }
            }
        }

        for (ConcurrentSchedulingGroup unassignedGroup : unassignedGroups) {
            if (!unassignedGroup.hasPrecedingGroup()) {
                checkAndScheduleGroup(unassignedGroup);
                continue;
            }
            // With a preceding group, one ready vertex is enough to schedule the whole group.
            for (ExecutionVertex member : unassignedGroup.getExecutionVertices()) {
                if (isReadyToSchedule(member.getExecutionVertexID())) {
                    checkAndScheduleGroup(unassignedGroup);
                    break;
                }
            }
        }
    }

    /**
     * Test accessor.
     *
     * @return the live (not copied) set of registered scheduling groups
     */
    @VisibleForTesting
    Set<ConcurrentSchedulingGroup> getConcurrentSchedulingGroups() {
        return this.concurrentSchedulingGroups;
    }

    /**
     * Test accessor.
     *
     * @return the live (not copied) predecessor-to-successors map
     */
    @VisibleForTesting
    Map<JobVertexID, Set<JobVertex>> getPredecessorToSuccessors() {
        return this.predecessorToSuccessors;
    }

    /**
     * Test accessor.
     *
     * @return the live (not copied) successor-to-predecessors map
     */
    @VisibleForTesting
    Map<JobVertexID, Set<JobVertexID>> getSuccessorsToPredecessor() {
        return this.successorToPredecessors;
    }

    /**
     * Recursively collects the vertices reachable from {@code jobVertex} over CONCURRENT edges
     * that are not start-on-finish edges, following both input and output edges.
     *
     * <p>Only vertices contained in {@code list} and not yet in {@code set} are collected; each
     * collected vertex is added to {@code set} before recursing.
     *
     * @param jobVertex the vertex to start from (not included in the result)
     * @param list the vertices eligible for collection
     * @param set vertices already visited; mutated by this method
     * @return the newly collected vertices
     */
    private Set<JobVertex> getAllConcurrentVertices(JobVertex jobVertex, List<JobVertex> list, Set<JobVertex> set) {
        Set<JobVertex> collected = new HashSet<>();

        // Upstream direction.
        for (JobEdge inputEdge : jobVertex.getInputs()) {
            if (inputEdge.getSchedulingMode() != SchedulingMode.CONCURRENT) {
                continue;
            }
            JobVertex producer = inputEdge.getSource().getProducer();
            if (!list.contains(producer) || set.contains(producer) || isStartOnFinishedEdge(inputEdge)) {
                continue;
            }
            set.add(producer);
            collected.add(producer);
            collected.addAll(getAllConcurrentVertices(producer, list, set));
        }

        // Downstream direction.
        for (IntermediateDataSet producedDataSet : jobVertex.getProducedDataSets()) {
            for (JobEdge outputEdge : producedDataSet.getConsumers()) {
                JobVertex consumer = outputEdge.getTarget();
                if (!list.contains(consumer) || set.contains(consumer)) {
                    continue;
                }
                if (outputEdge.getSchedulingMode() != SchedulingMode.CONCURRENT || isStartOnFinishedEdge(outputEdge)) {
                    continue;
                }
                set.add(consumer);
                collected.add(consumer);
                collected.addAll(getAllConcurrentVertices(consumer, list, set));
            }
        }
        return collected;
    }

    /**
     * Converts a job vertex group into scheduling groups.
     *
     * <p>A group with exactly one job vertex yields one scheduling group per task vertex; any
     * other group yields a single scheduling group holding the task vertices of all its job
     * vertices.
     *
     * @param concurrentJobVertexGroup the vertex group to convert
     * @param z flag passed as the third constructor argument of every created group
     * @return the created scheduling groups
     */
    private List<ConcurrentSchedulingGroup> buildSchedulingGroupsFromJobVertexGroup(ConcurrentJobVertexGroup concurrentJobVertexGroup, boolean z) {
        List<ConcurrentSchedulingGroup> result = new ArrayList<>();
        List<JobVertex> vertices = concurrentJobVertexGroup.getVertices();

        if (vertices.size() != 1) {
            List<ExecutionVertex> allTasks = new ArrayList<>();
            for (JobVertex vertex : vertices) {
                Collections.addAll(allTasks, this.executionGraph.getJobVertex(vertex.getID()).getTaskVertices());
            }
            result.add(new ConcurrentSchedulingGroup(allTasks, concurrentJobVertexGroup.hasPrecedingGroup(), z));
            return result;
        }

        for (ExecutionVertex task : this.executionGraph.getJobVertex(vertices.get(0).getID()).getTaskVertices()) {
            result.add(new ConcurrentSchedulingGroup(Collections.singletonList(task), concurrentJobVertexGroup.hasPrecedingGroup(), z));
        }
        return result;
    }

    /**
     * Resolves the scheduling groups of the given vertices and schedules each distinct group via
     * checkAndScheduleGroup, unless the graph manager caches it because it is reconciling.
     *
     * @param set ids of the execution vertices whose groups should be scheduled
     * @throws RuntimeException if a vertex has no registered scheduling group
     */
    private void scheduleInConcurrentGroup(Set<ExecutionVertexID> set) {
        Set<ConcurrentSchedulingGroup> targetGroups = new HashSet<>();
        for (ExecutionVertexID vertexId : set) {
            ConcurrentSchedulingGroup owner = this.executionToConcurrentSchedulingGroups.get(vertexId);
            if (owner != null) {
                targetGroups.add(owner);
                continue;
            }
            throw new RuntimeException("Can not find a group for " + vertexId + ", this is logic error.");
        }

        for (ConcurrentSchedulingGroup targetGroup : targetGroups) {
            if (this.graphManager.cacheGroupIfReconciling(targetGroup)) {
                continue;
            }
            checkAndScheduleGroup(targetGroup);
        }
    }

    /**
     * Decides whether an execution vertex may be scheduled now.
     *
     * <p>The vertex must be in state CREATED, every job vertex registered as its predecessor must
     * be FINISHED, and it must either belong to an input vertex or have its inputs reported ready
     * by the input tracker.
     *
     * @param executionVertexID the vertex to check
     * @return {@code true} if all of the conditions above hold
     */
    private boolean isReadyToSchedule(ExecutionVertexID executionVertexID) {
        ExecutionState currentState = this.scheduler.getExecutionVertexStatus(executionVertexID).getExecutionState();
        if (currentState != ExecutionState.CREATED) {
            return false;
        }

        Set<JobVertexID> requiredPredecessors = this.successorToPredecessors.get(executionVertexID.getJobVertexID());
        if (requiredPredecessors != null) {
            for (JobVertexID predecessorId : requiredPredecessors) {
                if (this.scheduler.getExecutionJobVertexStatus(predecessorId) != ExecutionState.FINISHED) {
                    return false;
                }
            }
        }

        return this.jobGraph.findVertexByID(executionVertexID.getJobVertexID()).isInputVertex()
                || this.inputTracker.areInputsReady(executionVertexID);
    }

    /**
     * Rebuilds the start-on-finish bookkeeping from the control edges of the job graph.
     *
     * <p>All three structures (successorToPredecessors, predecessorToSuccessors,
     * ignoredControlEdges) are cleared first. For each START_ON_FINISH control edge the
     * concurrent ancestors of the edge target are computed. If any ancestor forms a circle with
     * the edge's owning vertex, the edge is recorded as ignored; otherwise the owning vertex is
     * registered as predecessor of every ancestor.
     *
     * @param jobGraph the job graph whose control edges are scanned
     */
    private void buildStartOnFinishRelation(JobGraph jobGraph) {
        this.successorToPredecessors.clear();
        this.predecessorToSuccessors.clear();
        this.ignoredControlEdges.clear();

        for (JobVertex owner : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            for (JobControlEdge controlEdge : owner.getOutControlEdges()) {
                LOG.debug("ControlEdge from {} to {} with type {}", controlEdge.getSource().getID(), controlEdge.getTarget().getID(), controlEdge.getControlType());
                if (controlEdge.getControlType() != ControlType.START_ON_FINISH) {
                    continue;
                }

                // Only a warning: non-blocking outputs do not stop the relation from being built.
                for (IntermediateDataSet producedDataSet : controlEdge.getTarget().getProducedDataSets()) {
                    if (!producedDataSet.getResultType().isBlocking()) {
                        LOG.warn("Vertex {} is a start on finished but not blocking.", controlEdge.getTarget().getName());
                    }
                }

                Set<JobVertex> ancestors = getAllConcurrentAncestors(controlEdge.getTarget());

                boolean circular = false;
                for (JobVertex ancestor : ancestors) {
                    if (hasCircleDependencyInVertices(ancestor, owner)) {
                        circular = true;
                        break;
                    }
                }
                if (circular) {
                    this.ignoredControlEdges.add(controlEdge);
                    continue;
                }

                for (JobVertex ancestor : ancestors) {
                    this.successorToPredecessors.computeIfAbsent(ancestor.getID(), key -> new HashSet<>()).add(owner.getID());
                }
                Set<JobVertex> alreadyRegistered = this.predecessorToSuccessors.putIfAbsent(owner.getID(), ancestors);
                if (alreadyRegistered != null) {
                    alreadyRegistered.addAll(ancestors);
                }
            }
        }
    }

    /**
     * Walks upstream over CONCURRENT input edges that are not start-on-finish edges and returns
     * the vertices at which the walk stops.
     *
     * <p>An input vertex, or a vertex without any qualifying input edge, yields itself.
     *
     * @param jobVertex the vertex to start from
     * @return the terminal ancestors; never empty
     */
    private Set<JobVertex> getAllConcurrentAncestors(JobVertex jobVertex) {
        Set<JobVertex> ancestors = new HashSet<>();
        if (!jobVertex.isInputVertex()) {
            for (JobEdge inputEdge : jobVertex.getInputs()) {
                boolean followable = inputEdge.getSchedulingMode() == SchedulingMode.CONCURRENT && !isStartOnFinishedEdge(inputEdge);
                if (followable) {
                    ancestors.addAll(getAllConcurrentAncestors(inputEdge.getSource().getProducer()));
                }
            }
        }
        // Nothing found upstream (or an input vertex): the vertex is its own ancestor.
        if (ancestors.isEmpty()) {
            ancestors.add(jobVertex);
        }
        return ancestors;
    }

    /**
     * Schedules the group only if {@code markScheduled()} returns {@code true}; otherwise logs
     * that the group has already been scheduled.
     *
     * @param concurrentSchedulingGroup the group to schedule
     */
    public void checkAndScheduleGroup(ConcurrentSchedulingGroup concurrentSchedulingGroup) {
        if (!concurrentSchedulingGroup.markScheduled()) {
            LOG.info("Group {} has already been scheduled, will not schedule again.", concurrentSchedulingGroup);
            return;
        }
        scheduleGroup(concurrentSchedulingGroup);
    }

    /**
     * Requests slots for all executions of a group in one batch and, once the request completes,
     * assigns the slots and deploys the executions.
     *
     * <p>Outline:
     * <ol>
     *   <li>Each vertex's current execution attempt is asked to {@code enterScheduled()}; only
     *       those returning {@code true} take part in the slot request.</li>
     *   <li>The allocation timeout is the execution graph's allocation timeout for groups with
     *       more than one vertex, otherwise {@code allocationLongTimeout}.</li>
     *   <li>The result is handled asynchronously on the execution graph's future executor:
     *     <ul>
     *       <li>allocation failed: the handler returns without further action;</li>
     *       <li>some slots missing and splitting is disabled or the group has fewer than two
     *           vertices: all returned slots are released and all executions are failed;</li>
     *       <li>some slots missing and splitting is possible: executions from the first missing
     *           slot onwards are rolled back, the job vertices are divided into an assigned and
     *           an unassigned list, and splitGroupAndContinueScheduling takes over;</li>
     *       <li>no slot missing: each slot is assigned and every execution is deployed.</li>
     *     </ul>
     *   </li>
     *   <li>The handler future is registered with the execution graph and unregistered on
     *       completion; if it completed with a CancellationException the slot request is
     *       cancelled as well.</li>
     * </ol>
     *
     * @param concurrentSchedulingGroup the group whose executions should be scheduled
     */
    public void scheduleGroup(ConcurrentSchedulingGroup concurrentSchedulingGroup) {
        List<ExecutionVertex> executionVertices = concurrentSchedulingGroup.getExecutionVertices();
        // Executions for which enterScheduled() returned true; positions in this list are later
        // matched against positions in the returned slot collection.
        ArrayList arrayList = new ArrayList();
        for (ExecutionVertex executionVertex : executionVertices) {
            if (executionVertex.getCurrentExecutionAttempt().enterScheduled()) {
                arrayList.add(executionVertex.getCurrentExecutionAttempt());
            } else {
                LOG.info("{} is in state {} while scheduled in group {}", new Object[]{executionVertex.getTaskNameWithSubtaskIndex(), executionVertex.getExecutionState(), concurrentSchedulingGroup});
            }
        }
        CompletableFuture<Collection<LogicalSlot>> allocateSlotsFor = this.executionSlotAllocator.allocateSlotsFor(arrayList, executionVertices.size() > 1 ? this.executionGraph.getAllocationTimeout() : this.allocationLongTimeout);
        CompletableFuture<U> handleAsync = allocateSlotsFor.handleAsync((collection, th) -> {
            // NOTE(review): an allocation failure is dropped here without failing or rolling back
            // the executions - presumably handled by the allocator; confirm.
            if (th != null) {
                return null;
            }
            // NOTE(review): a size mismatch is only logged, yet the code below indexes arrayList
            // by the position of each slot in the collection.
            if (arrayList.size() != collection.size()) {
                LOG.warn("Execution state change during allocating resource.");
            }
            // i = number of requests that came back without a slot (null entries).
            int i = 0;
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                if (((LogicalSlot) it.next()) == null) {
                    i++;
                }
            }
            // Created unconditionally, i.e. also when no slot is missing.
            Exception exc = new Exception("Batch request " + arrayList.size() + ", but " + i + " does not return.");
            // Splitting not possible: give back every obtained slot and fail the whole batch.
            if (i > 0 && (!this.allowGroupSplit || executionVertices.size() < 2)) {
                Iterator it2 = collection.iterator();
                while (it2.hasNext()) {
                    LogicalSlot logicalSlot = (LogicalSlot) it2.next();
                    if (logicalSlot != null) {
                        logicalSlot.releaseSlot(exc);
                    }
                }
                Iterator it3 = arrayList.iterator();
                while (it3.hasNext()) {
                    ((Execution) it3.next()).fail(exc);
                }
                return null;
            }
            if (i > 0) {
                // i2 = running position, i3 = position of the first missing slot,
                // z = "first missing slot already seen".
                int i2 = 0;
                int i3 = -1;
                boolean z = false;
                Iterator it4 = collection.iterator();
                while (it4.hasNext()) {
                    LogicalSlot logicalSlot2 = (LogicalSlot) it4.next();
                    if (logicalSlot2 == null && !z) {
                        z = true;
                        i3 = i2;
                        ((Execution) arrayList.get(i2)).rollbackToCreated();
                        LOG.debug("The first failed slot request is {}.", Integer.valueOf(i3));
                    } else if (z) {
                        // Everything after the first gap is rolled back; obtained slots are released.
                        if (logicalSlot2 != null) {
                            logicalSlot2.releaseSlot(exc);
                        }
                        ((Execution) arrayList.get(i2)).rollbackToCreated();
                    }
                    i2++;
                }
                if (i3 >= 0) {
                    // arrayList2 = job vertices treated as assigned,
                    // arrayList3 = job vertices treated as unassigned.
                    ArrayList arrayList2 = new ArrayList();
                    ArrayList arrayList3 = new ArrayList();
                    if (i3 == 0) {
                        // Not a single slot before the gap: the first job vertex alone forms the
                        // "assigned" list, every other distinct job vertex is unassigned.
                        arrayList2.add(((ExecutionVertex) executionVertices.get(0)).getJobVertex().getJobVertex());
                        for (int i4 = 1; i4 < executionVertices.size(); i4++) {
                            JobVertexID jobvertexId = ((ExecutionVertex) executionVertices.get(i4)).getJobvertexId();
                            if (!jobvertexId.equals(arrayList2.get(arrayList2.size() - 1).getID()) && (arrayList3.isEmpty() || !jobvertexId.equals(arrayList3.get(arrayList3.size() - 1).getID()))) {
                                arrayList3.add(((ExecutionVertex) executionVertices.get(i4)).getJobVertex().getJobVertex());
                            }
                        }
                    } else {
                        // z2: the execution before the gap belongs to a different job vertex than
                        //     the one at the gap.
                        // z3: cleared when the gap lies inside the first job vertex of the group.
                        boolean z2 = false;
                        boolean z3 = true;
                        if (!((Execution) arrayList.get(i3)).getVertex().getJobvertexId().equals(((Execution) arrayList.get(i3 - 1)).getVertex().getJobvertexId())) {
                            z2 = true;
                            LOG.debug("The last assigned vertex is fully filled.");
                        }
                        if (((Execution) arrayList.get(i3)).getVertex().getJobvertexId().equals(((ExecutionVertex) executionVertices.get(0)).getJobvertexId())) {
                            z3 = false;
                            LOG.debug("The first vertex is not fully filled.");
                        }
                        // Handle the slots in front of the gap (positions 0 .. i3-1).
                        int i5 = 0;
                        Iterator it5 = collection.iterator();
                        while (it5.hasNext()) {
                            LogicalSlot logicalSlot3 = (LogicalSlot) it5.next();
                            if (z2) {
                                assignResourceElseFail((Execution) arrayList.get(i5), logicalSlot3);
                            } else if (!z3) {
                                assignResourceElseFail((Execution) arrayList.get(i5), logicalSlot3);
                            // NOTE(review): executionVertices is indexed with a position of
                            // arrayList here; the two lists line up only if every vertex passed
                            // enterScheduled() - confirm.
                            } else if (((ExecutionVertex) executionVertices.get(i5)).getJobvertexId().equals(((Execution) arrayList.get(i3)).getVertex().getJobvertexId())) {
                                // Same job vertex as the gap: release the slot and roll back.
                                logicalSlot3.releaseSlot(exc);
                                ((Execution) arrayList.get(i5)).rollbackToCreated();
                            } else {
                                assignResourceElseFail((Execution) arrayList.get(i5), logicalSlot3);
                            }
                            i5++;
                            if (i5 >= i3) {
                                break;
                            }
                        }
                        // Divide the job vertices: those seen before the job vertex at the gap
                        // are assigned, that job vertex and the later ones are unassigned (the
                        // first job vertex always stays in the assigned list).
                        arrayList2.add(((ExecutionVertex) executionVertices.get(0)).getJobVertex().getJobVertex());
                        boolean z4 = false;
                        for (int i6 = 0; i6 < executionVertices.size(); i6++) {
                            JobVertex jobVertex = ((ExecutionVertex) executionVertices.get(i6)).getJobVertex().getJobVertex();
                            if (z4) {
                                if (!jobVertex.getID().equals(arrayList2.get(0).getID()) && (arrayList3.isEmpty() || !jobVertex.getID().equals(arrayList3.get(arrayList3.size() - 1).getID()))) {
                                    arrayList3.add(jobVertex);
                                }
                            } else if (jobVertex.getID().equals(((Execution) arrayList.get(i3)).getVertex().getJobvertexId())) {
                                if (!jobVertex.getID().equals(arrayList2.get(0).getID())) {
                                    arrayList3.add(jobVertex);
                                }
                                z4 = true;
                            } else if (!jobVertex.getID().equals(arrayList2.get(arrayList2.size() - 1).getID())) {
                                arrayList2.add(jobVertex);
                            }
                        }
                    }
                    splitGroupAndContinueScheduling(arrayList2, arrayList3, concurrentSchedulingGroup);
                    return null;
                }
                // NOTE(review): appears unreachable - i > 0 means a null slot exists, so the loop
                // above sets i3 >= 0. th is also always null at this point.
                LOG.info("All allocations is assigned, but the request fail, this is strange.", th);
            }
            // No slot missing: assign slot by position, then deploy every execution.
            int i7 = 0;
            Iterator it6 = collection.iterator();
            while (it6.hasNext()) {
                LogicalSlot logicalSlot4 = (LogicalSlot) it6.next();
                if (!((Execution) arrayList.get(i7)).tryAssignResource(logicalSlot4)) {
                    Throwable flinkException = new FlinkException("Could not assign logical slot to execution " + arrayList.get(i7) + '.');
                    logicalSlot4.releaseSlot(flinkException);
                    ((Execution) arrayList.get(i7)).fail(flinkException);
                }
                i7++;
            }
            // deploy() is attempted for every execution, including ones failed just above.
            for (int i8 = 0; i8 < arrayList.size(); i8++) {
                try {
                    ((Execution) arrayList.get(i8)).deploy();
                } catch (Exception e) {
                    LOG.info("Fail to deploy execution {}", arrayList.get(i8), e);
                    ((Execution) arrayList.get(i8)).fail(e);
                }
            }
            return null;
        }, this.executionGraph.getFutureExecutor());
        this.executionGraph.registerSchedulingFuture(handleAsync);
        handleAsync.whenComplete((BiConsumer<? super U, ? super Throwable>) (r6, th2) -> {
            // Propagate a cancellation of the handler future to the pending slot request.
            if (ExceptionUtils.stripCompletionException(th2) instanceof CancellationException) {
                allocateSlotsFor.cancel(false);
            }
            this.executionGraph.unregisterSchedulingFuture(handleAsync);
        });
    }

    /**
     * Offers {@code logicalSlot} to {@code execution}. When the execution rejects the slot,
     * the slot is released and the execution is failed, both with the same
     * {@link FlinkException}.
     *
     * @param execution the execution that should receive the slot
     * @param logicalSlot the slot to assign
     */
    private void assignResourceElseFail(Execution execution, LogicalSlot logicalSlot) {
        if (!execution.tryAssignResource(logicalSlot)) {
            final FlinkException cause =
                    new FlinkException("Could not assign logical slot to execution " + execution + '.');
            // Hand the slot back first, then fail the execution with the very same cause.
            logicalSlot.releaseSlot(cause);
            execution.fail(cause);
        }
    }

    /**
     * Builds a vertex-to-group index over {@code list} and then, for every group that
     * {@link #hasCircleDependencyInGroups} reports as part of a circle and that also reports
     * {@code hasInputVertex()}, calls {@code noPrecedingGroup()} on it.
     *
     * @param list the groups to inspect; each group's vertices are indexed before any check runs
     */
    private void breakCircleDependencies(List<ConcurrentJobVertexGroup> list) {
        final Map<JobVertex, ConcurrentJobVertexGroup> groupByVertex = new HashMap<>();
        for (ConcurrentJobVertexGroup group : list) {
            for (JobVertex member : group.getVertices()) {
                // A vertex listed by several groups ends up mapped to the last one visited.
                groupByVertex.put(member, group);
            }
        }

        for (ConcurrentJobVertexGroup candidate : list) {
            // The circle check is evaluated first; hasInputVertex() is only consulted when it holds.
            if (!hasCircleDependencyInGroups(candidate, groupByVertex)) {
                continue;
            }
            if (candidate.hasInputVertex()) {
                candidate.noPrecedingGroup();
            }
        }
    }

    /**
     * Walks backwards through group predecessors, level by level, and reports whether the walk
     * arrives at {@code concurrentJobVertexGroup} again.
     *
     * <p>The groups owning the direct predecessor vertices are used as seeds only: they are
     * marked as visited and their predecessor vertices form the first level. The identity
     * comparison against {@code concurrentJobVertexGroup} is applied to the groups owning the
     * vertices of that level and of every later one.
     *
     * @param concurrentJobVertexGroup the group whose predecessors are followed
     * @param map lookup from a vertex to the group containing it; vertices without an entry are
     *     ignored
     * @return {@code true} if the given group is found again during the walk, {@code false} if
     *     the group has no predecessor vertices or the walk runs out of unvisited groups
     */
    private boolean hasCircleDependencyInGroups(ConcurrentJobVertexGroup concurrentJobVertexGroup, Map<JobVertex, ConcurrentJobVertexGroup> map) {
        final List<JobVertex> directPredecessors = concurrentJobVertexGroup.getPredecessorVertices();
        if (directPredecessors.isEmpty()) {
            return false;
        }

        final Set<ConcurrentJobVertexGroup> visitedGroups = new HashSet<>();
        List<JobVertex> frontier = new ArrayList<>();
        for (JobVertex predecessor : directPredecessors) {
            final ConcurrentJobVertexGroup owner = map.get(predecessor);
            if (owner == null) {
                continue;
            }
            // Seed level: predecessors are appended for every hit, even for an already seen group.
            frontier.addAll(owner.getPredecessorVertices());
            visitedGroups.add(owner);
        }

        while (!frontier.isEmpty()) {
            final List<JobVertex> nextFrontier = new ArrayList<>();
            for (JobVertex vertex : frontier) {
                final ConcurrentJobVertexGroup owner = map.get(vertex);
                if (owner == concurrentJobVertexGroup) {
                    return true;
                }
                // Set.add returns true only for a group that was not visited before.
                if (owner != null && visitedGroups.add(owner)) {
                    nextFrontier.addAll(owner.getPredecessorVertices());
                }
            }
            frontier = nextFrontier;
        }
        return false;
    }

    /**
     * Reports whether {@code jobVertex} can be reached from {@code jobVertex2} by repeatedly
     * stepping to predecessors.
     *
     * <p>The predecessors of a vertex are the vertices recorded for its id in
     * {@code successorToPredecessors} (resolved through the execution graph) plus the producers
     * of its input edges. {@code jobVertex2} itself is never compared against {@code jobVertex};
     * only its direct and transitive predecessors are.
     *
     * <p>Every vertex is expanded at most once. Without that bookkeeping a vertex reachable over
     * several paths would be expanded once per path, and a cycle among the predecessors that
     * does not contain {@code jobVertex} would keep the search running forever.
     *
     * @param jobVertex the vertex to look for among the predecessors
     * @param jobVertex2 the vertex at which the backward search starts
     * @return {@code true} if {@code jobVertex} is a direct or transitive predecessor of
     *     {@code jobVertex2}, {@code false} otherwise
     */
    private boolean hasCircleDependencyInVertices(JobVertex jobVertex, JobVertex jobVertex2) {
        final Set<JobVertex> seen = new HashSet<>();
        seen.add(jobVertex2);

        // The list doubles as a FIFO queue: entries are appended while the index walks forward.
        final List<JobVertex> worklist = new ArrayList<>();
        worklist.add(jobVertex2);

        for (int next = 0; next < worklist.size(); next++) {
            final JobVertex current = worklist.get(next);

            final List<JobVertex> predecessors = new ArrayList<>();
            final Set<JobVertexID> recordedPredecessors = this.successorToPredecessors.get(current.getID());
            if (recordedPredecessors != null) {
                for (JobVertexID predecessorId : recordedPredecessors) {
                    predecessors.add(this.executionGraph.getJobVertex(predecessorId).getJobVertex());
                }
            }
            for (JobEdge input : current.getInputs()) {
                predecessors.add(input.getSource().getProducer());
            }

            for (JobVertex predecessor : predecessors) {
                // Compare before consulting the visited set so a hit is never masked by it.
                if (predecessor.equals(jobVertex)) {
                    return true;
                }
                if (seen.add(predecessor)) {
                    worklist.add(predecessor);
                }
            }
        }
        return false;
    }

    /**
     * Looks at every incoming control edge of the vertex that produces {@code jobEdge}'s source
     * data set and checks whether the vertex at the source of such a control edge also produces
     * a data set that is consumed by {@code jobEdge}'s target.
     *
     * @param jobEdge the edge to examine
     * @return {@code true} as soon as one matching consumer is found, {@code false} if there is
     *     none
     */
    private boolean isStartOnFinishedEdge(JobEdge jobEdge) {
        for (JobControlEdge controlEdge : jobEdge.getSource().getProducer().getInControlEdges()) {
            for (IntermediateDataSet dataSet : controlEdge.getSource().getProducedDataSets()) {
                for (JobEdge consumer : dataSet.getConsumers()) {
                    if (consumer.getTarget().equals(jobEdge.getTarget())) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
}
