/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.ResultPartitionLocationTrackerProxy;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AllVerticesIterator;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStatusListener;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IllegalExecutionStateException;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.GraphManager;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.metrics.SimpleHistogram;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.schedule.SlotSharingResourceCalculator;
import org.apache.flink.runtime.schedule.SummationSlotSharingResourceCalculator;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionGraph
implements AccessExecutionGraph {
    /** Atomic updater bound to the volatile {@code state} field. */
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
    /** Atomic updater bound to the volatile {@code globalModVersion} field. */
    private static final AtomicLongFieldUpdater<ExecutionGraph> GLOBAL_VERSION_UPDATER = AtomicLongFieldUpdater.newUpdater(ExecutionGraph.class, "globalModVersion");
    /** Logger of this class (package-private). */
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
    // Lock object; its users are outside the visible part of this file.
    private final Object progressLock = new Object();
    /** Job-wide information (id, name, configuration, required jars/classpaths) passed to the constructor. */
    private final JobInformation jobInformation;
    /** Result of {@code BlobWriter.serializeAndTryOffload}: either the serialized job information or a blob key. */
    private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
    /** Executor used for asynchronous work such as creating the execution job vertices. */
    private final ScheduledExecutorService futureExecutor;
    /** Executor handed to the checkpoint coordinator in {@code enableCheckpointing}. */
    private final Executor ioExecutor;
    /** Starts as {@code true}; cleared in {@code attachJobGraph} when an input vertex is not stoppable. */
    private boolean isStoppable = true;
    /** All execution job vertices, keyed by their job vertex id. */
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    /** Execution job vertices in the order in which {@code attachJobGraph} added them. */
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    /** All intermediate results produced by attached vertices, keyed by data set id. */
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    // Executions keyed by attempt id; populated outside the visible part of this file.
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    /** Registered job status listeners (copy-on-write list). */
    private final List<JobStatusListener> jobStatusListeners;
    /** Registered execution status listeners (copy-on-write list). */
    private final List<ExecutionStatusListener> executionListeners;
    /** Failover strategy created from the factory given to the constructor; never null. */
    private final FailoverStrategy failoverStrategy;
    /** One timestamp slot per {@link JobStatus}, indexed by {@code JobStatus.ordinal()}. */
    private final long[] stateTimestamps;
    /** RPC timeout, forwarded to each created {@link ExecutionJobVertex}. */
    private final Time rpcTimeout;
    /** Timeout passed to the slot provider when allocating slots. */
    private final Time allocationTimeout;
    /** Restart strategy as given to the constructor (not null-checked there). */
    private final RestartStrategy restartStrategy;
    /** Provider used to allocate slots for executions. */
    private final SlotProvider slotProvider;
    /** Class loader for user code. */
    private final ClassLoader userClassLoader;
    /** Registry created in the constructor from the job id and the view returned by {@code getAllVertices()}. */
    private final KvStateLocationRegistry kvStateLocationRegistry;
    /** Blob writer used to offload the serialized job information. */
    private final BlobWriter blobWriter;
    private final ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy;
    /** Calculator used in {@code updateSharedSlotResources} to derive a slot sharing group's resources. */
    private final SlotSharingResourceCalculator slotSharingResourceCalculator;
    /** Sum of the parallelism of all attached execution job vertices. */
    private int numVerticesTotal;
    /** Set via {@code setGraphManager}; {@code scheduleForExecution} calls {@code startScheduling()} on it. */
    private GraphManager graphManager;
    private final Configuration jobManagerConfiguration;
    private boolean allowQueuedScheduling = false;
    /** Defaults to {@code true}; overwritten by {@code setGraphManager} with the manager's setting. */
    private boolean allowLazyDeployment = true;
    // NOTE(review): -1.0 presumably means "no limit configured" — confirm against the consumers of this value.
    private double perTaskInputSplitsLimitAsAverageMultiplier = -1.0;
    private final AtomicInteger verticesFinished;
    /** Current job status; starts in {@code CREATED}. */
    private volatile JobStatus state = JobStatus.CREATED;
    /** Replaced with a fresh future at the end of {@code attachJobGraph}. */
    private volatile CompletableFuture<JobStatus> terminationFuture;
    private volatile CompletableFuture<Collection<ExecutionAttemptID>> reconcileFuture;
    /** Initialised to 1 in the constructor; {@code getNumberOfFullRestarts()} reports this value minus one. */
    private volatile long globalModVersion;
    private volatile Throwable failureCause;
    private volatile ErrorInfo failureInfo;
    // Futures of scheduling operations; maintained outside the visible part of this file.
    private volatile ConcurrentHashMap<CompletableFuture<Void>, Object> schedulingFutures;
    /** Null until {@code enableCheckpointing} has been called. */
    private CheckpointCoordinator checkpointCoordinator;
    /** Null until {@code enableCheckpointing} has been called. */
    private CheckpointStatsTracker checkpointStatsTracker;
    private long updatePartitionInfoSendInterval;
    private String jsonPlan;
    /** Meter registered under the name "task_failover". */
    private Meter failOverMetrics;
    /** Histogram registered under the name "task_deploy_cost". */
    private Histogram taskDeployMetrics;

    /**
     * Creates an execution graph whose metrics are registered on an
     * {@link UnregisteredMetricsGroup}; all other arguments are forwarded unchanged to the
     * constructor that takes an explicit {@link MetricGroup}.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public ExecutionGraph(
            JobInformation jobInformation,
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor,
            Time rpcTimeout,
            RestartStrategy restartStrategy,
            FailoverStrategy.Factory failoverStrategyFactory,
            SlotProvider slotProvider,
            ClassLoader userClassLoader,
            BlobWriter blobWriter,
            ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy,
            Time allocationTimeout,
            Configuration jobManagerConfiguration) throws IOException {
        this(
                jobInformation,
                futureExecutor,
                ioExecutor,
                rpcTimeout,
                restartStrategy,
                failoverStrategyFactory,
                slotProvider,
                userClassLoader,
                blobWriter,
                resultPartitionLocationTrackerProxy,
                allocationTimeout,
                new UnregisteredMetricsGroup(),
                jobManagerConfiguration);
    }

    /**
     * Creates an execution graph in state {@code CREATED} with no vertices attached.
     *
     * <p>Every argument except {@code restartStrategy} is null-checked. The failover strategy is
     * created from the factory with a reference to this (partially constructed) graph, so the
     * statement order below must be kept.
     *
     * @param metricGroup group on which the "task_failover" meter and the "task_deploy_cost"
     *     histogram are registered
     * @throws IOException if serializing or offloading the job information fails
     */
    public ExecutionGraph(
            JobInformation jobInformation,
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor,
            Time rpcTimeout,
            RestartStrategy restartStrategy,
            FailoverStrategy.Factory failoverStrategyFactory,
            SlotProvider slotProvider,
            ClassLoader userClassLoader,
            BlobWriter blobWriter,
            ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy,
            Time allocationTimeout,
            MetricGroup metricGroup,
            Configuration jobManagerConfiguration) throws IOException {
        Preconditions.checkNotNull(futureExecutor);

        // Job description and its serialized (possibly offloaded) form.
        this.jobInformation = Preconditions.checkNotNull(jobInformation);
        this.blobWriter = Preconditions.checkNotNull(blobWriter);
        this.jobInformationOrBlobKey =
                BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);

        // Collaborators.
        this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
        this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
        this.slotProvider = Preconditions.checkNotNull(slotProvider, "scheduler");
        this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader");
        this.resultPartitionLocationTrackerProxy =
                Preconditions.checkNotNull(resultPartitionLocationTrackerProxy);

        // Graph bookkeeping structures.
        this.tasks = new ConcurrentHashMap<>(16);
        this.intermediateResults = new ConcurrentHashMap<>(16);
        this.verticesInCreationOrder = new ArrayList<>(16);
        this.currentExecutions = new ConcurrentHashMap<>(16);
        this.jobStatusListeners = new CopyOnWriteArrayList<>();
        this.executionListeners = new CopyOnWriteArrayList<>();

        // One timestamp slot per job status; the CREATED slot is stamped right away.
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

        this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
        this.allocationTimeout = Preconditions.checkNotNull(allocationTimeout);
        this.restartStrategy = restartStrategy;
        this.kvStateLocationRegistry =
                new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
        this.verticesFinished = new AtomicInteger();
        this.globalModVersion = 1L;

        // The factory receives this graph, so the fields above must already be initialised.
        this.failoverStrategy =
                Preconditions.checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
        LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());

        this.schedulingFutures = new ConcurrentHashMap<>();
        this.failOverMetrics = metricGroup.meter("task_failover", new MeterView(60));
        this.taskDeployMetrics = metricGroup.histogram("task_deploy_cost", new SimpleHistogram());
        this.jobManagerConfiguration = Preconditions.checkNotNull(jobManagerConfiguration);
        this.slotSharingResourceCalculator = new SummationSlotSharingResourceCalculator();
    }

    /** Returns how many execution job vertices have been attached to this graph so far. */
    public int getNumberOfExecutionJobVertices() {
        return verticesInCreationOrder.size();
    }

    /** Returns the queued-scheduling flag that is passed to the slot provider when allocating slots. */
    public boolean isQueuedSchedulingAllowed() {
        return allowQueuedScheduling;
    }

    /**
     * Sets the queued-scheduling flag.
     *
     * @param allowed new value of the flag
     */
    public void setQueuedSchedulingAllowed(boolean allowed) {
        allowQueuedScheduling = allowed;
    }

    /** Returns the graph manager, or {@code null} if {@link #setGraphManager} has not been called. */
    public GraphManager getGraphManager() {
        return graphManager;
    }

    /**
     * Returns the lazy-deployment flag; {@code true} by default and replaced by the graph
     * manager's setting once {@link #setGraphManager} has been called.
     */
    public boolean isLazyDeploymentAllowed() {
        return allowLazyDeployment;
    }

    /** Returns the configured per-task input split limit multiplier ({@code -1.0} unless set). */
    public double getPerTaskInputSplitsLimitAsAverageMultiplier() {
        return perTaskInputSplitsLimitAsAverageMultiplier;
    }

    /**
     * Sets the per-task input split limit multiplier.
     *
     * @param multiplier new multiplier; stored without validation
     */
    public void setPerTaskInputSplitsLimitAsAverageMultiplier(double multiplier) {
        perTaskInputSplitsLimitAsAverageMultiplier = multiplier;
    }

    /**
     * Installs the graph manager and adopts its lazy-deployment setting.
     *
     * @param graphManager manager to use; must not be null
     * @throws NullPointerException if {@code graphManager} is null (nothing is modified then)
     */
    public void setGraphManager(GraphManager graphManager) {
        Preconditions.checkNotNull(graphManager);
        this.graphManager = graphManager;
        this.allowLazyDeployment = graphManager.allowLazyDeployment();
    }

    /** Returns the result partition location tracker proxy given to the constructor. */
    public ResultPartitionLocationTrackerProxy getResultPartitionLocationTrackerProxy() {
        return resultPartitionLocationTrackerProxy;
    }

    /** Returns the timeout used for slot allocation requests. */
    public Time getAllocationTimeout() {
        return allocationTimeout;
    }

    /** Always returns {@code false} for this implementation. */
    @Override
    public boolean isArchived() {
        return false;
    }

    /**
     * Creates the checkpoint coordinator for this job and wires it to the stats tracker.
     *
     * <p>May only be called once, while the job is still in state {@code CREATED}. Unless
     * {@code interval} equals {@code Long.MAX_VALUE}, the coordinator's activator/deactivator is
     * also registered as a job status listener.
     *
     * @param interval checkpoint interval; must be at least 10
     * @param checkpointTimeout checkpoint timeout; must be at least 10
     * @param masterHooks hooks to register; a hook the coordinator rejects is only logged
     * @param statsTracker tracker for checkpoint statistics; must not be null
     * @throws IllegalArgumentException if the interval or timeout is below 10, or if a given job
     *     vertex belongs to a different graph
     * @throws IllegalStateException if the job is not in state {@code CREATED} or checkpointing
     *     was already enabled
     */
    public void enableCheckpointing(
            long interval,
            long checkpointTimeout,
            long minPauseBetweenCheckpoints,
            int maxConcurrentCheckpoints,
            CheckpointRetentionPolicy retentionPolicy,
            List<ExecutionJobVertex> verticesToTrigger,
            List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStatsTracker statsTracker) {
        Preconditions.checkArgument(interval >= 10L, "checkpoint interval must not be below 10ms");
        Preconditions.checkArgument(checkpointTimeout >= 10L, "checkpoint timeout must not be below 10ms");
        Preconditions.checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
        Preconditions.checkState(checkpointCoordinator == null, "checkpointing already enabled");

        // Flatten the job vertices into their parallel subtasks.
        final ExecutionVertex[] triggerTasks = collectExecutionVertices(verticesToTrigger);
        final ExecutionVertex[] ackTasks = collectExecutionVertices(verticesToWaitFor);
        final ExecutionVertex[] commitTasks = collectExecutionVertices(verticesToCommitTo);

        checkpointStatsTracker = Preconditions.checkNotNull(statsTracker, "CheckpointStatsTracker");
        checkpointCoordinator = new CheckpointCoordinator(
                jobInformation.getJobId(),
                interval,
                checkpointTimeout,
                minPauseBetweenCheckpoints,
                maxConcurrentCheckpoints,
                retentionPolicy,
                triggerTasks,
                ackTasks,
                commitTasks,
                checkpointIDCounter,
                checkpointStore,
                checkpointStateBackend,
                ioExecutor,
                SharedStateRegistry.DEFAULT_FACTORY);

        for (MasterTriggerRestoreHook<?> masterHook : masterHooks) {
            boolean registered = checkpointCoordinator.addMasterHook(masterHook);
            if (!registered) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", masterHook.getIdentifier());
            }
        }
        checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

        // Long.MAX_VALUE is the one interval for which no status listener is registered.
        if (interval != Long.MAX_VALUE) {
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }
    }

    /** Returns the checkpoint coordinator, or {@code null} while checkpointing is not enabled. */
    @Nullable
    public CheckpointCoordinator getCheckpointCoordinator() {
        return checkpointCoordinator;
    }

    /** Returns the KvState location registry created in the constructor. */
    public KvStateLocationRegistry getKvStateLocationRegistry() {
        return kvStateLocationRegistry;
    }

    /** Returns the restart strategy exactly as it was passed to the constructor. */
    public RestartStrategy getRestartStrategy() {
        return restartStrategy;
    }

    /**
     * Returns the checkpointing configuration held by the stats tracker, or {@code null} when no
     * stats tracker has been set.
     */
    @Override
    public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
        if (checkpointStatsTracker == null) {
            return null;
        }
        return checkpointStatsTracker.getJobCheckpointingConfiguration();
    }

    /**
     * Returns a snapshot created by the stats tracker, or {@code null} when no stats tracker has
     * been set.
     */
    @Override
    public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
        if (checkpointStatsTracker == null) {
            return null;
        }
        return checkpointStatsTracker.createSnapshot();
    }

    /**
     * Flattens the given job vertices into one array of their task vertices.
     *
     * <p>For a single job vertex, the array returned by its {@code getTaskVertices()} is handed
     * back directly instead of being copied.
     *
     * @param jobVertices job vertices, all of which must belong to this graph
     * @return the task vertices of all given job vertices, in list order
     * @throws IllegalArgumentException if a job vertex belongs to another execution graph
     */
    private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
        if (jobVertices.size() != 1) {
            List<ExecutionVertex> collected = new ArrayList<>();
            for (ExecutionJobVertex jobVertex : jobVertices) {
                if (jobVertex.getGraph() != this) {
                    throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
                }
                collected.addAll(Arrays.asList(jobVertex.getTaskVertices()));
            }
            return collected.toArray(new ExecutionVertex[0]);
        }

        // Exactly one job vertex: no copy needed.
        ExecutionJobVertex only = jobVertices.get(0);
        if (only.getGraph() != this) {
            throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
        }
        return only.getTaskVertices();
    }

    /** Returns the blob keys of the jar files required by the job, as recorded in the job information. */
    public Collection<PermanentBlobKey> getRequiredJarFiles() {
        return jobInformation.getRequiredJarFileBlobKeys();
    }

    /** Returns the classpath URLs required by the job, as recorded in the job information. */
    public Collection<URL> getRequiredClasspaths() {
        return jobInformation.getRequiredClasspathURLs();
    }

    /**
     * Stores the JSON plan returned by {@code getJsonPlan()}.
     *
     * @param jsonPlan plan string; stored as-is without validation
     */
    public void setJsonPlan(String jsonPlan) {
        this.jsonPlan = jsonPlan;
    }

    /** Returns the JSON plan previously stored via {@code setJsonPlan}, or {@code null} if none was set. */
    @Override
    public String getJsonPlan() {
        return jsonPlan;
    }

    /** Returns the slot provider given to the constructor. */
    public SlotProvider getSlotProvider() {
        return slotProvider;
    }

    /**
     * Returns the job information in the form produced at construction time: either the
     * serialized value or the key of the blob it was offloaded to.
     */
    public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() {
        return jobInformationOrBlobKey;
    }

    /** Returns the job id taken from the job information. */
    @Override
    public JobID getJobID() {
        return jobInformation.getJobId();
    }

    /** Returns the job version taken from the job information. */
    @Override
    public long getJobVersion() {
        return jobInformation.getJobVersion();
    }

    /** Returns the job name taken from the job information. */
    @Override
    public String getJobName() {
        return jobInformation.getJobName();
    }

    /**
     * Returns {@code true} unless an attached input vertex reported that it is not stoppable
     * (see {@code attachJobGraph}).
     */
    @Override
    public boolean isStoppable() {
        return isStoppable;
    }

    /** Returns the job configuration taken from the job information. */
    public Configuration getJobConfiguration() {
        return jobInformation.getJobConfiguration();
    }

    /** Returns the user-code class loader given to the constructor. */
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /** Returns the current job status (a volatile read). */
    @Override
    public JobStatus getState() {
        return state;
    }

    /** Returns the recorded failure cause, or {@code null} if none has been recorded. */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /** Returns the recorded failure info, or {@code null} if none has been recorded. */
    @Override
    public ErrorInfo getFailureInfo() {
        return failureInfo;
    }

    /**
     * Returns the global modification version minus one. The version is initialised to 1 in the
     * constructor, so a freshly created graph reports 0.
     */
    public long getNumberOfFullRestarts() {
        final long currentVersion = globalModVersion;
        return currentVersion - 1L;
    }

    /**
     * Looks up an execution job vertex by id.
     *
     * @param id id of the job vertex
     * @return the matching vertex, or {@code null} if no vertex with that id is attached
     */
    @Override
    public ExecutionJobVertex getJobVertex(JobVertexID id) {
        return tasks.get(id);
    }

    /** Returns an unmodifiable view of all execution job vertices, keyed by job vertex id. */
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(tasks);
    }

    /**
     * Returns an iterable over the execution job vertices in creation order.
     *
     * <p>The number of elements is captured when this method is called: vertices attached later
     * are not visited by iterators obtained from the returned iterable. The iterators do not
     * support {@code remove()}.
     */
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        // Snapshot of the size at call time; bounds every iterator created below.
        final int limit = verticesInCreationOrder.size();

        return () -> new Iterator<ExecutionJobVertex>() {
            private int cursor = 0;

            @Override
            public boolean hasNext() {
                return cursor < limit;
            }

            @Override
            public ExecutionJobVertex next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return verticesInCreationOrder.get(cursor++);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /** Returns the summed parallelism of all attached execution job vertices. */
    public int getTotalNumberOfVertices() {
        return numVerticesTotal;
    }

    /** Returns an unmodifiable view of all intermediate results, keyed by data set id. */
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(intermediateResults);
    }

    /**
     * Returns an iterable over all execution vertices. Each call to {@code iterator()} wraps a
     * fresh iterator from {@link #getVerticesTopologically()} in an {@link AllVerticesIterator}.
     */
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return () -> new AllVerticesIterator(getVerticesTopologically().iterator());
    }

    /**
     * Returns the timestamp recorded for the given job status.
     *
     * @param status status whose timestamp slot is read
     * @return the stored value; 0 if the slot was never written
     */
    @Override
    public long getStatusTimestamp(JobStatus status) {
        final int slot = status.ordinal();
        return stateTimestamps[slot];
    }

    /** Returns the blob writer given to the constructor. */
    public final BlobWriter getBlobWriter() {
        return blobWriter;
    }

    /** Returns the future executor, typed as a plain {@link Executor}. */
    public Executor getFutureExecutor() {
        return futureExecutor;
    }

    /** Returns the same future executor as {@link #getFutureExecutor()}, with its scheduling interface. */
    public ScheduledExecutorService getFutureExecutorService() {
        return futureExecutor;
    }

    /**
     * Merges the user accumulators of the current execution attempt of every execution vertex
     * into one map.
     *
     * @return a new map from accumulator name to the merged accumulator (or merge failure)
     */
    public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() {
        Map<String, OptionalFailure<Accumulator<?, ?>>> merged = new HashMap<>();

        for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
            Map<String, Accumulator<?, ?>> perAttempt =
                    executionVertex.getCurrentExecutionAttempt().getUserAccumulators();
            // Attempts that report no accumulators are skipped.
            if (perAttempt != null) {
                AccumulatorHelper.mergeInto(merged, perAttempt);
            }
        }
        return merged;
    }

    /**
     * Aggregates the user accumulators and serializes each of them.
     *
     * @return a new map from accumulator name to its serialized value (or serialized failure)
     */
    @Override
    public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized() {
        Map<String, SerializedValue<OptionalFailure<Object>>> serialized = new HashMap<>();
        for (Map.Entry<String, OptionalFailure<Accumulator<?, ?>>> entry : aggregateUserAccumulators().entrySet()) {
            String accumulatorName = entry.getKey();
            serialized.put(accumulatorName, serializeAccumulator(accumulatorName, entry.getValue()));
        }
        return serialized;
    }

    /**
     * Serializes one accumulator result.
     *
     * <p>A failed accumulator is serialized as its failure cause, a successful one as its local
     * value. If that serialization throws an {@link IOException}, the error is logged and the
     * exception itself is serialized as a failure instead.
     *
     * @param name accumulator name, used only in the log message
     * @param accumulator accumulator or merge failure to serialize
     * @throws RuntimeException if even the fallback serialization of the exception fails
     */
    private static SerializedValue<OptionalFailure<Object>> serializeAccumulator(String name, OptionalFailure<Accumulator<?, ?>> accumulator) {
        try {
            if (!accumulator.isFailure()) {
                return new SerializedValue<>(OptionalFailure.of(accumulator.getUnchecked().getLocalValue()));
            }
            return new SerializedValue<>(OptionalFailure.ofFailure(accumulator.getFailureCause()));
        } catch (IOException serializationFailure) {
            LOG.error("Could not serialize accumulator " + name + '.', serializationFailure);
            try {
                // Fall back to shipping the serialization error itself.
                return new SerializedValue<>(OptionalFailure.ofFailure(serializationFailure));
            } catch (IOException fallbackFailure) {
                throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", fallbackFailure);
            }
        }
    }

    /** Aggregates the user accumulators and returns their stringified representation. */
    @Override
    public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(aggregateUserAccumulators());
    }

    /** Returns the configured send interval for partition info updates (0 unless set). */
    public long getUpdatePartitionInfoSendInterval() {
        return updatePartitionInfoSendInterval;
    }

    /**
     * Sets the send interval for partition info updates.
     *
     * @param updatePartitionInfoSendInterval new interval; stored without validation
     */
    public void setUpdatePartitionInfoSendInterval(long updatePartitionInfoSendInterval) {
        this.updatePartitionInfoSendInterval = updatePartitionInfoSendInterval;
    }

    /** Returns the job manager configuration given to the constructor (package-private accessor). */
    Configuration getJobManagerConfiguration() {
        return jobManagerConfiguration;
    }

    /**
     * Creates one {@link ExecutionJobVertex} per given job vertex, in parallel on the future
     * executor, and registers each in {@code tasks}. Blocks until all creations have finished.
     *
     * <p>Each task returns {@code null} on success or the {@link JobException} it hit; all
     * reported exceptions are folded into one (first as primary, the rest suppressed).
     *
     * @param topologiallySorted job vertices to create execution job vertices for
     * @throws JobException if any creation failed, two vertices share an id, or waiting for the
     *     results failed or was interrupted; the underlying problem is attached as the cause
     */
    private void createExecutionJobVertex(List<JobVertex> topologiallySorted) throws JobException {
        List<CompletableFuture<JobException>> futures = new ArrayList<>(topologiallySorted.size());
        // All vertices created by this call share the same creation timestamp.
        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : topologiallySorted) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, this.getJobConfiguration().getInteger(CoreOptions.DEFAULT_PARALLELISM), this.rpcTimeout, this.globalModVersion, createTimestamp);
                    ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
                    if (previousTask != null) {
                        // Argument order matches the labels: the existing vertex is "previous".
                        throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), previousTask, ejv));
                    }
                    return null;
                }
                catch (JobException e) {
                    return e;
                }
            }, this.futureExecutor));
        }

        try {
            Collection<JobException> exceptions = FutureUtils.combineAll(futures).get();
            JobException suppressedException = null;
            for (JobException exception : exceptions) {
                if (exception == null) {
                    continue;
                }
                suppressedException = ExceptionUtils.firstOrSuppressed(exception, suppressedException);
            }
            if (suppressedException != null) {
                // Deliberately caught below so it is wrapped like any other failure.
                throw suppressedException;
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new JobException("Could not create execution job vertex.", e);
        }
        catch (Exception e) {
            throw new JobException("Could not create execution job vertex.", e);
        }
    }

    /**
     * Attaches the given job vertices to this graph.
     *
     * <p>Creates an execution job vertex for each job vertex, connects it to its predecessors,
     * registers its produced intermediate results, updates the vertex bookkeeping, replaces the
     * termination future and finally notifies the failover strategy about the new vertices.
     *
     * @param topologicallySorted job vertices to attach; predecessors must come before their
     *     consumers because each vertex is connected against the results registered so far
     * @throws JobException if a vertex cannot be created or an intermediate data set id is
     *     encountered twice
     */
    public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {
        LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} vertices and {} intermediate results.", topologicallySorted.size(), this.tasks.size(), this.intermediateResults.size());
        List<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologicallySorted.size());
        this.createExecutionJobVertex(topologicallySorted);

        for (JobVertex jobVertex : topologicallySorted) {
            // A single non-stoppable input vertex makes the whole job non-stoppable.
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            ExecutionJobVertex ejv = this.tasks.get(jobVertex.getID());
            ejv.connectToPredecessors(this.intermediateResults);

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    // Argument order matches the labels: the already registered result is "previous".
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), previousDataSet, res));
                }
            }

            this.verticesInCreationOrder.add(ejv);
            this.numVerticesTotal += ejv.getParallelism();
            newExecJobVertices.add(ejv);
        }

        this.terminationFuture = new CompletableFuture<>();
        this.failoverStrategy.notifyNewVertices(newExecJobVertices);
    }

    /**
     * Starts the job: sets up the input splits of every job vertex, computes the resources of
     * the slot sharing groups, moves the job from {@code CREATED} to {@code RUNNING} and asks the
     * graph manager to start scheduling.
     *
     * <p>If setting up input splits throws a {@link JobException}, the job is failed globally
     * with restarts suppressed and the method returns without scheduling.
     *
     * @throws IllegalStateException if the job is not in state {@code CREATED}
     */
    public void scheduleForExecution() {
        try {
            for (ExecutionJobVertex jobVertex : getAllVertices().values()) {
                jobVertex.setUpInputSplits(null);
            }
        } catch (JobException inputSplitFailure) {
            failGlobal(new SuppressRestartsException(inputSplitFailure));
            return;
        }

        updateSharedSlotResources();

        boolean nowRunning = transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        if (!nowRunning) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
        graphManager.startScheduling();
    }

    /**
     * Computes and stores the resource profile of every slot sharing group that does not have
     * one yet. Vertices without a sharing group are skipped, as are groups whose profile is
     * already set.
     */
    private void updateSharedSlotResources() {
        for (ExecutionJobVertex executionJobVertex : getVerticesTopologically()) {
            SlotSharingGroup group = executionJobVertex.getJobVertex().getSlotSharingGroup();
            boolean needsProfile = group != null && group.getResourceProfile() == null;
            if (needsProfile) {
                ResourceProfile groupProfile = slotSharingResourceCalculator.calculateSharedGroupResource(group, this);
                group.setResourceProfile(groupProfile);
            }
        }
    }

    private CompletableFuture<Void> schedule(Collection<ExecutionVertex> vertices) {
        Preconditions.checkState((this.state == JobStatus.RUNNING ? 1 : 0) != 0, (Object)"job is not running currently");
        boolean queued = this.allowQueuedScheduling;
        ArrayList<SlotRequestId> slotRequestIds = new ArrayList<SlotRequestId>(vertices.size());
        ArrayList<ScheduledUnit> scheduledUnits = new ArrayList<ScheduledUnit>(vertices.size());
        ArrayList<SlotProfile> slotProfiles = new ArrayList<SlotProfile>(vertices.size());
        ArrayList<Execution> scheduledExecutions = new ArrayList<Execution>(vertices.size());
        for (ExecutionVertex ev : vertices) {
            Execution exec = ev.getCurrentExecutionAttempt();
            try {
                Tuple2<ScheduledUnit, SlotProfile> scheduleUnitAndSlotProfile = exec.enterScheduledAndPrepareSchedulingResources();
                slotRequestIds.add(new SlotRequestId());
                scheduledUnits.add((ScheduledUnit)scheduleUnitAndSlotProfile.f0);
                slotProfiles.add((SlotProfile)scheduleUnitAndSlotProfile.f1);
                scheduledExecutions.add(exec);
            }
            catch (IllegalExecutionStateException e) {
                LOG.info("The execution {} may be already scheduled by other thread.", (Object)ev.getTaskNameWithSubtaskIndex(), (Object)e);
            }
        }
        if (slotRequestIds.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        List<CompletableFuture<LogicalSlot>> allocationFutures = this.slotProvider.allocateSlots(slotRequestIds, scheduledUnits, queued, slotProfiles, this.allocationTimeout);
        ArrayList<CompletionStage> assignFutures = new ArrayList<CompletionStage>(slotRequestIds.size());
        for (int i = 0; i < allocationFutures.size(); ++i) {
            int index = i;
            allocationFutures.get(i).whenComplete((ignore, throwable) -> {
                if (throwable != null) {
                    this.slotProvider.cancelSlotRequest((SlotRequestId)((Object)((Object)slotRequestIds.get(index))), ((ScheduledUnit)scheduledUnits.get(index)).getSlotSharingGroupId(), ((ScheduledUnit)scheduledUnits.get(index)).getCoLocationConstraint(), (Throwable)throwable);
                }
            });
            assignFutures.add(allocationFutures.get(i).thenAccept(logicalSlot -> {
                if (!((Execution)scheduledExecutions.get(index)).tryAssignResource((LogicalSlot)logicalSlot)) {
                    FlinkException e = new FlinkException("Could not assign logical slot to execution " + scheduledExecutions.get(index) + '.');
                    logicalSlot.releaseSlot(e);
                    throw new CompletionException(e);
                }
            }));
        }
        FutureUtils.ConjunctFuture allAssignFutures = FutureUtils.combineAll(assignFutures);
        CompletionStage currentSchedulingFuture = ((CompletableFuture)allAssignFutures.exceptionally(throwable -> {
            LOG.info("Batch request {} slots, but only {} are fulfilled.", (Object)allAssignFutures.getNumFuturesTotal(), (Object)allAssignFutures.getNumFuturesCompleted());
            for (int i = 0; i < allocationFutures.size(); ++i) {
                ((CompletableFuture)allocationFutures.get(i)).completeExceptionally((Throwable)throwable);
            }
            for (Execution execution : scheduledExecutions) {
                execution.markFailed(ExceptionUtils.stripCompletionException((Throwable)throwable));
            }
            throw new CompletionException((Throwable)throwable);
        })).handleAsync((ignored, throwable) -> {
            if (throwable != null) {
                throw new CompletionException((Throwable)throwable);
            }
            boolean hasFailure = false;
            for (int i = 0; i < scheduledExecutions.size(); ++i) {
                try {
                    ((Execution)scheduledExecutions.get(i)).deploy();
                    continue;
                }
                catch (Exception e) {
                    hasFailure = true;
                    LOG.info("Fail to deploy execution {}", scheduledExecutions.get(i), (Object)e);
                    ((Execution)scheduledExecutions.get(i)).markFailed(e);
                }
            }
            if (hasFailure) {
                throw new CompletionException(new FlinkException("Fail to deploy some executions."));
            }
            return null;
        }, (Executor)this.futureExecutor);
        ((CompletableFuture)currentSchedulingFuture).whenComplete((ignored, throwable) -> {
            Throwable strippedThrowable = ExceptionUtils.stripCompletionException((Throwable)throwable);
            if (strippedThrowable instanceof CancellationException) {
                allAssignFutures.cancel(false);
            }
        });
        return currentSchedulingFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drives the job towards cancellation, retrying until a state transition succeeds or
     * the observed state is one this method does not act on.
     *
     * <ul>
     *   <li>{@code RUNNING} / {@code CREATED}: switch to {@code CANCELLING}, bump the global
     *       modification version, cancel the registered scheduling futures and cancel every
     *       job vertex. When all vertex futures complete, either hand over to
     *       {@code allVerticesInTerminalState} or, on error, switch to {@code FAILED}.</li>
     *   <li>{@code FAILING}: only switch to {@code CANCELLING}.</li>
     *   <li>{@code RESTARTING}: while holding {@code progressLock}, switch directly to
     *       {@code CANCELED} and run the terminal-state handling.</li>
     *   <li>Any other state: return without side effects.</li>
     * </ul>
     */
    public void cancel() {
        while (true) {
            final JobStatus observed = this.state;

            if (observed == JobStatus.RUNNING || observed == JobStatus.CREATED) {
                if (this.transitionState(observed, JobStatus.CANCELLING)) {
                    final long versionAtCancel = this.incrementGlobalModVersion();
                    this.cancelOngoingSchedulings();

                    final List<CompletableFuture<Void>> vertexCancellations =
                            new ArrayList<>(this.verticesInCreationOrder.size());
                    for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
                        vertexCancellations.add(jobVertex.cancelWithFuture());
                    }

                    FutureUtils.waitForAll(vertexCancellations).whenComplete((value, failure) -> {
                        if (failure == null) {
                            this.allVerticesInTerminalState(versionAtCancel);
                        } else {
                            this.transitionState(
                                    JobStatus.CANCELLING,
                                    JobStatus.FAILED,
                                    new FlinkException("Could not cancel job " + this.getJobName() + " because not all execution job vertices could be cancelled.", failure));
                        }
                    });
                    return;
                }
                // lost the race for the state change; re-read the state and retry
            } else if (observed == JobStatus.FAILING) {
                if (this.transitionState(observed, JobStatus.CANCELLING)) {
                    return;
                }
            } else if (observed == JobStatus.RESTARTING) {
                synchronized (this.progressLock) {
                    if (this.transitionState(observed, JobStatus.CANCELED)) {
                        this.onTerminalState(JobStatus.CANCELED);
                        LOG.info("Canceled during restart.");
                        return;
                    }
                }
            } else {
                return;
            }
        }
    }

    /**
     * Calls {@code stop()} on every execution vertex that reports zero inputs.
     *
     * @throws StoppingException if the job is not flagged as stoppable
     */
    public void stop() throws StoppingException {
        if (!this.isStoppable) {
            throw new StoppingException("This job is not stoppable.");
        }
        for (ExecutionVertex vertex : this.getAllExecutionVertices()) {
            if (vertex.getNumberOfInputs() == 0) {
                vertex.stop();
            }
        }
    }

    /**
     * Suspends the job: switches to {@code SUSPENDING}, cancels the registered scheduling
     * futures and all job vertices, and calls {@code allVerticesInTerminalState(-1L)} once
     * every vertex cancellation future has completed.
     *
     * <p>Returns immediately when the job is already in a terminal state or already
     * {@code SUSPENDING}.
     *
     * @param suspensionCause cause recorded as the failure cause and passed to the state transition
     */
    public void suspend(Throwable suspensionCause) {
        // retry the compare-and-set until it succeeds or suspension is no longer applicable
        while (true) {
            final JobStatus observed = this.state;
            if (observed.isTerminalState() || observed == JobStatus.SUSPENDING) {
                return;
            }
            if (this.transitionState(observed, JobStatus.SUSPENDING, suspensionCause)) {
                break;
            }
        }

        this.initFailureCause(suspensionCause);
        this.incrementGlobalModVersion();
        this.cancelOngoingSchedulings();

        final List<CompletableFuture<Void>> vertexTerminations =
                new ArrayList<>(this.verticesInCreationOrder.size());
        for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
            vertexTerminations.add(jobVertex.cancelWithFuture());
        }

        FutureUtils.waitForAll(vertexTerminations).whenComplete((ignored, failure) -> {
            if (failure != null) {
                LOG.debug("Flink could not properly clean up resource after suspension.", failure);
            }
            // a cleanup failure is only logged; the job still proceeds to its terminal handling
            this.allVerticesInTerminalState(-1L);
            LOG.info("Job {} has been suspended.", this.getJobID());
        });
    }

    /**
     * Fails the whole job.
     *
     * <p>Nothing happens when the job is {@code FAILING}, {@code SUSPENDING}, {@code SUSPENDED},
     * {@code RECONCILING} or in a globally terminal state. From {@code RESTARTING} the failure
     * cause is recorded, the global modification version is bumped and a restart-or-fail
     * decision is attempted. From any other state the job is switched to {@code FAILING},
     * scheduling futures and all job vertices are cancelled, and once the cancellations
     * complete the job either proceeds via {@code allVerticesInTerminalState} or, if a
     * cancellation failed, is switched to {@code FAILED}.
     *
     * @param t the failure cause
     */
    public void failGlobal(Throwable t) {
        while (true) {
            final JobStatus observed = this.state;

            if (observed == JobStatus.FAILING
                    || observed == JobStatus.SUSPENDING
                    || observed == JobStatus.SUSPENDED
                    || observed == JobStatus.RECONCILING
                    || observed.isGloballyTerminalState()) {
                return;
            }

            if (observed == JobStatus.RESTARTING) {
                this.initFailureCause(t);
                final long versionWhileRestarting = this.incrementGlobalModVersion();
                if (this.tryRestartOrFail(versionWhileRestarting)) {
                    return;
                }
                // state changed concurrently; look at it again
            } else if (this.transitionState(observed, JobStatus.FAILING, t)) {
                break;
            }
        }

        this.initFailureCause(t);
        final long versionForRestart = this.incrementGlobalModVersion();
        this.cancelOngoingSchedulings();

        final List<CompletableFuture<Void>> vertexCancellations =
                new ArrayList<>(this.verticesInCreationOrder.size());
        for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
            vertexCancellations.add(jobVertex.cancelWithFuture());
        }

        FutureUtils.waitForAll(vertexCancellations).whenComplete((ignored, failure) -> {
            if (failure == null) {
                this.allVerticesInTerminalState(versionForRestart);
            } else {
                this.transitionState(
                        JobStatus.FAILING,
                        JobStatus.FAILED,
                        new FlinkException("Could not cancel all execution job vertices properly.", failure));
            }
        });
    }

    /**
     * Starts reconciliation: switches the job from {@code CREATED} to {@code RECONCILING} and
     * asks the current execution attempt of every execution vertex to reconcile.
     *
     * <p>When the combined future completes, the job goes back to {@code CREATED} if every
     * registered execution is still {@code CREATED} and the graph manager reports no vertices
     * to be scheduled; otherwise it goes to {@code RUNNING}. A failed transition is only logged.
     *
     * @return the future stored in {@code reconcileFuture}
     * @throws IllegalStateException if the job is not {@code CREATED} on entry, or the
     *     transition to {@code RECONCILING} does not succeed
     */
    public CompletableFuture<Collection<ExecutionAttemptID>> reconcile() {
        final JobStatus statusAtEntry = this.state;
        Preconditions.checkState(JobStatus.CREATED.equals(statusAtEntry), (Object) ("Not allow reconcile in state " + statusAtEntry));

        if (!this.transitionState(statusAtEntry, JobStatus.RECONCILING)) {
            throw new IllegalStateException("ExecutionGraph reconcile fail while in state " + statusAtEntry);
        }

        final ArrayList<CompletableFuture<ExecutionAttemptID>> perExecutionFutures =
                new ArrayList<CompletableFuture<ExecutionAttemptID>>(this.currentExecutions.size());
        for (ExecutionVertex vertex : this.getAllExecutionVertices()) {
            perExecutionFutures.add(vertex.getCurrentExecutionAttempt().reconcile());
        }

        this.reconcileFuture = FutureUtils.combineAll(perExecutionFutures).whenComplete((ignore, throwable) -> {
            final boolean nothingStarted = this.getRegisteredExecutions().values().stream()
                    .allMatch(execution -> execution.getState() == ExecutionState.CREATED);

            // the graph manager is only consulted when no execution has left CREATED
            final JobStatus target = nothingStarted && !this.graphManager.hasToBeScheduledVertices()
                    ? JobStatus.CREATED
                    : JobStatus.RUNNING;

            if (!this.transitionState(JobStatus.RECONCILING, target)) {
                LOG.error("When reconcile finish, the job is in {}, this is a logical error.", this.state);
            }
        });
        return this.reconcileFuture;
    }

    /**
     * Tracks a scheduling future so that {@code cancelOngoingSchedulings()} can cancel it.
     * The future is stored as both key and value of {@code schedulingFutures}.
     *
     * @param schedulingFuture the future to track
     */
    public void registerSchedulingFuture(CompletableFuture<Void> schedulingFuture) {
        schedulingFutures.put(schedulingFuture, schedulingFuture);
    }

    /**
     * Stops tracking a scheduling future by removing it from {@code schedulingFutures}.
     *
     * @param schedulingFuture the future to forget
     */
    public void unregisterSchedulingFuture(CompletableFuture<Void> schedulingFuture) {
        schedulingFutures.remove(schedulingFuture);
    }

    /**
     * Cancels every future currently held in {@code schedulingFutures}, without requesting
     * interruption ({@code cancel(false)}).
     */
    private void cancelOngoingSchedulings() {
        for (CompletableFuture<?> pending : this.schedulingFutures.keySet()) {
            pending.cancel(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Restarts the whole job, provided no other global modification happened since the restart
     * was requested.
     *
     * <p>Under {@code progressLock} the method verifies the expected global version and the
     * current job status, clears the registered executions, resets co-location constraints and
     * every job vertex, zeroes all state timestamps except the one for {@code RESTARTING},
     * switches to {@code CREATED} and restores the latest checkpointed state if a checkpoint
     * coordinator is present. Outside the lock it resets the graph manager, switches to
     * {@code RUNNING} and starts scheduling.
     *
     * <p>Any {@link Throwable} raised along the way is logged and turned into a global failure.
     *
     * @param expectedGlobalVersion the global modification version this restart was issued for
     */
    public void restart(long expectedGlobalVersion) {
        try {
            Object object = this.progressLock;
            synchronized (object) {
                // a newer global modification supersedes this restart request
                if (this.globalModVersion != expectedGlobalVersion) {
                    LOG.info("Concurrent full restart subsumed this restart.");
                    return;
                }
                JobStatus current = this.state;
                if (current == JobStatus.CANCELED) {
                    LOG.info("Canceled job during restart. Aborting restart.");
                    return;
                }
                if (current == JobStatus.FAILED) {
                    LOG.info("Failed job during restart. Aborting restart.");
                    return;
                }
                if (current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED) {
                    LOG.info("Suspended job during restart. Aborting restart.");
                    return;
                }
                if (current != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }
                this.currentExecutions.clear();
                // remembers which co-location groups were already reset, so each is reset once
                HashSet<CoLocationGroup> colGroups = new HashSet<CoLocationGroup>();
                long resetTimestamp = System.currentTimeMillis();
                for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
                    CoLocationGroup cgroup = jv.getCoLocationGroup();
                    if (cgroup != null && !colGroups.contains(cgroup)) {
                        cgroup.resetConstraints();
                        colGroups.add(cgroup);
                    }
                    jv.resetForNewExecution(resetTimestamp, this.globalModVersion);
                }
                // clear all recorded state timestamps, keeping only the RESTARTING one
                for (int i = 0; i < this.stateTimestamps.length; ++i) {
                    if (i == JobStatus.RESTARTING.ordinal()) continue;
                    this.stateTimestamps[i] = 0L;
                }
                this.transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
                if (this.checkpointCoordinator != null) {
                    this.checkpointCoordinator.restoreLatestCheckpointedState(this.getAllVertices(), false, false);
                }
            }
            // the remaining steps run without holding progressLock
            this.graphManager.reset();
            if (!this.transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                throw new IllegalStateException("Job may only be scheduled from state " + (Object)((Object)JobStatus.CREATED));
            }
            this.graphManager.startScheduling();
        }
        catch (Throwable t) {
            // includes the IllegalStateExceptions thrown above
            LOG.warn("Failed to restart the job.", t);
            this.failGlobal(t);
        }
    }

    /**
     * Resets the given execution vertices for a new execution attempt and, if a checkpoint
     * coordinator is present, restores their latest checkpointed state with the checkpoint
     * scheduler stopped for the duration of the restore.
     *
     * @param modVersion the modification version handed to each vertex reset
     * @param executionVertices the vertices to reset
     * @throws Exception if resetting a vertex or restoring the checkpointed state fails
     */
    public void resetExecutionVertices(long modVersion, List<ExecutionVertex> executionVertices) throws Exception {
        final long resetTimestamp = System.currentTimeMillis();
        for (ExecutionVertex ev : executionVertices) {
            ev.resetForNewExecution(resetTimestamp, modVersion);
        }

        // Read the field once: onTerminalState() sets it to null, so re-reading it after the
        // null check could fail with a NullPointerException part-way through the sequence.
        final CheckpointCoordinator coordinator = this.checkpointCoordinator;
        if (coordinator != null) {
            coordinator.stopCheckpointScheduler();
            coordinator.restoreLatestCheckpointedState(executionVertices, false, true);
            coordinator.startCheckpointScheduler();
        }
    }

    /**
     * Forwards a failover notification to the graph manager, passing the IDs of the given
     * execution vertices in list order.
     *
     * @param executionVertices the vertices that were failed over
     */
    public void notifyExecutionVertexFailover(List<ExecutionVertex> executionVertices) {
        this.graphManager.notifyExecutionVertexFailover(
                executionVertices.stream()
                        .map(ExecutionVertex::getExecutionVertexID)
                        .collect(Collectors.toList()));
    }

    /**
     * Resets the given vertices via {@code resetExecutionVertices} and then reports them to
     * the graph manager via {@code notifyExecutionVertexFailover}. The notification is skipped
     * when the reset throws.
     *
     * @param modVersion the modification version handed to the reset
     * @param executionVertices the vertices to reset and report
     * @throws Exception if the reset fails
     */
    public void resetExecutionVerticesAndNotify(long modVersion, List<ExecutionVertex> executionVertices) throws Exception {
        resetExecutionVertices(modVersion, executionVertices);
        notifyExecutionVertexFailover(executionVertices);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Restores the latest checkpointed state for all job vertices while holding
     * {@code progressLock}. Does nothing when no checkpoint coordinator is set.
     *
     * @param errorIfNoCheckpoint forwarded unchanged to the checkpoint coordinator
     * @param allowNonRestoredState forwarded unchanged to the checkpoint coordinator
     * @throws Exception if the checkpoint coordinator fails to restore the state
     */
    public void restoreLatestCheckpointedState(boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception {
        synchronized (this.progressLock) {
            // Read the field once: onTerminalState() nulls it and is not always invoked under
            // progressLock, so a second read after the null check could observe null.
            final CheckpointCoordinator coordinator = this.checkpointCoordinator;
            if (coordinator != null) {
                coordinator.restoreLatestCheckpointedState(this.getAllVertices(), errorIfNoCheckpoint, allowNonRestoredState);
            }
        }
    }

    /**
     * Deserializes the job's execution config with the user class loader and returns its
     * archived form.
     *
     * @return the archived config, or {@code null} when the deserialized config is
     *     {@code null} or deserialization fails (the failure is logged)
     */
    @Override
    public ArchivedExecutionConfig getArchivedExecutionConfig() {
        try {
            final ExecutionConfig deserialized =
                    (ExecutionConfig) this.jobInformation.getSerializedExecutionConfig().deserializeValue(this.userClassLoader);
            if (deserialized == null) {
                return null;
            }
            return deserialized.archive();
        } catch (IOException | ClassNotFoundException e) {
            LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", this.getJobID(), e);
            return null;
        }
    }

    /**
     * @return the future that {@code onTerminalState} completes with the terminal job status
     */
    public CompletableFuture<JobStatus> getTerminationFuture() {
        return terminationFuture;
    }

    /**
     * @return the future assigned by {@code reconcile()}
     */
    public CompletableFuture<Collection<ExecutionAttemptID>> getReconcileFuture() {
        return reconcileFuture;
    }

    /**
     * Blocks until the termination future completes and returns its value.
     *
     * @return the job status the termination future was completed with
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws RuntimeException wrapping the {@link ExecutionException} if the termination
     *     future completed exceptionally
     */
    @VisibleForTesting
    public JobStatus waitUntilTerminal() throws InterruptedException {
        final JobStatus terminalStatus;
        try {
            terminalStatus = this.terminationFuture.get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        return terminalStatus;
    }

    /**
     * @return the failover strategy that {@code notifyExecutionChange} informs about task failures
     */
    public FailoverStrategy getFailoverStrategy() {
        return failoverStrategy;
    }

    /**
     * @return the current value of the global modification version counter
     */
    public long getGlobalModVersion() {
        return globalModVersion;
    }

    /**
     * Schedules the execution vertices identified by the given IDs.
     *
     * <p>Vertices in state {@code CREATED} are scheduled. Vertices that are {@code SCHEDULED},
     * {@code DEPLOYING}, {@code RUNNING} or {@code FINISHED} are counted as already scheduled
     * and skipped. A vertex in any other state makes the whole request fail. The resulting
     * scheduling future is tracked while it runs, unless the job left {@code RUNNING} or the
     * global modification version changed in the meantime, in which case it is cancelled.
     *
     * <p>Nothing is thrown to the caller: any {@link Throwable} is logged and turned into a
     * global failure.
     *
     * @param verticesToSchedule IDs of the vertices to schedule; {@code null} or empty is a no-op
     */
    public void scheduleVertices(Collection<ExecutionVertexID> verticesToSchedule) {
        try {
            // captured first so a concurrent global modification can be detected afterwards
            final long currentGlobalModVersion = this.globalModVersion;
            if (verticesToSchedule == null || verticesToSchedule.isEmpty()) {
                return;
            }

            int alreadyScheduledCount = 0;
            int unhealthyCount = 0;
            final ArrayList<ExecutionVertex> vertices = new ArrayList<ExecutionVertex>(verticesToSchedule.size());
            for (ExecutionVertexID executionVertexID : verticesToSchedule) {
                final ExecutionVertex ev = this.getJobVertex(executionVertexID.getJobVertexID()).getTaskVertices()[executionVertexID.getSubTaskIndex()];
                // Read the state once so the classification below and the debug log are based
                // on the same value even if the vertex changes state concurrently.
                final ExecutionState executionState = ev.getExecutionState();
                if (executionState == ExecutionState.CREATED) {
                    vertices.add(ev);
                } else if (executionState == ExecutionState.SCHEDULED
                        || executionState == ExecutionState.DEPLOYING
                        || executionState == ExecutionState.RUNNING
                        || executionState == ExecutionState.FINISHED) {
                    ++alreadyScheduledCount;
                } else {
                    LOG.debug("Execution {} is in {} state", ev.getTaskNameWithSubtaskIndex(), executionState);
                    ++unhealthyCount;
                }
            }

            if (unhealthyCount > 0) {
                throw new IllegalStateException("Not all submitted vertices can be scheduled. " + unhealthyCount + " vertices are in unhealthy state. Please check the schedule logic in " + this.graphManager.getClass().getCanonicalName());
            }
            LOG.info("Schedule {} vertices, already scheduled {}", vertices.size(), alreadyScheduledCount);
            if (vertices.isEmpty()) {
                return;
            }

            final CompletableFuture<Void> schedulingFuture = this.schedule(vertices);
            if (this.state == JobStatus.RUNNING && currentGlobalModVersion == this.globalModVersion) {
                this.schedulingFutures.put(schedulingFuture, schedulingFuture);
                // drop the bookkeeping entry once the scheduling attempt is over
                schedulingFuture.whenCompleteAsync((ignored, throwable) -> this.schedulingFutures.remove(schedulingFuture), (Executor) this.futureExecutor);
            } else {
                schedulingFuture.cancel(false);
            }
        } catch (Throwable t) {
            LOG.info("scheduleVertices meet exception, need to fail global execution graph", t);
            this.failGlobal(t);
        }
    }

    /**
     * Attempts the state transition without an associated error.
     *
     * @param current the state the job is expected to be in
     * @param newState the state to switch to
     * @return {@code true} if the transition was performed
     */
    private boolean transitionState(JobStatus current, JobStatus newState) {
        return transitionState(current, newState, null);
    }

    /**
     * Atomically switches the job state from {@code current} to {@code newState}. On success
     * the transition is logged, the timestamp for the new state is recorded and the job
     * status listeners are notified.
     *
     * @param current the state the job is expected to be in
     * @param newState the state to switch to
     * @param error optional error passed to the log call and to the listeners; may be {@code null}
     * @return {@code true} if the compare-and-set succeeded, {@code false} if the job was not
     *     in {@code current}
     * @throws IllegalStateException if {@code current} is a terminal state
     */
    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        if (current.isTerminalState()) {
            final String complaint = "Job is trying to leave terminal state " + current;
            LOG.error(complaint);
            throw new IllegalStateException(complaint);
        }

        if (!STATE_UPDATER.compareAndSet(this, current, newState)) {
            return false;
        }

        LOG.info("Job {} ({}) switched from state {} to {}.", this.getJobName(), this.getJobID(), current, newState, error);
        this.stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
        this.notifyJobStatusChange(newState, error);
        return true;
    }

    /**
     * Atomically increments the global modification version.
     *
     * @return the version after the increment
     */
    private long incrementGlobalModVersion() {
        final long bumped = GLOBAL_VERSION_UPDATER.incrementAndGet(this);
        return bumped;
    }

    /**
     * Records the failure cause, together with an {@code ErrorInfo} stamped with the current
     * wall-clock time.
     *
     * @param t the cause to record
     */
    private void initFailureCause(Throwable t) {
        this.failureCause = t;
        final long occurredAt = System.currentTimeMillis();
        this.failureInfo = new ErrorInfo(t, occurredAt);
    }

    /**
     * Counts one more finished job vertex. When the count reaches the total number of
     * vertices while the job is {@code RUNNING}, every job vertex is finalized on the master
     * and the job is switched to {@code FINISHED}. A non-fatal finalization failure fails the
     * job globally instead.
     */
    void vertexFinished() {
        final int finishedSoFar = this.verticesFinished.incrementAndGet();
        if (finishedSoFar != this.numVerticesTotal || this.state != JobStatus.RUNNING) {
            return;
        }

        try {
            for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
                jobVertex.getJobVertex().finalizeOnMaster(this.getUserClassLoader());
            }
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalError(t);
            this.failGlobal(new Exception("Failed to finalize execution on master", t));
            return;
        }

        if (this.transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
            this.onTerminalState(JobStatus.FINISHED);
        }
    }

    /**
     * Decrements the finished-vertices counter by one.
     */
    void vertexUnFinished() {
        verticesFinished.getAndDecrement();
    }

    /**
     * Called once all job vertices are done; moves the job from its transitional state to the
     * matching follow-up state, retrying whenever a state change loses a race.
     *
     * <ul>
     *   <li>{@code RUNNING}: fail globally, then re-evaluate.</li>
     *   <li>{@code CANCELLING}: switch to {@code CANCELED} and run terminal handling.</li>
     *   <li>{@code FAILING}: attempt a restart-or-fail decision.</li>
     *   <li>{@code SUSPENDING}: switch to {@code SUSPENDED} and run terminal handling.</li>
     *   <li>Globally terminal state: log a warning.</li>
     *   <li>Anything else: fail globally.</li>
     * </ul>
     *
     * @param expectedGlobalVersionForRestart the global version handed to {@code tryRestartOrFail}
     */
    private void allVerticesInTerminalState(long expectedGlobalVersionForRestart) {
        while (true) {
            final JobStatus observed = this.state;

            if (observed == JobStatus.RUNNING) {
                this.failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING"));
            } else if (observed == JobStatus.CANCELLING) {
                if (this.transitionState(observed, JobStatus.CANCELED)) {
                    this.onTerminalState(JobStatus.CANCELED);
                    return;
                }
            } else if (observed == JobStatus.FAILING) {
                if (this.tryRestartOrFail(expectedGlobalVersionForRestart)) {
                    return;
                }
            } else if (observed == JobStatus.SUSPENDING) {
                if (this.transitionState(observed, JobStatus.SUSPENDED)) {
                    this.onTerminalState(JobStatus.SUSPENDED);
                    return;
                }
            } else if (observed.isGloballyTerminalState()) {
                LOG.warn("Job has entered globally terminal state without waiting for all job vertices to reach final state.");
                return;
            } else {
                this.failGlobal(new Exception("ExecutionGraph went into final state from state " + observed));
                return;
            }
            // reaching this point means a concurrent state change happened; loop and retry
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Decides, while holding {@code progressLock}, whether the job is restarted or declared
     * failed. A restart happens when the recorded failure cause is not a
     * {@code SuppressRestartsException} and the restart strategy allows it; otherwise the job
     * is switched to {@code FAILED}.
     *
     * @param globalModVersionForRestart the global version handed to the restart callback
     * @return {@code true} if the job was moved to {@code RESTARTING} or {@code FAILED};
     *     {@code false} if the job was in neither {@code FAILING} nor {@code RESTARTING}, or
     *     the state transition did not succeed
     */
    private boolean tryRestartOrFail(long globalModVersionForRestart) {
        final JobStatus observed = this.state;
        if (observed != JobStatus.FAILING && observed != JobStatus.RESTARTING) {
            return false;
        }

        final Throwable cause = this.failureCause;
        synchronized (this.progressLock) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", this.getJobName(), this.getJobID(), cause);
            } else {
                LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", this.getJobName(), this.getJobID());
            }

            final boolean causeAllowsRestart = !(cause instanceof SuppressRestartsException);
            final boolean strategyAllowsRestart = this.restartStrategy.canRestart();

            if (causeAllowsRestart && strategyAllowsRestart) {
                if (!this.transitionState(observed, JobStatus.RESTARTING)) {
                    return false;
                }
                LOG.info("Restarting the job {} ({}).", this.getJobName(), this.getJobID());
                final ExecutionGraphRestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart);
                this.restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(this.futureExecutor));
                return true;
            }

            if (!this.transitionState(observed, JobStatus.FAILED, cause)) {
                return false;
            }
            final String becauseOfCause = causeAllowsRestart ? null : "a type of SuppressRestartsException was thrown";
            final String becauseOfStrategy = strategyAllowsRestart ? null : "the restart strategy prevented it";
            LOG.info("Could not restart the job {} ({}) because {}.", this.getJobName(), this.getJobID(), StringUtils.concatenateWithAnd(becauseOfCause, becauseOfStrategy), cause);
            this.onTerminalState(JobStatus.FAILED);
            return true;
        }
    }

    /**
     * Cleanup performed when the job reaches a terminal state: detaches the checkpoint
     * coordinator from this graph, shuts it down if there was one, and in all cases completes
     * the termination future with the given status.
     *
     * @param status the terminal status passed to the coordinator shutdown and used to
     *     complete the termination future
     */
    private void onTerminalState(JobStatus status) {
        try {
            // detach the coordinator from the field before shutting it down
            final CheckpointCoordinator detached = this.checkpointCoordinator;
            this.checkpointCoordinator = null;
            if (detached != null) {
                detached.shutdown(status);
            }
        } catch (Exception e) {
            LOG.error("Error while cleaning up after execution", e);
        } finally {
            this.terminationFuture.complete(status);
        }
    }

    /**
     * Applies a task state update to the execution attempt registered under the update's ID.
     *
     * <p>{@code RUNNING} switches the attempt to running and, on success, records the
     * deploy-to-running duration in {@code taskDeployMetrics}. {@code FINISHED},
     * {@code CANCELED} and {@code FAILED} forward the deserialized accumulators and I/O
     * metrics to the attempt. Any other reported state fails the attempt.
     *
     * @param state the reported task execution state
     * @return {@code true} if the update was applied; {@code false} if no attempt is
     *     registered for the ID, the reported state is not handled, switching to running was
     *     rejected, or a non-fatal error occurred (which also fails the job globally)
     */
    public boolean updateState(TaskExecutionState state) {
        final Execution attempt = this.currentExecutions.get(state.getID());
        if (attempt == null) {
            return false;
        }

        try {
            switch (state.getExecutionState()) {
                case RUNNING: {
                    final boolean switched = attempt.switchToRunning();
                    if (switched) {
                        final long deployDuration = attempt.getStateTimestamp(ExecutionState.RUNNING) - attempt.getStateTimestamp(ExecutionState.DEPLOYING);
                        this.taskDeployMetrics.update(deployDuration);
                    }
                    return switched;
                }
                case FINISHED: {
                    final Map<String, Accumulator<?, ?>> finalAccumulators = this.deserializeAccumulators(state);
                    attempt.markFinished(finalAccumulators, state.getIOMetrics());
                    return true;
                }
                case CANCELED: {
                    final Map<String, Accumulator<?, ?>> finalAccumulators = this.deserializeAccumulators(state);
                    attempt.cancelingComplete(finalAccumulators, state.getIOMetrics());
                    return true;
                }
                case FAILED: {
                    final Map<String, Accumulator<?, ?>> finalAccumulators = this.deserializeAccumulators(state);
                    attempt.markFailed(state.getError(this.userClassLoader), finalAccumulators, state.getIOMetrics());
                    return true;
                }
                default: {
                    attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
                    return false;
                }
            }
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            this.failGlobal(t);
            return false;
        }
    }

    /**
     * Deserializes the user accumulators carried by a task state update, using the user
     * class loader.
     *
     * @param state the state update whose accumulator snapshot is read
     * @return the deserialized accumulators, or {@code null} if the update carries no
     *     snapshot or deserialization fails (the failure is logged)
     */
    private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) {
        final AccumulatorSnapshot snapshot = state.getAccumulators();
        if (snapshot == null) {
            return null;
        }
        try {
            return snapshot.deserializeUserAccumulators(this.userClassLoader);
        } catch (Throwable t) {
            LOG.error("Failed to deserialize final accumulator results.", t);
            return null;
        }
    }

    /**
     * Looks up the execution that produces the given result partition and asks its vertex to
     * schedule or update the partition's consumers.
     *
     * @param partitionId the partition whose consumers should be scheduled or updated; its
     *     producer ID is the key used to find the execution
     * @throws ExecutionGraphException if no execution is registered under the producer ID, or
     *     that execution has no vertex
     */
    public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) throws ExecutionGraphException {
        final Execution execution = this.currentExecutions.get(partitionId.getProducerId());
        if (execution == null) {
            // report the ID that was actually used for the lookup (the producer's ID),
            // not the partition ID
            throw new ExecutionGraphException("Cannot find execution for execution Id " + partitionId.getProducerId() + '.');
        }

        final ExecutionVertex vertex = execution.getVertex();
        if (vertex == null) {
            throw new ExecutionGraphException("Execution with execution Id " + partitionId.getProducerId() + " has no vertex assigned.");
        }
        vertex.scheduleOrUpdateConsumers(partitionId);
    }

    /**
     * @return an unmodifiable view of the executions currently registered with this graph,
     *     keyed by attempt ID
     */
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        return Collections.unmodifiableMap(currentExecutions);
    }

    /**
     * Registers an execution under its attempt ID. If the ID is already taken, the existing
     * entry is kept and the job is failed globally.
     *
     * @param exec the execution to register
     */
    void registerExecution(Execution exec) {
        final boolean idAlreadyUsed = this.currentExecutions.putIfAbsent(exec.getAttemptId(), exec) != null;
        if (idAlreadyUsed) {
            this.failGlobal(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
        }
    }

    /**
     * Removes the entry registered under the execution's attempt ID. If a different
     * execution instance was registered under that ID, the job is failed globally.
     *
     * @param exec the execution to deregister
     */
    void deregisterExecution(Execution exec) {
        final Execution removed = this.currentExecutions.remove(exec.getAttemptId());
        if (removed == null || removed == exec) {
            return;
        }
        this.failGlobal(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + removed));
    }

    /**
     * Deserializes the user accumulators of the snapshot and stores them on the execution
     * registered under the snapshot's attempt ID. An unknown attempt ID is logged at debug
     * level; any exception is logged as an error and not propagated.
     *
     * @param accumulatorSnapshot the snapshot to apply
     */
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        try {
            // deserialize first, so a broken snapshot is reported before any lookup happens
            final Map<String, Accumulator<?, ?>> deserialized = accumulatorSnapshot.deserializeUserAccumulators(this.userClassLoader);
            final ExecutionAttemptID attemptId = accumulatorSnapshot.getExecutionAttemptID();
            final Execution target = this.currentExecutions.get(attemptId);
            if (target == null) {
                LOG.debug("Received accumulator result for unknown execution {}.", attemptId);
            } else {
                target.setAccumulators(deserialized);
            }
        } catch (Exception e) {
            LOG.error("Cannot update accumulators for job {}.", this.getJobID(), e);
        }
    }

    /**
     * Adds a listener that is notified on job status changes.
     *
     * @param listener the listener to add; {@code null} is ignored
     */
    public void registerJobStatusListener(JobStatusListener listener) {
        if (listener == null) {
            return;
        }
        this.jobStatusListeners.add(listener);
    }

    /**
     * Adds a listener that is notified on execution status changes.
     *
     * @param listener the listener to add; {@code null} is ignored
     */
    public void registerExecutionListener(ExecutionStatusListener listener) {
        if (listener == null) {
            return;
        }
        this.executionListeners.add(listener);
    }

    /**
     * Informs every registered job status listener about the new job state. The error, if
     * any, is wrapped in a {@code SerializedThrowable} before being handed out. A listener
     * that throws is logged and does not prevent the remaining listeners from being called.
     *
     * @param newState the state the job switched to
     * @param error the error associated with the change; may be {@code null}
     */
    private void notifyJobStatusChange(JobStatus newState, Throwable error) {
        if (this.jobStatusListeners.isEmpty()) {
            return;
        }

        final long timestamp = System.currentTimeMillis();
        final Throwable serializedError;
        if (error == null) {
            serializedError = null;
        } else {
            serializedError = new SerializedThrowable(error);
        }

        for (JobStatusListener listener : this.jobStatusListeners) {
            try {
                listener.jobStatusChanges(this.getJobID(), newState, timestamp, serializedError);
            } catch (Throwable t) {
                LOG.warn("Error while notifying JobStatusListener", t);
            }
        }
    }

    /**
     * Reports an execution state change to the registered execution listeners and, for a
     * change to {@code FAILED}, triggers failure handling.
     *
     * <p>Failure handling only runs when the execution's global modification version matches
     * the graph's current one. It marks the failover metric (if set), fails unacknowledged
     * pending checkpoints for the attempt (if a checkpoint coordinator is set) and informs
     * the failover strategy. Any error during these steps falls back to a global failure.
     *
     * @param execution the execution whose state changed
     * @param newExecutionState the state the execution switched to
     * @param error the error associated with the change; may be {@code null}
     */
    void notifyExecutionChange(Execution execution, ExecutionState newExecutionState, Throwable error) {
        if (this.executionListeners.size() > 0) {
            final ExecutionJobVertex vertex = execution.getVertex().getJobVertex();
            final String message = error == null ? null : ExceptionUtils.stringifyException(error);
            final long timestamp = System.currentTimeMillis();
            for (ExecutionStatusListener listener : this.executionListeners) {
                try {
                    listener.executionStatusChanged(this.getJobID(), vertex.getJobVertexId(), vertex.getJobVertex().getName(), vertex.getParallelism(), execution.getParallelSubtaskIndex(), execution.getAttemptId(), newExecutionState, timestamp, message);
                } catch (Throwable t) {
                    // a misbehaving listener must not keep the others from being notified
                    LOG.warn("Error while notifying ExecutionStatusListener", t);
                }
            }
        }

        if (newExecutionState != ExecutionState.FAILED) {
            return;
        }

        final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
        if (execution.getGlobalModVersion() != this.globalModVersion) {
            return;
        }

        try {
            if (this.failOverMetrics != null) {
                this.failOverMetrics.markEvent();
            }
            // Read the field once: onTerminalState() nulls it, and a NullPointerException from
            // a second read would be misreported below as a failover-strategy error.
            final CheckpointCoordinator coordinator = this.checkpointCoordinator;
            if (coordinator != null) {
                coordinator.failUnacknowledgedPendingCheckpointsFor(execution.getAttemptId(), ex);
            }
            this.failoverStrategy.onTaskFailure(execution, ex);
        } catch (Throwable t) {
            LOG.warn("Error in failover strategy - falling back to global restart", t);
            this.failGlobal(ex);
        }
    }
}

