/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.event.ExecutionVertexFailoverEvent;
import org.apache.flink.runtime.event.ExecutionVertexStateChangedEvent;
import org.apache.flink.runtime.event.ResultPartitionConsumableEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStatusListener;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.failover.ExecutionGraphOperationLog;
import org.apache.flink.runtime.jobmaster.failover.FailoverOperationLog;
import org.apache.flink.runtime.jobmaster.failover.InputSplitsOperationLog;
import org.apache.flink.runtime.jobmaster.failover.OperationLog;
import org.apache.flink.runtime.jobmaster.failover.OperationLogManager;
import org.apache.flink.runtime.jobmaster.failover.Replayable;
import org.apache.flink.runtime.jobmaster.failover.ResultDescriptor;
import org.apache.flink.runtime.jobmaster.failover.ResultPartitionOperationLog;
import org.apache.flink.runtime.schedule.ExecutionVertexStatus;
import org.apache.flink.runtime.schedule.GraphManagerPlugin;
import org.apache.flink.runtime.schedule.ResultPartitionStatus;
import org.apache.flink.runtime.schedule.SchedulingConfig;
import org.apache.flink.runtime.schedule.VertexScheduler;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GraphManager
implements Replayable,
ExecutionStatusListener {
    static final Logger LOG = LoggerFactory.getLogger(GraphManager.class);
    private final GraphManagerPlugin graphManagerPlugin;
    private final OperationLogManager operationLogManager;
    private final ExecutionGraph executionGraph;
    private final List<Collection<ExecutionVertexID>> executionVerticesToBeScheduled;
    private volatile boolean isReconciling;

    public GraphManager(GraphManagerPlugin graphManagerPlugin, JobMasterGateway jobMasterGateway, OperationLogManager operationLogManager, ExecutionGraph executionGraph) {
        this.graphManagerPlugin = (GraphManagerPlugin)Preconditions.checkNotNull((Object)graphManagerPlugin);
        this.operationLogManager = (OperationLogManager)Preconditions.checkNotNull((Object)operationLogManager);
        this.executionGraph = (ExecutionGraph)Preconditions.checkNotNull((Object)executionGraph);
        this.executionVerticesToBeScheduled = new LinkedList<Collection<ExecutionVertexID>>();
    }

    public void open(JobGraph jobGraph, SchedulingConfig config) {
        this.graphManagerPlugin.open(new ExecutionGraphVertexScheduler(), jobGraph, config);
    }

    public void dispose() {
        this.graphManagerPlugin.close();
    }

    public void reset() {
        this.graphManagerPlugin.reset();
        this.operationLogManager.stop();
        this.operationLogManager.clear();
        this.operationLogManager.start();
    }

    public boolean isReplaying() {
        return this.operationLogManager.isReplaying();
    }

    public boolean isReconciling() {
        return this.isReconciling;
    }

    public void enterReconcile() {
        this.isReconciling = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void leaveReconcile() {
        List<Collection<ExecutionVertexID>> list = this.executionVerticesToBeScheduled;
        synchronized (list) {
            for (Collection<ExecutionVertexID> executionVertexIDS : this.executionVerticesToBeScheduled) {
                this.executionGraph.scheduleVertices(executionVertexIDS);
            }
            this.isReconciling = false;
        }
    }

    public boolean allowLazyDeployment() {
        return this.graphManagerPlugin.allowLazyDeployment();
    }

    public void startScheduling() {
        LOG.info("Start scheduling execution graph with graph manager plugin: {}", (Object)this.graphManagerPlugin.getClass().getName());
        this.graphManagerPlugin.onSchedulingStarted();
    }

    public void notifyExecutionVertexFailover(List<ExecutionVertexID> failoverExecutionVertices) {
        if (!this.operationLogManager.isReplaying()) {
            this.operationLogManager.writeOpLog(new FailoverOperationLog(failoverExecutionVertices));
        }
        this.graphManagerPlugin.onExecutionVertexFailover(new ExecutionVertexFailoverEvent(failoverExecutionVertices));
    }

    public void notifyResultPartitionConsumable(ExecutionVertexID executionVertexID, IntermediateDataSetID resultID, int partitionNumber, TaskManagerLocation location) {
        if (!this.operationLogManager.isReplaying() && this.executionGraph.getAllIntermediateResults().get((Object)resultID).getResultType().isPipelined()) {
            this.operationLogManager.writeOpLog(new ResultPartitionOperationLog(executionVertexID, resultID, location));
        }
        this.graphManagerPlugin.onResultPartitionConsumable(new ResultPartitionConsumableEvent(resultID, partitionNumber));
    }

    public void notifyInputSplitsCreated(JobVertexID jobVertexID, Map<OperatorID, InputSplit[]> inputSplitsMap) {
        if (!this.operationLogManager.isReplaying()) {
            this.operationLogManager.writeOpLog(new InputSplitsOperationLog(jobVertexID, inputSplitsMap));
        }
    }

    public boolean reconcileExecutionVertex(JobVertexID vertexId, int subtask, ExecutionState state, ExecutionAttemptID executionId, int attemptNumber, long startTimestamp, ResultPartitionID[] partitionIds, boolean[] partitionsConsumable, Map<OperatorID, List<InputSplit>> assignedInputSplits, LogicalSlot slot) {
        ExecutionJobVertex ejv = this.executionGraph.getJobVertex(vertexId);
        if (ejv == null || ejv.getParallelism() <= subtask) {
            LOG.info("Can not find the execution vertex {}_{}", (Object)vertexId, (Object)subtask);
            return false;
        }
        try {
            ExecutionVertex ev = ejv.getTaskVertices()[subtask];
            return ev.reconcileExecution(state, executionId, attemptNumber, startTimestamp, partitionIds, partitionsConsumable, assignedInputSplits, slot);
        }
        catch (Throwable t) {
            LOG.info("Fail to reconcile vertex {}_{}.", new Object[]{vertexId, subtask, t});
            return false;
        }
    }

    public boolean hasToBeScheduledVertices() {
        return !this.executionVerticesToBeScheduled.isEmpty();
    }

    @Override
    public void replayOpLog(OperationLog opLog) {
        Preconditions.checkArgument((this.isReconciling && JobStatus.CREATED.equals((Object)this.executionGraph.getState()) ? 1 : 0) != 0, (Object)("Job is in " + (Object)((Object)this.executionGraph.getState()) + " while replaying log."));
        if (opLog instanceof ExecutionGraphOperationLog) {
            ExecutionGraphOperationLog egOperationLog = (ExecutionGraphOperationLog)opLog;
            JobVertexID jobVertexID = egOperationLog.getExecutionVertexID().getJobVertexID();
            int subTaskIndex = egOperationLog.getExecutionVertexID().getSubTaskIndex();
            this.executionGraph.getJobVertex(jobVertexID).getTaskVertices()[subTaskIndex].recoverStatus(egOperationLog.getExecutionState(), egOperationLog.getConsumedInputs(), egOperationLog.getResultDescriptor());
        } else if (opLog instanceof ResultPartitionOperationLog) {
            ResultPartitionOperationLog rpOperationLog = (ResultPartitionOperationLog)opLog;
            JobVertexID jobVertexID = rpOperationLog.getExecutionVertexID().getJobVertexID();
            int subTaskIndex = rpOperationLog.getExecutionVertexID().getSubTaskIndex();
            this.executionGraph.getJobVertex(jobVertexID).getTaskVertices()[subTaskIndex].recoverResultPartitionStatus(rpOperationLog.getResultID(), rpOperationLog.getLocation());
        } else if (opLog instanceof FailoverOperationLog) {
            FailoverOperationLog failoverOperationLog = (FailoverOperationLog)opLog;
            List<ExecutionVertexID> ids = failoverOperationLog.getExecutionVertexIDs();
            ArrayList<ExecutionVertex> evs = new ArrayList<ExecutionVertex>(ids.size());
            for (ExecutionVertexID executionVertexID : ids) {
                evs.add(this.executionGraph.getJobVertex(executionVertexID.getJobVertexID()).getTaskVertices()[executionVertexID.getSubTaskIndex()]);
            }
            try {
                for (Collection collection : this.executionVerticesToBeScheduled) {
                    collection.removeAll(evs);
                }
                this.executionGraph.resetExecutionVerticesAndNotify(this.executionGraph.getGlobalModVersion(), evs);
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Fail to reset execution vertex", (Throwable)e);
            }
        } else if (opLog instanceof InputSplitsOperationLog) {
            InputSplitsOperationLog splitsOperationLog = (InputSplitsOperationLog)opLog;
            try {
                this.executionGraph.getAllVertices().get((Object)splitsOperationLog.getJobVertexID()).setUpInputSplits(splitsOperationLog.getInputSplitsMap());
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Fail to set up input splits of vertex", (Throwable)e);
            }
        } else {
            throw new FlinkRuntimeException("Unsupported operation log " + opLog);
        }
    }

    @Override
    public void executionStatusChanged(JobID jobID, JobVertexID vertexID, String taskName, int totalNumberOfSubTasks, int subtaskIndex, ExecutionAttemptID executionID, ExecutionState newExecutionState, long timestamp, String optionalMessage) {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(vertexID, subtaskIndex);
        Map<OperatorID, List<InputSplit>> inputSplits = null;
        ResultDescriptor resultDescriptor = null;
        Map<String, Accumulator<?, ?>> userAccumulators = null;
        IOMetrics metrics = null;
        switch (newExecutionState) {
            case FINISHED: {
                ExecutionVertex ev = this.executionGraph.getJobVertex(vertexID).getTaskVertices()[subtaskIndex];
                inputSplits = ev.getAssignedInputSplits();
                ResultPartitionID[] resultPartitionIds = new ResultPartitionID[ev.getProducedPartitions().size()];
                int i = 0;
                for (IntermediateResultPartitionID irp : ev.getProducedPartitions().keySet()) {
                    resultPartitionIds[i++] = new ResultPartitionID(irp, ev.getCurrentExecutionAttempt().getAttemptId());
                }
                resultDescriptor = new ResultDescriptor(ev.getCurrentAssignedResourceLocation(), resultPartitionIds);
                userAccumulators = ev.getCurrentExecutionAttempt().getUserAccumulators();
                metrics = ev.getCurrentExecutionAttempt().getIOMetrics();
            }
            case RUNNING: 
            case DEPLOYING: {
                if (!this.operationLogManager.isReplaying()) {
                    this.operationLogManager.writeOpLog(new ExecutionGraphOperationLog(executionVertexID, newExecutionState, inputSplits, resultDescriptor, userAccumulators, metrics));
                }
                this.graphManagerPlugin.onExecutionVertexStateChanged(new ExecutionVertexStateChangedEvent(executionVertexID, newExecutionState));
                break;
            }
        }
    }

    public class ExecutionGraphVertexScheduler
    implements VertexScheduler {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void scheduleExecutionVertices(Collection<ExecutionVertexID> verticesToSchedule) {
            List list = GraphManager.this.executionVerticesToBeScheduled;
            synchronized (list) {
                if (GraphManager.this.isReconciling) {
                    GraphManager.this.executionVerticesToBeScheduled.add(verticesToSchedule);
                    return;
                }
            }
            GraphManager.this.executionGraph.scheduleVertices(verticesToSchedule);
        }

        @Override
        public ExecutionVertexStatus getExecutionVertexStatus(ExecutionVertexID executionVertexID) {
            Preconditions.checkNotNull((Object)executionVertexID);
            ExecutionJobVertex vertex = GraphManager.this.executionGraph.getJobVertex(executionVertexID.getJobVertexID());
            if (vertex == null) {
                throw new IllegalArgumentException("Cannot find any vertex with id " + (Object)((Object)executionVertexID.getJobVertexID()));
            }
            return vertex.getTaskVertices()[executionVertexID.getSubTaskIndex()].getCurrentStatus();
        }

        @Override
        public ResultPartitionStatus getResultPartitionStatus(IntermediateDataSetID resultID, int partitionNumber) {
            Preconditions.checkNotNull((Object)((Object)resultID));
            IntermediateResult result = GraphManager.this.executionGraph.getAllIntermediateResults().get((Object)resultID);
            if (result == null) {
                throw new IllegalArgumentException("Cannot find any result with id " + (Object)((Object)resultID));
            }
            return new ResultPartitionStatus(resultID, partitionNumber, result.getPartitions()[partitionNumber].isConsumable());
        }

        @Override
        public double getResultConsumablePartitionRatio(IntermediateDataSetID resultID) {
            Preconditions.checkNotNull((Object)((Object)resultID));
            IntermediateResult result = GraphManager.this.executionGraph.getAllIntermediateResults().get((Object)resultID);
            if (result == null) {
                throw new IllegalArgumentException("Cannot find any result with id " + (Object)((Object)resultID));
            }
            return result.getResultConsumablePartitionRatio();
        }
    }
}

