/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.schedule;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobType;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.event.ExecutionVertexFailoverEvent;
import org.apache.flink.runtime.event.ExecutionVertexStateChangedEvent;
import org.apache.flink.runtime.event.ResultPartitionConsumableEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.ControlType;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobControlEdge;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SchedulingMode;
import org.apache.flink.runtime.jobmaster.ExecutionSlotAllocator;
import org.apache.flink.runtime.jobmaster.GraphManager;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.schedule.ConcurrentJobVertexGroup;
import org.apache.flink.runtime.schedule.ConcurrentSchedulingGroup;
import org.apache.flink.runtime.schedule.ExecutionVertexStatus;
import org.apache.flink.runtime.schedule.GraphManagerPlugin;
import org.apache.flink.runtime.schedule.SchedulingConfig;
import org.apache.flink.runtime.schedule.VertexInputTracker;
import org.apache.flink.runtime.schedule.VertexScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConcurrentGroupGraphManagerPlugin
implements GraphManagerPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentGroupGraphManagerPlugin.class);
    private Set<ConcurrentSchedulingGroup> concurrentSchedulingGroups = new HashSet<ConcurrentSchedulingGroup>();
    private Map<ExecutionVertexID, ConcurrentSchedulingGroup> executionToConcurrentSchedulingGroups = new LinkedHashMap<ExecutionVertexID, ConcurrentSchedulingGroup>();
    private Map<JobVertexID, Set<JobVertex>> predecessorToSuccessors = new HashMap<JobVertexID, Set<JobVertex>>();
    private Map<JobVertexID, Set<JobVertexID>> successorToPredecessors = new HashMap<JobVertexID, Set<JobVertexID>>();
    private Set<JobControlEdge> ignoredControlEdges = new HashSet<JobControlEdge>();
    private VertexInputTracker inputTracker;
    private VertexScheduler scheduler;
    private JobGraph jobGraph;
    private ExecutionGraph executionGraph;
    private GraphManager graphManager;
    private ExecutionSlotAllocator executionSlotAllocator;
    private boolean allowGroupSplit;
    private Time allocationLongTimeout;

    @Override
    public void open(VertexScheduler scheduler, JobGraph jobGraph, SchedulingConfig schedulingConfig, ExecutionGraph eg, GraphManager graphManager, ExecutionSlotAllocator executionSlotAllocator) {
        this.scheduler = scheduler;
        this.jobGraph = jobGraph;
        this.inputTracker = new VertexInputTracker(jobGraph, scheduler, schedulingConfig);
        this.executionGraph = eg;
        this.graphManager = graphManager;
        this.allowGroupSplit = schedulingConfig.getConfiguration().getBoolean(JobManagerOptions.ALLOW_GROUP_SPLIT);
        this.executionSlotAllocator = executionSlotAllocator;
        this.allocationLongTimeout = Time.milliseconds((long)schedulingConfig.getConfiguration().getLong(JobManagerOptions.SLOT_REQUEST_LONG_TIMEOUT));
        this.initConcurrentSchedulingGroups();
    }

    private void initConcurrentSchedulingGroups() {
        ArrayList<ConcurrentJobVertexGroup> concurrentJobVertexGroups = new ArrayList<ConcurrentJobVertexGroup>();
        List<JobVertex> allJobVertices = this.jobGraph.getVerticesSortedTopologicallyFromSources();
        if (this.jobGraph.getJobType() == JobType.INFINITE_STREAM) {
            LOG.debug("All executions will be in one group for streaming job {}", (Object)this.jobGraph.getJobID());
            ArrayList<ExecutionVertex> allExecutionVertices = new ArrayList<ExecutionVertex>(this.executionGraph.getRegisteredExecutions().size());
            for (ExecutionVertex ev : this.executionGraph.getAllExecutionVertices()) {
                allExecutionVertices.add(ev);
            }
            concurrentJobVertexGroups.add(new ConcurrentJobVertexGroup(allJobVertices, this.ignoredControlEdges));
            this.concurrentSchedulingGroups.add(new ConcurrentSchedulingGroup(allExecutionVertices, false));
        } else {
            this.buildStartOnFinishRelation(this.jobGraph);
            this.buildConcurrentSchedulingGroups(allJobVertices, false);
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void reset() {
        this.concurrentSchedulingGroups.clear();
        this.executionToConcurrentSchedulingGroups.clear();
        this.predecessorToSuccessors.clear();
        this.successorToPredecessors.clear();
        this.ignoredControlEdges.clear();
        this.initConcurrentSchedulingGroups();
    }

    @Override
    public void onSchedulingStarted() {
        ArrayList<ConcurrentSchedulingGroup> groups = new ArrayList<ConcurrentSchedulingGroup>(this.concurrentSchedulingGroups);
        groups.stream().forEach(group -> {
            if (!group.hasPrecedingGroup()) {
                this.checkAndScheduleGroup((ConcurrentSchedulingGroup)group);
            }
        });
    }

    @Override
    public void onResultPartitionConsumable(ResultPartitionConsumableEvent event) {
        HashSet<ExecutionVertexID> verticesToSchedule = new HashSet<ExecutionVertexID>();
        List<Collection<ExecutionVertexID>> consumerVertices = this.jobGraph.getResultPartitionConsumerExecutionVertices(event.getResultID(), event.getPartitionNumber());
        for (Collection collection : consumerVertices) {
            for (ExecutionVertexID executionVertexID : collection) {
                if (!this.isReadyToSchedule(executionVertexID)) continue;
                verticesToSchedule.add(executionVertexID);
            }
        }
        this.scheduleInConcurrentGroup(verticesToSchedule);
    }

    @Override
    public void onExecutionVertexFailover(ExecutionVertexFailoverEvent event) {
        HashSet<ConcurrentSchedulingGroup> groupToSchedule = new HashSet<ConcurrentSchedulingGroup>();
        if (this.jobGraph.getJobType() == JobType.INFINITE_STREAM) {
            this.scheduler.scheduleExecutionVertices(event.getAffectedExecutionVertexIDs());
        } else {
            for (ExecutionVertexID executionVertexID : event.getAffectedExecutionVertexIDs()) {
                if (!this.isReadyToSchedule(executionVertexID)) continue;
                ConcurrentSchedulingGroup groupsBelongTo = this.executionToConcurrentSchedulingGroups.get(executionVertexID);
                groupToSchedule.add(groupsBelongTo);
            }
            for (ConcurrentSchedulingGroup group : groupToSchedule) {
                if (this.graphManager.cacheGroupIfReconciling(group)) continue;
                LOG.info("Group {} is scheduled again.", (Object)group);
                this.scheduleGroup(group);
            }
        }
    }

    @Override
    public synchronized void onExecutionVertexStateChanged(ExecutionVertexStateChangedEvent event) {
        Set<JobVertex> successorVertices;
        HashSet<ExecutionVertexID> verticesToSchedule = new HashSet<ExecutionVertexID>();
        if (event.getNewExecutionState() == ExecutionState.FINISHED && this.scheduler.getExecutionJobVertexStatus(event.getExecutionVertexID().getJobVertexID()) == ExecutionState.FINISHED && (successorVertices = this.predecessorToSuccessors.get((Object)event.getExecutionVertexID().getJobVertexID())) != null) {
            for (JobVertex successor : successorVertices) {
                for (int i = 0; i < successor.getParallelism(); ++i) {
                    ExecutionVertexID executionVertexID = new ExecutionVertexID(successor.getID(), i);
                    if (!this.isReadyToSchedule(executionVertexID)) continue;
                    verticesToSchedule.add(executionVertexID);
                }
            }
        }
        this.scheduleInConcurrentGroup(verticesToSchedule);
    }

    @Override
    public boolean allowLazyDeployment() {
        return this.jobGraph.getJobType() != JobType.INFINITE_STREAM;
    }

    private List<ConcurrentSchedulingGroup> buildConcurrentSchedulingGroups(List<JobVertex> jobVerticesTopologically, boolean scheduled) {
        ArrayList<ConcurrentJobVertexGroup> concurrentJobVertexGroups = new ArrayList<ConcurrentJobVertexGroup>();
        ArrayList<ConcurrentSchedulingGroup> schedulingGroups = new ArrayList<ConcurrentSchedulingGroup>();
        HashSet<JobVertex> visitedJobVertices = new HashSet<JobVertex>();
        for (JobVertex jobVertex : jobVerticesTopologically) {
            if (!visitedJobVertices.add(jobVertex)) continue;
            ArrayList<JobVertex> concurrentVertices = new ArrayList<JobVertex>();
            concurrentVertices.add(jobVertex);
            for (IntermediateDataSet output : jobVertex.getProducedDataSets()) {
                for (JobEdge jobEdge : output.getConsumers()) {
                    if (!jobVerticesTopologically.contains(jobEdge.getTarget()) || visitedJobVertices.contains(jobEdge.getTarget()) || jobEdge.getSchedulingMode() != SchedulingMode.CONCURRENT || this.isStartOnFinishedEdge(jobEdge)) continue;
                    visitedJobVertices.add(jobEdge.getTarget());
                    concurrentVertices.add(jobEdge.getTarget());
                    concurrentVertices.addAll(this.getAllConcurrentVertices(jobEdge.getTarget(), jobVerticesTopologically, visitedJobVertices));
                }
            }
            concurrentJobVertexGroups.add(new ConcurrentJobVertexGroup(concurrentVertices, this.ignoredControlEdges));
        }
        LOG.info("{} vertex group was built with {} vertices.", (Object)concurrentJobVertexGroups.size(), (Object)jobVerticesTopologically.size());
        this.breakCircleDependencies(concurrentJobVertexGroups);
        for (ConcurrentJobVertexGroup group : concurrentJobVertexGroups) {
            LOG.info("Concurrent vertex group has {} with preceding {}", group.getVertices(), (Object)group.hasPrecedingGroup());
        }
        for (ConcurrentJobVertexGroup regionGroup : concurrentJobVertexGroups) {
            schedulingGroups.addAll(this.buildSchedulingGroupsFromJobVertexGroup(regionGroup, scheduled));
        }
        this.concurrentSchedulingGroups.addAll(schedulingGroups);
        for (ConcurrentSchedulingGroup schedulingGroup : schedulingGroups) {
            for (ExecutionVertex ev : schedulingGroup.getExecutionVertices()) {
                this.executionToConcurrentSchedulingGroups.put(ev.getExecutionVertexID(), schedulingGroup);
            }
        }
        LOG.info("{} concurrent group was built with {} vertices for job {}.", new Object[]{schedulingGroups.size(), jobVerticesTopologically.size(), this.jobGraph.getJobID()});
        return schedulingGroups;
    }

    private void splitGroupAndContinueScheduling(List<JobVertex> assignedJobVertices, List<JobVertex> unAssignedJobVertices, ConcurrentSchedulingGroup originalGroup) {
        List<ExecutionVertex> evs;
        LOG.info("Split scheduling group {} as resource is not enough, assigned {}, unassigned {}.", new Object[]{originalGroup, assignedJobVertices, unAssignedJobVertices});
        this.concurrentSchedulingGroups.remove(originalGroup);
        HashSet<JobVertex> visitedJobVertices = new HashSet<JobVertex>();
        for (JobVertex jobVertex : assignedJobVertices) {
            if (!visitedJobVertices.add(jobVertex)) continue;
            block3: for (int i = 0; i < jobVertex.getProducedDataSets().size(); ++i) {
                IntermediateDataSet output = jobVertex.getProducedDataSets().get(i);
                if (output.getConsumers().isEmpty()) continue;
                JobEdge jobEdge = output.getConsumers().get(0);
                for (ExecutionVertex executionVertex : this.executionGraph.getJobVertex(jobEdge.getTarget().getID()).getTaskVertices()) {
                    if (executionVertex.getExecutionState() != ExecutionState.CREATED) continue;
                    jobEdge.setSchedulingMode(SchedulingMode.SEQUENTIAL);
                    this.executionGraph.getJobVertex(jobVertex.getID()).getProducedDataSets()[i].setResultType(ResultPartitionType.BLOCKING);
                    continue block3;
                }
            }
        }
        this.buildStartOnFinishRelation(this.jobGraph);
        List<ConcurrentSchedulingGroup> newAssignedGroups = this.buildConcurrentSchedulingGroups(assignedJobVertices, true);
        List<ConcurrentSchedulingGroup> newUnAssignedGroups = this.buildConcurrentSchedulingGroups(unAssignedJobVertices, false);
        for (ConcurrentSchedulingGroup group : newAssignedGroups) {
            evs = group.getExecutionVertices();
            if (evs.size() == 1 && evs.get(0).getCurrentAssignedResource() == null) {
                this.scheduleGroup(group);
                continue;
            }
            for (ExecutionVertex ev : evs) {
                try {
                    ev.getCurrentExecutionAttempt().deploy();
                }
                catch (Exception e) {
                    LOG.info("Fail to deploy execution {}", (Object)ev, (Object)e);
                    ev.getCurrentExecutionAttempt().fail(e);
                }
            }
        }
        block7: for (ConcurrentSchedulingGroup group : newUnAssignedGroups) {
            if (!group.hasPrecedingGroup()) {
                this.checkAndScheduleGroup(group);
                continue;
            }
            evs = group.getExecutionVertices();
            for (ExecutionVertex ev : evs) {
                if (!this.isReadyToSchedule(ev.getExecutionVertexID())) continue;
                this.checkAndScheduleGroup(group);
                continue block7;
            }
        }
    }

    @VisibleForTesting
    Set<ConcurrentSchedulingGroup> getConcurrentSchedulingGroups() {
        return this.concurrentSchedulingGroups;
    }

    @VisibleForTesting
    Map<JobVertexID, Set<JobVertex>> getPredecessorToSuccessors() {
        return this.predecessorToSuccessors;
    }

    @VisibleForTesting
    Map<JobVertexID, Set<JobVertexID>> getSuccessorsToPredecessor() {
        return this.successorToPredecessors;
    }

    private Set<JobVertex> getAllConcurrentVertices(JobVertex jobVertex, List<JobVertex> allJobVerticesTopologically, Set<JobVertex> visitedJobVertices) {
        HashSet<JobVertex> concurrentVertices = new HashSet<JobVertex>();
        for (JobEdge jobEdge : jobVertex.getInputs()) {
            if (jobEdge.getSchedulingMode() != SchedulingMode.CONCURRENT || !allJobVerticesTopologically.contains(jobEdge.getSource().getProducer()) || visitedJobVertices.contains(jobEdge.getSource().getProducer()) || this.isStartOnFinishedEdge(jobEdge)) continue;
            visitedJobVertices.add(jobEdge.getSource().getProducer());
            concurrentVertices.add(jobEdge.getSource().getProducer());
            concurrentVertices.addAll(this.getAllConcurrentVertices(jobEdge.getSource().getProducer(), allJobVerticesTopologically, visitedJobVertices));
        }
        for (IntermediateDataSet output : jobVertex.getProducedDataSets()) {
            for (JobEdge jobEdge : output.getConsumers()) {
                if (!allJobVerticesTopologically.contains(jobEdge.getTarget()) || visitedJobVertices.contains(jobEdge.getTarget()) || jobEdge.getSchedulingMode() != SchedulingMode.CONCURRENT || this.isStartOnFinishedEdge(jobEdge)) continue;
                visitedJobVertices.add(jobEdge.getTarget());
                concurrentVertices.add(jobEdge.getTarget());
                concurrentVertices.addAll(this.getAllConcurrentVertices(jobEdge.getTarget(), allJobVerticesTopologically, visitedJobVertices));
            }
        }
        return concurrentVertices;
    }

    private List<ConcurrentSchedulingGroup> buildSchedulingGroupsFromJobVertexGroup(ConcurrentJobVertexGroup jobVertexGroup, boolean scheduled) {
        ArrayList<ConcurrentSchedulingGroup> schedulingGroups = new ArrayList<ConcurrentSchedulingGroup>();
        List<JobVertex> jobVerticesTopologically = jobVertexGroup.getVertices();
        if (jobVerticesTopologically.size() == 1) {
            for (ExecutionVertex ev : this.executionGraph.getJobVertex(jobVerticesTopologically.get(0).getID()).getTaskVertices()) {
                schedulingGroups.add(new ConcurrentSchedulingGroup(Collections.singletonList(ev), jobVertexGroup.hasPrecedingGroup(), scheduled));
            }
        } else {
            ArrayList<ExecutionVertex> executionVertices = new ArrayList<ExecutionVertex>();
            for (JobVertex jobVertex : jobVerticesTopologically) {
                for (ExecutionVertex ev : this.executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()) {
                    executionVertices.add(ev);
                }
            }
            schedulingGroups.add(new ConcurrentSchedulingGroup(executionVertices, jobVertexGroup.hasPrecedingGroup(), scheduled));
        }
        return schedulingGroups;
    }

    private void scheduleInConcurrentGroup(Set<ExecutionVertexID> verticesToSchedule) {
        HashSet<ConcurrentSchedulingGroup> groupsToSchedule = new HashSet<ConcurrentSchedulingGroup>();
        for (ExecutionVertexID vertexID : verticesToSchedule) {
            ConcurrentSchedulingGroup groupsBelongTo = this.executionToConcurrentSchedulingGroups.get(vertexID);
            if (groupsBelongTo == null) {
                throw new RuntimeException("Can not find a group for " + vertexID + ", this is logic error.");
            }
            groupsToSchedule.add(groupsBelongTo);
        }
        for (ConcurrentSchedulingGroup group : groupsToSchedule) {
            if (this.graphManager.cacheGroupIfReconciling(group)) continue;
            this.checkAndScheduleGroup(group);
        }
    }

    private boolean isReadyToSchedule(ExecutionVertexID vertexID) {
        ExecutionVertexStatus vertexStatus = this.scheduler.getExecutionVertexStatus(vertexID);
        if (vertexStatus.getExecutionState() != ExecutionState.CREATED) {
            return false;
        }
        Set<JobVertexID> predecessorIds = this.successorToPredecessors.get((Object)vertexID.getJobVertexID());
        if (predecessorIds != null) {
            for (JobVertexID predecessorId : predecessorIds) {
                if (this.scheduler.getExecutionJobVertexStatus(predecessorId) == ExecutionState.FINISHED) continue;
                return false;
            }
        }
        if (this.jobGraph.findVertexByID(vertexID.getJobVertexID()).isInputVertex()) {
            return true;
        }
        return this.inputTracker.areInputsReady(vertexID);
    }

    private void buildStartOnFinishRelation(JobGraph jobGraph) {
        this.successorToPredecessors.clear();
        this.predecessorToSuccessors.clear();
        this.ignoredControlEdges.clear();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            for (JobControlEdge controlEdge : jobVertex.getOutControlEdges()) {
                LOG.debug("ControlEdge from {} to {} with type {}", new Object[]{controlEdge.getSource().getID(), controlEdge.getTarget().getID(), controlEdge.getControlType()});
                if (controlEdge.getControlType() != ControlType.START_ON_FINISH) continue;
                for (IntermediateDataSet output : controlEdge.getTarget().getProducedDataSets()) {
                    if (output.getResultType().isBlocking()) continue;
                    LOG.warn("Vertex {} is a start on finished but not blocking.", (Object)controlEdge.getTarget().getName());
                }
                Set<JobVertex> concurrentAncestors = this.getAllConcurrentAncestors(controlEdge.getTarget());
                boolean hasCircleDependency = false;
                for (JobVertex ancestor : concurrentAncestors) {
                    if (!this.hasCircleDependencyInVertices(ancestor, jobVertex)) continue;
                    hasCircleDependency = true;
                    this.ignoredControlEdges.add(controlEdge);
                    break;
                }
                if (hasCircleDependency) continue;
                for (JobVertex ancestor : concurrentAncestors) {
                    Set existingPredecessors = this.successorToPredecessors.computeIfAbsent(ancestor.getID(), k -> new HashSet());
                    existingPredecessors.add(jobVertex.getID());
                }
                Set<JobVertex> existingSuccessors = this.predecessorToSuccessors.putIfAbsent(jobVertex.getID(), concurrentAncestors);
                if (existingSuccessors == null) continue;
                existingSuccessors.addAll(concurrentAncestors);
            }
        }
    }

    private Set<JobVertex> getAllConcurrentAncestors(JobVertex jobVertex) {
        HashSet<JobVertex> ancestors = new HashSet<JobVertex>();
        if (jobVertex.isInputVertex()) {
            ancestors.add(jobVertex);
        } else {
            for (JobEdge jobEdge : jobVertex.getInputs()) {
                if (jobEdge.getSchedulingMode() != SchedulingMode.CONCURRENT || this.isStartOnFinishedEdge(jobEdge)) continue;
                ancestors.addAll(this.getAllConcurrentAncestors(jobEdge.getSource().getProducer()));
            }
            if (ancestors.isEmpty()) {
                ancestors.add(jobVertex);
            }
        }
        return ancestors;
    }

    public void checkAndScheduleGroup(ConcurrentSchedulingGroup schedulingGroup) {
        if (!schedulingGroup.markScheduled()) {
            LOG.info("Group {} has already been scheduled, will not schedule again.", (Object)schedulingGroup);
            return;
        }
        this.scheduleGroup(schedulingGroup);
    }

    public void scheduleGroup(ConcurrentSchedulingGroup schedulingGroup) {
        List<ExecutionVertex> executionVertices = schedulingGroup.getExecutionVertices();
        ArrayList<Execution> scheduledExecutions = new ArrayList<Execution>();
        for (ExecutionVertex ev : executionVertices) {
            if (ev.getCurrentExecutionAttempt().enterScheduled()) {
                scheduledExecutions.add(ev.getCurrentExecutionAttempt());
                continue;
            }
            LOG.info("{} is in state {} while scheduled in group {}", new Object[]{ev.getTaskNameWithSubtaskIndex(), ev.getExecutionState(), schedulingGroup});
        }
        Time allocationTimeout = executionVertices.size() > 1 ? this.executionGraph.getAllocationTimeout() : this.allocationLongTimeout;
        CompletableFuture<Collection<LogicalSlot>> allocationFuture = this.executionSlotAllocator.allocateSlotsFor(scheduledExecutions, allocationTimeout);
        CompletionStage currentSchedulingFuture = allocationFuture.handleAsync((slots, throwable) -> {
            if (throwable == null) {
                if (scheduledExecutions.size() != slots.size()) {
                    LOG.warn("Execution state change during allocating resource.");
                }
                int failedNumber = 0;
                for (Object slot : slots) {
                    if (slot != null) continue;
                    ++failedNumber;
                }
                Exception strippedThrowable = new Exception("Batch request " + scheduledExecutions.size() + ", but " + failedNumber + " does not return.");
                if (!(failedNumber <= 0 || this.allowGroupSplit && executionVertices.size() >= 2)) {
                    for (LogicalSlot slot : slots) {
                        if (slot == null) continue;
                        slot.releaseSlot(strippedThrowable);
                    }
                    for (Execution execution : scheduledExecutions) {
                        execution.fail(strippedThrowable);
                    }
                    return null;
                }
                if (failedNumber > 0) {
                    int i = 0;
                    int index = -1;
                    boolean hasFailure = false;
                    for (LogicalSlot slot : slots) {
                        if (slot == null && !hasFailure) {
                            hasFailure = true;
                            index = i;
                            ((Execution)scheduledExecutions.get(i)).rollbackToCreated();
                            LOG.debug("The first failed slot request is {}.", (Object)index);
                        } else if (hasFailure) {
                            if (slot != null) {
                                slot.releaseSlot(strippedThrowable);
                            }
                            ((Execution)scheduledExecutions.get(i)).rollbackToCreated();
                        }
                        ++i;
                    }
                    if (index < 0) {
                        LOG.info("All allocations is assigned, but the request fail, this is strange.", throwable);
                    } else {
                        ArrayList<JobVertex> assignedJobVertices = new ArrayList<JobVertex>();
                        ArrayList<JobVertex> unAssignedJobVertices = new ArrayList<JobVertex>();
                        if (index == 0) {
                            assignedJobVertices.add(((ExecutionVertex)executionVertices.get(0)).getJobVertex().getJobVertex());
                            for (int j = 1; j < executionVertices.size(); ++j) {
                                JobVertexID jobVertexID = ((ExecutionVertex)executionVertices.get(j)).getJobvertexId();
                                if (jobVertexID.equals((Object)((JobVertex)assignedJobVertices.get(assignedJobVertices.size() - 1)).getID()) || !unAssignedJobVertices.isEmpty() && jobVertexID.equals((Object)((JobVertex)unAssignedJobVertices.get(unAssignedJobVertices.size() - 1)).getID())) continue;
                                unAssignedJobVertices.add(((ExecutionVertex)executionVertices.get(j)).getJobVertex().getJobVertex());
                            }
                        } else {
                            boolean lastAssignedVertexFulfilled = false;
                            boolean firstVertexFullyAssigned = true;
                            if (!((Execution)scheduledExecutions.get(index)).getVertex().getJobvertexId().equals((Object)((Execution)scheduledExecutions.get(index - 1)).getVertex().getJobvertexId())) {
                                lastAssignedVertexFulfilled = true;
                                LOG.debug("The last assigned vertex is fully filled.");
                            }
                            if (((Execution)scheduledExecutions.get(index)).getVertex().getJobvertexId().equals((Object)((ExecutionVertex)executionVertices.get(0)).getJobvertexId())) {
                                firstVertexFullyAssigned = false;
                                LOG.debug("The first vertex is not fully filled.");
                            }
                            i = 0;
                            for (LogicalSlot slot : slots) {
                                if (lastAssignedVertexFulfilled) {
                                    this.assignResourceElseFail((Execution)scheduledExecutions.get(i), slot);
                                } else if (firstVertexFullyAssigned) {
                                    if (((ExecutionVertex)executionVertices.get(i)).getJobvertexId().equals((Object)((Execution)scheduledExecutions.get(index)).getVertex().getJobvertexId())) {
                                        slot.releaseSlot(strippedThrowable);
                                        ((Execution)scheduledExecutions.get(i)).rollbackToCreated();
                                    } else {
                                        this.assignResourceElseFail((Execution)scheduledExecutions.get(i), slot);
                                    }
                                } else {
                                    this.assignResourceElseFail((Execution)scheduledExecutions.get(i), slot);
                                }
                                if (++i < index) continue;
                                break;
                            }
                            assignedJobVertices.add(((ExecutionVertex)executionVertices.get(0)).getJobVertex().getJobVertex());
                            boolean enterUnAssigned = false;
                            for (int j = 0; j < executionVertices.size(); ++j) {
                                JobVertex jobVertex = ((ExecutionVertex)executionVertices.get(j)).getJobVertex().getJobVertex();
                                if (!enterUnAssigned) {
                                    if (jobVertex.getID().equals((Object)((Execution)scheduledExecutions.get(index)).getVertex().getJobvertexId())) {
                                        if (!jobVertex.getID().equals((Object)((JobVertex)assignedJobVertices.get(0)).getID())) {
                                            unAssignedJobVertices.add(jobVertex);
                                        }
                                        enterUnAssigned = true;
                                        continue;
                                    }
                                    if (jobVertex.getID().equals((Object)((JobVertex)assignedJobVertices.get(assignedJobVertices.size() - 1)).getID())) continue;
                                    assignedJobVertices.add(jobVertex);
                                    continue;
                                }
                                if (jobVertex.getID().equals((Object)((JobVertex)assignedJobVertices.get(0)).getID()) || !unAssignedJobVertices.isEmpty() && jobVertex.getID().equals((Object)((JobVertex)unAssignedJobVertices.get(unAssignedJobVertices.size() - 1)).getID())) continue;
                                unAssignedJobVertices.add(jobVertex);
                            }
                        }
                        this.splitGroupAndContinueScheduling(assignedJobVertices, unAssignedJobVertices, schedulingGroup);
                        return null;
                    }
                }
                int i = 0;
                for (LogicalSlot slot : slots) {
                    if (!((Execution)scheduledExecutions.get(i)).tryAssignResource(slot)) {
                        FlinkException e = new FlinkException("Could not assign logical slot to execution " + scheduledExecutions.get(i) + '.');
                        slot.releaseSlot(e);
                        ((Execution)scheduledExecutions.get(i)).fail(e);
                    }
                    ++i;
                }
                for (i = 0; i < scheduledExecutions.size(); ++i) {
                    try {
                        ((Execution)scheduledExecutions.get(i)).deploy();
                        continue;
                    }
                    catch (Exception e) {
                        LOG.info("Fail to deploy execution {}", scheduledExecutions.get(i), (Object)e);
                        ((Execution)scheduledExecutions.get(i)).fail(e);
                    }
                }
            }
            return null;
        }, this.executionGraph.getFutureExecutor());
        this.executionGraph.registerSchedulingFuture((CompletableFuture<Void>)currentSchedulingFuture);
        ((CompletableFuture)currentSchedulingFuture).whenComplete((arg_0, arg_1) -> this.lambda$scheduleGroup$3(allocationFuture, (CompletableFuture)currentSchedulingFuture, arg_0, arg_1));
    }

    private void assignResourceElseFail(Execution execution, LogicalSlot slot) {
        if (!execution.tryAssignResource(slot)) {
            FlinkException e = new FlinkException("Could not assign logical slot to execution " + execution + '.');
            slot.releaseSlot(e);
            execution.fail(e);
        }
    }

    private void breakCircleDependencies(List<ConcurrentJobVertexGroup> jobVertexGroups) {
        HashMap<JobVertex, ConcurrentJobVertexGroup> vertexToConcurrentGroupMap = new HashMap<JobVertex, ConcurrentJobVertexGroup>();
        for (ConcurrentJobVertexGroup jobVertexGroup : jobVertexGroups) {
            List<JobVertex> vertices = jobVertexGroup.getVertices();
            for (JobVertex vertex : vertices) {
                vertexToConcurrentGroupMap.put(vertex, jobVertexGroup);
            }
        }
        for (ConcurrentJobVertexGroup jobVertexGroup : jobVertexGroups) {
            if (!this.hasCircleDependencyInGroups(jobVertexGroup, vertexToConcurrentGroupMap) || !jobVertexGroup.hasInputVertex()) continue;
            jobVertexGroup.noPrecedingGroup();
        }
    }

    private boolean hasCircleDependencyInGroups(ConcurrentJobVertexGroup jobVertexGroup, Map<JobVertex, ConcurrentJobVertexGroup> vertexToConcurrentGroupMap) {
        List<JobVertex> predecessors = jobVertexGroup.getPredecessorVertices();
        if (predecessors.size() > 0) {
            ArrayList<JobVertex> ancestors = new ArrayList<JobVertex>();
            HashSet<ConcurrentJobVertexGroup> visitedGroups = new HashSet<ConcurrentJobVertexGroup>();
            for (JobVertex predecessor : predecessors) {
                ConcurrentJobVertexGroup group = vertexToConcurrentGroupMap.get(predecessor);
                if (group == null) continue;
                ancestors.addAll(group.getPredecessorVertices());
                if (visitedGroups.contains(group)) continue;
                visitedGroups.add(group);
            }
            while (!ancestors.isEmpty()) {
                ArrayList<JobVertex> newAddedAncestors = new ArrayList<JobVertex>();
                for (JobVertex ancestor : ancestors) {
                    ConcurrentJobVertexGroup group = vertexToConcurrentGroupMap.get(ancestor);
                    if (group == jobVertexGroup) {
                        return true;
                    }
                    if (group == null || visitedGroups.contains(group)) continue;
                    visitedGroups.add(group);
                    newAddedAncestors.addAll(group.getPredecessorVertices());
                }
                ancestors = newAddedAncestors;
            }
        }
        return false;
    }

    private boolean hasCircleDependencyInVertices(JobVertex successor, JobVertex predecessor) {
        ArrayList<JobVertex> ancestors = new ArrayList<JobVertex>();
        Set<JobVertexID> virtualPredecessors = this.successorToPredecessors.get((Object)predecessor.getID());
        if (virtualPredecessors != null) {
            for (JobVertexID virtualPredecessor : virtualPredecessors) {
                ancestors.add(this.executionGraph.getJobVertex(virtualPredecessor).getJobVertex());
            }
        }
        for (JobEdge jobEdge : predecessor.getInputs()) {
            ancestors.add(jobEdge.getSource().getProducer());
        }
        while (!ancestors.isEmpty()) {
            ArrayList<JobVertex> newAddedAncestors = new ArrayList<JobVertex>();
            for (JobVertex ancestor : ancestors) {
                if (ancestor.equals(successor)) {
                    return true;
                }
                Set<JobVertexID> newVirtualPredecessors = this.successorToPredecessors.get((Object)ancestor.getID());
                if (newVirtualPredecessors != null) {
                    for (JobVertexID newVirtualPredecessor : newVirtualPredecessors) {
                        newAddedAncestors.add(this.executionGraph.getJobVertex(newVirtualPredecessor).getJobVertex());
                    }
                }
                for (JobEdge jobEdge : ancestor.getInputs()) {
                    newAddedAncestors.add(jobEdge.getSource().getProducer());
                }
            }
            ancestors = newAddedAncestors;
        }
        return false;
    }

    private boolean isStartOnFinishedEdge(JobEdge jobEdge) {
        JobVertex source = jobEdge.getSource().getProducer();
        for (JobControlEdge controlEdge : source.getInControlEdges()) {
            JobVertex controlEdgeSource = controlEdge.getSource();
            for (IntermediateDataSet output : controlEdgeSource.getProducedDataSets()) {
                for (JobEdge outputEdge : output.getConsumers()) {
                    if (!outputEdge.getTarget().equals(jobEdge.getTarget())) continue;
                    return true;
                }
            }
        }
        return false;
    }

    private /* synthetic */ void lambda$scheduleGroup$3(CompletableFuture allocationFuture, CompletableFuture currentSchedulingFuture, Void ignored, Throwable throwable) {
        Throwable strippedThrowable = ExceptionUtils.stripCompletionException((Throwable)throwable);
        if (strippedThrowable instanceof CancellationException) {
            allocationFuture.cancel(false);
        }
        this.executionGraph.unregisterSchedulingFuture(currentSchedulingFuture);
    }
}

