/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.schedule;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.runtime.event.ExecutionVertexFailoverEvent;
import org.apache.flink.runtime.event.ExecutionVertexStateChangedEvent;
import org.apache.flink.runtime.event.ResultPartitionConsumableEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.ExecutionSlotAllocator;
import org.apache.flink.runtime.jobmaster.GraphManager;
import org.apache.flink.runtime.schedule.ExecutionVertexStatus;
import org.apache.flink.runtime.schedule.GraphManagerPlugin;
import org.apache.flink.runtime.schedule.SchedulingConfig;
import org.apache.flink.runtime.schedule.VertexInputTracker;
import org.apache.flink.runtime.schedule.VertexScheduler;
import org.apache.flink.util.Preconditions;

/**
 * A {@link GraphManagerPlugin} that hands execution vertices to the {@link VertexScheduler}
 * one vertex per scheduling call.
 *
 * <p>When scheduling starts, every subtask of every input vertex of the job graph is scheduled.
 * Afterwards, consumer vertices are scheduled as result partitions become consumable, and
 * vertices affected by a failover are rescheduled, in both cases only if
 * {@link #isReadyToSchedule(ExecutionVertexID)} accepts them.
 */
public class StepwiseSchedulingPlugin
implements GraphManagerPlugin {
    /** Scheduler used both to query vertex status and to submit vertices. */
    private VertexScheduler scheduler;
    /** Job graph whose vertices are being scheduled. */
    private JobGraph jobGraph;
    /** Tracker queried for the readiness of vertex inputs. */
    private VertexInputTracker inputTracker;

    /**
     * Stores the scheduler and job graph and creates the input tracker.
     *
     * <p>{@code config}, {@code scheduler} and {@code jobGraph} must not be null; the remaining
     * arguments are not used by this plugin.
     */
    @Override
    public void open(VertexScheduler scheduler, JobGraph jobGraph, SchedulingConfig config, ExecutionGraph eg, GraphManager graphManager, ExecutionSlotAllocator executionSlotAllocator) {
        Preconditions.checkNotNull(config);
        this.scheduler = Preconditions.checkNotNull(scheduler);
        this.jobGraph = Preconditions.checkNotNull(jobGraph);
        this.inputTracker = new VertexInputTracker(jobGraph, scheduler, config);
    }

    /** No-op: this plugin does nothing on close. */
    @Override
    public void close() {
    }

    /** No-op: this plugin does nothing on reset. */
    @Override
    public void reset() {
    }

    /**
     * Schedules every subtask of every input vertex, visiting the job vertices in the order
     * returned by {@code getVerticesSortedTopologicallyFromSources()}.
     */
    @Override
    public void onSchedulingStarted() {
        List<ExecutionVertexID> sourceSubtasks = new ArrayList<>();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            if (jobVertex.isInputVertex()) {
                for (int subtask = 0; subtask < jobVertex.getParallelism(); subtask++) {
                    sourceSubtasks.add(new ExecutionVertexID(jobVertex.getID(), subtask));
                }
            }
        }
        scheduleOneByOne(sourceSubtasks);
    }

    /**
     * Schedules the consumers of the partition that became consumable, provided they are ready.
     *
     * <p>The consumer vertices arrive grouped in a list that is indexed in parallel with the
     * consumers of the produced data set. Empty groups are skipped. For a group whose consumer
     * uses the ALL_TO_ALL distribution pattern, only the first vertex of the group is probed
     * against the input tracker; if this data set is not a ready input for it, the whole group
     * is skipped.
     */
    @Override
    public void onResultPartitionConsumable(ResultPartitionConsumableEvent event) {
        List<ExecutionVertexID> readyConsumers = new ArrayList<>();
        List<Collection<ExecutionVertexID>> consumerGroups = jobGraph.getResultPartitionConsumerExecutionVertices(event.getResultID(), event.getPartitionNumber());
        IntermediateDataSet producedResult = jobGraph.getResult(event.getResultID());

        for (int groupIndex = 0; groupIndex < consumerGroups.size(); groupIndex++) {
            Collection<ExecutionVertexID> group = consumerGroups.get(groupIndex);
            if (group.isEmpty()) {
                continue;
            }

            boolean allToAll = producedResult.getConsumers().get(groupIndex).getDistributionPattern() == DistributionPattern.ALL_TO_ALL;
            if (allToAll && !inputTracker.isInputReady(group.iterator().next(), producedResult.getId())) {
                continue;
            }

            for (ExecutionVertexID consumer : group) {
                if (isReadyToSchedule(consumer)) {
                    readyConsumers.add(consumer);
                }
            }
        }
        scheduleOneByOne(readyConsumers);
    }

    /** No-op: state changes of execution vertices are ignored by this plugin. */
    @Override
    public void onExecutionVertexStateChanged(ExecutionVertexStateChangedEvent event) {
    }

    /**
     * Reschedules those vertices affected by the failover that are ready to be scheduled
     * right away; the others are left untouched here.
     */
    @Override
    public void onExecutionVertexFailover(ExecutionVertexFailoverEvent event) {
        List<ExecutionVertexID> restartable = new ArrayList<>();
        for (ExecutionVertexID affected : event.getAffectedExecutionVertexIDs()) {
            if (isReadyToSchedule(affected)) {
                restartable.add(affected);
            }
        }
        scheduleOneByOne(restartable);
    }

    /**
     * Tells whether the given vertex may be scheduled now.
     *
     * @return {@code true} if the vertex is in state {@code CREATED} and either belongs to an
     *     input vertex of the job graph or has all its inputs reported ready by the input
     *     tracker; {@code false} otherwise
     */
    private boolean isReadyToSchedule(ExecutionVertexID vertexID) {
        ExecutionState currentState = scheduler.getExecutionVertexStatus(vertexID).getExecutionState();
        if (currentState != ExecutionState.CREATED) {
            return false;
        }
        // Input vertices are accepted without asking the tracker about their inputs.
        return jobGraph.findVertexByID(vertexID.getJobVertexID()).isInputVertex()
                || inputTracker.areInputsReady(vertexID);
    }

    /** Submits each vertex to the scheduler in its own single-element call, in list order. */
    private void scheduleOneByOne(List<ExecutionVertexID> verticesToSchedule) {
        for (ExecutionVertexID next : verticesToSchedule) {
            scheduler.scheduleExecutionVertices(Collections.singleton(next));
        }
    }
}

