/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.ResultPartitionLocationTrackerProxy;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.EstablishedResourceManagerConnection;
import org.apache.flink.runtime.jobmaster.ExecutionGraphException;
import org.apache.flink.runtime.jobmaster.GraphManager;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
import org.apache.flink.runtime.jobmaster.JobMasterException;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.failover.OperationLogManager;
import org.apache.flink.runtime.jobmaster.failover.OperationLogStoreLoader;
import org.apache.flink.runtime.jobmaster.failover.OperationLogType;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.jobmaster.message.PendingSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.preaggregatedaccumulators.AccumulatorAggregationCoordinator;
import org.apache.flink.runtime.preaggregatedaccumulators.CommitAccumulator;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rest.messages.job.JobPendingSlotRequestDetail;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.schedule.GraphManagerPlugin;
import org.apache.flink.runtime.schedule.GraphManagerPluginFactory;
import org.apache.flink.runtime.schedule.SchedulingConfig;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutionStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorReportResponse;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.update.JobUpdateRequest;
import org.apache.flink.runtime.update.action.JobGraphReplaceAction;
import org.apache.flink.runtime.update.action.JobGraphUpdateAction;
import org.apache.flink.runtime.update.action.JobUpdateAction;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

public class JobMaster
extends FencedRpcEndpoint<JobMasterId>
implements JobMasterGateway {
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";
    private final JobMasterConfiguration jobMasterConfiguration;
    private final ResourceID resourceId;
    private JobGraph jobGraph;
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobServer blobServer;
    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
    private final AccumulatorAggregationCoordinator accumulatorAggregationCoordinator;
    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;
    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    private final ScheduledExecutorService scheduledExecutorService;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler fatalErrorHandler;
    private final ClassLoader userCodeLoader;
    private final SubmittedJobGraphStore submittedJobGraphStore;
    private final SlotPool slotPool;
    private final SlotPoolGateway slotPoolGateway;
    private final RestartStrategy restartStrategy;
    private final ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy;
    private final OperationLogManager operationLogManager;
    private final BackPressureStatsTracker backPressureStatsTracker;
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
    private ExecutionGraph executionGraph;
    private boolean inJobUpdate = false;
    private final EvictingBoundedList<ArchivedExecutionGraph> executionGraphHistories;
    private GraphManager graphManager;
    @Nullable
    private JobManagerJobStatusListener jobStatusListener;
    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;
    @Nullable
    private String lastInternalSavepoint;
    @Nullable
    private ResourceManagerAddress resourceManagerAddress;
    @Nullable
    private ResourceManagerConnection resourceManagerConnection;
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    public JobMaster(RpcService rpcService, JobMasterConfiguration jobMasterConfiguration, ResourceID resourceId, JobGraph jobGraph, HighAvailabilityServices highAvailabilityService, SlotPoolFactory slotPoolFactory, JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerJobMetricGroupFactory jobMetricGroupFactory, OnCompletionActions jobCompletionActions, FatalErrorHandler fatalErrorHandler, ClassLoader userCodeLoader, SubmittedJobGraphStore submittedJobGraphStore) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
        JobMasterGateway selfGateway = this.getSelfGateway(JobMasterGateway.class);
        this.jobMasterConfiguration = (JobMasterConfiguration)Preconditions.checkNotNull((Object)jobMasterConfiguration);
        this.resourceId = (ResourceID)Preconditions.checkNotNull((Object)resourceId);
        this.jobGraph = (JobGraph)Preconditions.checkNotNull((Object)jobGraph);
        this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityService);
        this.blobServer = (BlobServer)Preconditions.checkNotNull((Object)blobServer);
        this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
        this.jobCompletionActions = (OnCompletionActions)Preconditions.checkNotNull((Object)jobCompletionActions);
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.userCodeLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeLoader);
        this.jobMetricGroupFactory = (JobManagerJobMetricGroupFactory)Preconditions.checkNotNull((Object)jobMetricGroupFactory);
        this.submittedJobGraphStore = (SubmittedJobGraphStore)Preconditions.checkNotNull((Object)submittedJobGraphStore);
        this.accumulatorAggregationCoordinator = new AccumulatorAggregationCoordinator();
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(selfGateway), rpcService.getScheduledExecutor(), this.log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(resourceId, new ResourceManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        String jobName = jobGraph.getName();
        JobID jid = jobGraph.getJobID();
        this.log.info("Initializing job {} ({}).", (Object)jobName, (Object)jid);
        ExecutionConfig executionConfig = (ExecutionConfig)jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader);
        RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = executionConfig.getRestartStrategy();
        this.restartStrategy = restartStrategyConfiguration != null ? RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) : jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
        this.resultPartitionLocationTrackerProxy = new ResultPartitionLocationTrackerProxy(jobMasterConfiguration.getConfiguration());
        this.log.info("Using restart strategy {} for {} ({}).", new Object[]{this.restartStrategy, jobName, jid});
        this.resourceManagerLeaderRetriever = this.highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.slotPool = ((SlotPoolFactory)Preconditions.checkNotNull((Object)slotPoolFactory)).createSlotPool(jobGraph.getJobID());
        this.slotPoolGateway = this.slotPool.getSelfGateway(SlotPoolGateway.class);
        this.registeredTaskManagers = new HashMap<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>>(4);
        this.backPressureStatsTracker = (BackPressureStatsTracker)Preconditions.checkNotNull((Object)jobManagerSharedServices.getBackPressureStatsTracker());
        this.lastInternalSavepoint = null;
        this.operationLogManager = new OperationLogManager(OperationLogStoreLoader.loadOperationLogStore(jobGraph.getJobID(), jobMasterConfiguration.getConfiguration()));
        this.jobStatusListener = null;
        this.resourceManagerConnection = null;
        this.establishedResourceManagerConnection = null;
        JobManagerJobMetricGroup jmJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
        ExecutionGraph eg = this.createAndRestoreExecutionGraph(jobGraph, jmJobMetricGroup);
        this.executionGraphHistories = new EvictingBoundedList(jobMasterConfiguration.getConfiguration().getInteger(JobManagerOptions.MAX_EXECUTION_GRAPH_HISTORY_SIZE));
        this.assignExecutionGraph(eg, jmJobMetricGroup);
    }

    @Override
    public void start() {
        throw new UnsupportedOperationException("Should never call start() without leader ID");
    }

    public CompletableFuture<Acknowledge> start(JobMasterId newJobMasterId, Time timeout) throws Exception {
        super.start();
        return this.callAsyncWithoutFencing(() -> this.startJobExecution(newJobMasterId), timeout);
    }

    public CompletableFuture<Acknowledge> suspend(Exception cause, Time timeout) {
        CompletableFuture<Acknowledge> suspendFuture = this.callAsyncWithoutFencing(() -> this.suspendExecution(cause), timeout);
        this.stop();
        return suspendFuture;
    }

    public void reconcile() throws Exception {
        if (this.executionGraph.getState() != JobStatus.CREATED) {
            this.clearExecutionGraphFields();
            JobManagerJobMetricGroup newJobManagerJobMetricGroup = this.jobMetricGroupFactory.create(this.jobGraph);
            ExecutionGraph newExecutionGraph = this.createAndRestoreExecutionGraph(this.jobGraph, newJobManagerJobMetricGroup);
            this.assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
        }
        this.graphManager.enterReconcile();
        try {
            this.operationLogManager.replay();
        }
        catch (Throwable t) {
            this.log.warn("Fail to replay log for {}, will start the job as a new one.", (Object)this.executionGraph.getJobID(), (Object)t);
            this.operationLogManager.clear();
            this.clearExecutionGraphFields();
            JobManagerJobMetricGroup newJobManagerJobMetricGroup = this.jobMetricGroupFactory.create(this.jobGraph);
            ExecutionGraph newExecutionGraph = this.createAndRestoreExecutionGraph(this.jobGraph, newJobManagerJobMetricGroup);
            this.assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
            return;
        }
        this.log.info("Job master replay log finish.");
        this.operationLogManager.start();
        this.executionGraph.reconcile();
    }

    @Override
    public CompletableFuture<Void> postStop() {
        this.log.info("Stopping the JobMaster for job {}({}).", (Object)this.jobGraph.getName(), (Object)this.jobGraph.getJobID());
        if (this.graphManager != null) {
            this.graphManager.dispose();
        }
        HashSet<ResourceID> taskManagerResourceIds = new HashSet<ResourceID>(this.registeredTaskManagers.keySet());
        FlinkException cause = new FlinkException("Stopping JobMaster for job " + this.jobGraph.getName() + '(' + this.jobGraph.getJobID() + ").");
        for (ResourceID taskManagerResourceId : taskManagerResourceIds) {
            this.disconnectTaskManager(taskManagerResourceId, (Exception)cause);
        }
        this.taskManagerHeartbeatManager.stop();
        this.resourceManagerHeartbeatManager.stop();
        this.suspendExecution((Exception)new FlinkException("JobManager is shutting down."));
        this.slotPool.shutDown();
        CompletableFuture<Object> disposeInternalSavepointFuture = this.lastInternalSavepoint != null ? CompletableFuture.runAsync(() -> this.disposeSavepoint(this.lastInternalSavepoint)) : CompletableFuture.completedFuture(null);
        this.accumulatorAggregationCoordinator.clear();
        CompletableFuture<Void> slotPoolTerminationFuture = this.slotPool.getTerminationFuture();
        return FutureUtils.completeAll(Arrays.asList(disposeInternalSavepointFuture, slotPoolTerminationFuture));
    }

    @Override
    public CompletableFuture<Acknowledge> cancel(Time timeout) {
        this.executionGraph.cancel();
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public CompletableFuture<Acknowledge> stop(Time timeout) {
        try {
            this.executionGraph.stop();
        }
        catch (StoppingException e) {
            return FutureUtils.completedExceptionally((Throwable)((Object)e));
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public CompletableFuture<JobGraph> requestJobGraph(Time timeout) {
        this.log.debug("Requesting job graph {}.", (Object)this.jobGraph);
        return CompletableFuture.completedFuture(this.jobGraph);
    }

    @Override
    public CompletableFuture<Acknowledge> updateJob(JobUpdateRequest request, Time timeout) {
        JobGraph newJobGraph;
        try {
            newJobGraph = (JobGraph)InstantiationUtil.clone((Serializable)this.jobGraph);
        }
        catch (Exception e) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobModificationException("Failed to make a copy of current job graph " + this.jobGraph.getJobID(), e)));
        }
        for (JobUpdateAction action : request.getJobUpdateActions()) {
            if (action instanceof JobGraphUpdateAction) {
                ((JobGraphUpdateAction)action).updateJobGraph(newJobGraph);
                continue;
            }
            if (action instanceof JobGraphReplaceAction) {
                newJobGraph = ((JobGraphReplaceAction)action).getNewJobGraph();
                newJobGraph.setJobVersion(this.jobGraph.getJobVersion());
                continue;
            }
            return FutureUtils.completedExceptionally(new IllegalArgumentException("Unknown job update action: " + action));
        }
        newJobGraph.setJobVersion(newJobGraph.getJobVersion() + 1L);
        if (!request.shouldTriggerJobReload()) {
            try {
                this.jobGraph = newJobGraph;
                this.log.info("Persisting the new JobGraph...");
                this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(newJobGraph, null));
                this.log.info("Job update finished successfully without restarting the job.");
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Throwable t) {
                String msg = "Failed to persist the new job graph of job " + newJobGraph.getJobID();
                this.log.warn(msg, t);
                return FutureUtils.completedExceptionally((Throwable)((Object)new JobModificationException(msg, t)));
            }
        }
        this.log.debug("Update original job graph {} with new job graph {}.", (Object)this.jobGraph, (Object)newJobGraph);
        return this.updateJob(newJobGraph);
    }

    @Override
    public CompletableFuture<Acknowledge> rescaleJob(int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
        ArrayList<JobVertexID> allOperators = new ArrayList<JobVertexID>(this.jobGraph.getNumberOfVertices());
        for (JobVertex jobVertex : this.jobGraph.getVertices()) {
            allOperators.add(jobVertex.getID());
        }
        return this.rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    }

    @Override
    public CompletableFuture<Acknowledge> rescaleOperators(Collection<JobVertexID> operators, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
        ExecutionGraph newExecutionGraph;
        if (newParallelism <= 0) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobModificationException("The target parallelism of a rescaling operation must be larger than 0.")));
        }
        try {
            this.rescaleJobGraph(operators, newParallelism, rescalingBehaviour);
        }
        catch (FlinkException e) {
            String msg = String.format("Cannot rescale job %s.", this.jobGraph.getName());
            this.log.info(msg, (Throwable)e);
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobModificationException(msg, e)));
        }
        ExecutionGraph currentExecutionGraph = this.executionGraph;
        JobManagerJobMetricGroup newJobManagerJobMetricGroup = this.jobMetricGroupFactory.create(this.jobGraph);
        try {
            newExecutionGraph = this.createExecutionGraph(this.jobGraph, newJobManagerJobMetricGroup);
        }
        catch (JobException | JobExecutionException e) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobModificationException("Could not create rescaled ExecutionGraph.", e)));
        }
        CheckpointCoordinator checkpointCoordinator = currentExecutionGraph.getCheckpointCoordinator();
        checkpointCoordinator.stopCheckpointScheduler();
        CompletableFuture<String> savepointFuture = this.getJobModificationSavepoint(timeout);
        CompletionStage executionGraphFuture = this.restoreExecutionGraphFromRescalingSavepoint(newExecutionGraph, savepointFuture).handleAsync((executionGraph, failure) -> {
            if (failure != null) {
                if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
                    checkpointCoordinator.startCheckpointScheduler();
                }
                throw new CompletionException(ExceptionUtils.stripCompletionException((Throwable)failure));
            }
            return executionGraph;
        }, (Executor)this.getMainThreadExecutor());
        CompletionStage terminationFuture = ((CompletableFuture)executionGraphFuture).thenComposeAsync(ignored -> {
            this.suspendExecutionGraph((Exception)((Object)new FlinkException("Job is being rescaled.")));
            return currentExecutionGraph.getTerminationFuture();
        }, (Executor)this.getMainThreadExecutor());
        CompletionStage suspendedFuture = ((CompletableFuture)terminationFuture).thenAccept(jobStatus -> {
            if (jobStatus != JobStatus.SUSPENDED) {
                String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", this.jobGraph.getName());
                this.log.info(msg);
                throw new CompletionException((Throwable)((Object)new JobModificationException(msg)));
            }
        });
        CompletionStage rescalingFuture = ((CompletableFuture)suspendedFuture).thenCombineAsync(executionGraphFuture, (ignored, restoredExecutionGraph) -> {
            if (this.executionGraph == currentExecutionGraph) {
                this.clearExecutionGraphFields();
                Preconditions.checkState((boolean)this.executionGraph.getState().isTerminalState());
                this.assignExecutionGraph((ExecutionGraph)restoredExecutionGraph, newJobManagerJobMetricGroup);
                this.operationLogManager.clear();
                this.scheduleExecutionGraph();
                return Acknowledge.get();
            }
            throw new CompletionException((Throwable)((Object)new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the rescaling.")));
        }, (Executor)this.getMainThreadExecutor());
        ((CompletableFuture)rescalingFuture).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                newExecutionGraph.failGlobal(new SuppressRestartsException(new FlinkException(String.format("Failed to rescale the job %s.", this.jobGraph.getJobID()), throwable)));
            }
        });
        return rescalingFuture;
    }

    @Override
    public CompletableFuture<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        Preconditions.checkNotNull((Object)taskExecutionState, (String)"taskExecutionState");
        if (this.graphManager.isReconciling()) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobMasterException("The job master is reconciling with task executors.")));
        }
        if (this.executionGraph.updateState(taskExecutionState)) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new ExecutionGraphException("The execution attempt " + (Object)((Object)taskExecutionState.getID()) + " was not found.")));
    }

    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(JobVertexID vertexID, OperatorID operatorID, ExecutionAttemptID executionAttempt) {
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)executionAttempt);
        if (execution == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Can not find Execution for attempt {}.", (Object)executionAttempt);
            }
            return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + (Object)((Object)executionAttempt)));
        }
        ExecutionJobVertex vertex = this.executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            this.log.error("Cannot find execution vertex for vertex ID {}.", (Object)vertexID);
            return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + (Object)((Object)vertexID)));
        }
        ExecutionVertex executionVertex = execution.getVertex();
        InputSplit nextInputSplit = executionVertex.getNextInputSplitFromAssgined(operatorID);
        if (nextInputSplit == null && !executionVertex.isOverInputSplitsLimit(operatorID)) {
            int taskId;
            InputSplitAssigner splitAssigner = vertex.getSplitAssigner(operatorID);
            if (splitAssigner == null) {
                this.log.error("No InputSplitAssigner for vertexID {}, operatorID {}.", (Object)vertexID, (Object)operatorID);
                return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertexID " + (Object)((Object)vertexID) + ", operatorID " + (Object)((Object)operatorID)));
            }
            LogicalSlot slot = execution.getAssignedResource();
            String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
            nextInputSplit = splitAssigner.getNextInputSplit(host, taskId = execution.getVertex().getParallelSubtaskIndex());
            if (nextInputSplit != null) {
                executionVertex.inputSplitAssigned(operatorID, nextInputSplit);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Send next input split {}.", (Object)nextInputSplit);
            }
        }
        try {
            byte[] serializedInputSplit = InstantiationUtil.serializeObject((Object)nextInputSplit);
            return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
        }
        catch (Exception ex) {
            this.log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), (Object)ex);
            IOException reason = new IOException("Could not serialize the next input split of class " + nextInputSplit.getClass() + ".", ex);
            executionVertex.fail(reason);
            return FutureUtils.completedExceptionally(reason);
        }
    }

    @Override
    public CompletableFuture<ExecutionState> requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) {
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)resultPartitionId.getProducerId());
        if (execution != null) {
            return CompletableFuture.completedFuture(execution.getState());
        }
        IntermediateResult intermediateResult = this.executionGraph.getAllIntermediateResults().get((Object)intermediateResultId);
        if (intermediateResult != null) {
            Execution producerExecution = intermediateResult.getPartitionById(resultPartitionId.getPartitionId()).getProducer().getCurrentExecutionAttempt();
            if (producerExecution.getAttemptId().equals((Object)resultPartitionId.getProducerId())) {
                return CompletableFuture.completedFuture(producerExecution.getState());
            }
            return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId));
        }
        return FutureUtils.completedExceptionally(new IllegalArgumentException("Intermediate data set with ID " + (Object)((Object)intermediateResultId) + " not found."));
    }

    @Override
    public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(ResultPartitionID partitionID, Time timeout) {
        if (this.graphManager.isReconciling()) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobMasterException("The job master is reconciling with task executors.")));
        }
        try {
            this.executionGraph.scheduleOrUpdateConsumers(partitionID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override
    public CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception cause) {
        this.log.info("Disconnect TaskExecutor {} because: {}", (Object)resourceID, (Object)cause.getMessage());
        this.taskManagerHeartbeatManager.unmonitorTarget(resourceID);
        CompletableFuture<Acknowledge> releaseFuture = this.slotPoolGateway.releaseTaskManager(resourceID, cause);
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = this.registeredTaskManagers.remove(resourceID);
        if (taskManagerConnection != null) {
            ((TaskExecutorGateway)taskManagerConnection.f1).disconnectJobManager(this.jobGraph.getJobID(), cause);
        }
        return releaseFuture;
    }

    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) {
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
        if (checkpointCoordinator != null) {
            this.getRpcService().execute(() -> {
                try {
                    checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
                }
                catch (Throwable t) {
                    this.log.warn("Error while processing checkpoint acknowledgement message");
                }
            });
        } else {
            this.log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", (Object)this.jobGraph.getJobID());
        }
    }

    @Override
    public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointID, Throwable reason) {
        DeclineCheckpoint decline = new DeclineCheckpoint(jobID, executionAttemptID, checkpointID, reason);
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            this.getRpcService().execute(() -> {
                try {
                    checkpointCoordinator.receiveDeclineMessage(decline);
                }
                catch (Exception e) {
                    this.log.error("Error in CheckpointCoordinator while processing {}", (Object)decline, (Object)e);
                }
            });
        } else {
            this.log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", (Object)this.jobGraph.getJobID());
        }
    }

    @Override
    public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String registrationName) {
        if (this.jobGraph.getJobID().equals((Object)jobId)) {
            KvStateLocationRegistry registry;
            KvStateLocation location;
            if (this.log.isDebugEnabled()) {
                this.log.debug("Lookup key-value state for job {} with registration name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
            }
            if ((location = (registry = this.executionGraph.getKvStateLocationRegistry()).getKvStateLocation(registrationName)) != null) {
                return CompletableFuture.completedFuture(location);
            }
            return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName));
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Request of key-value state location for unknown job {} received.", (Object)jobId);
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) {
        if (this.jobGraph.getJobID().equals((Object)jobId)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Key value state registered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
            }
            try {
                this.executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Exception e) {
                this.log.error("Failed to notify KvStateRegistry about registration {}.", (Object)registrationName);
                return FutureUtils.completedExceptionally(e);
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Notification about key-value state registration for unknown job {} received.", (Object)jobId);
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
        if (this.jobGraph.getJobID().equals((Object)jobId)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Key value state unregistered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
            }
            try {
                this.executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(jobVertexId, keyGroupRange, registrationName);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Exception e) {
                this.log.error("Failed to notify KvStateRegistry about registration {}.", (Object)registrationName, (Object)e);
                return FutureUtils.completedExceptionally(e);
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Notification about key-value state deregistration for unknown job {} received.", (Object)jobId);
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
    }

    @Override
    public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
        return CompletableFuture.completedFuture(new ClassloadingProps(this.blobServer.getPort(), this.executionGraph.getRequiredJarFiles(), this.executionGraph.getRequiredClasspaths()));
    }

    @Override
    public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID taskManagerId, Collection<SlotOffer> slots, Time timeout) {
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = this.registeredTaskManagers.get(taskManagerId);
        if (taskManager == null) {
            return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
        }
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation)taskManager.f0;
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskManager.f1;
        RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, (JobMasterId)((Object)this.getFencingToken()));
        return this.slotPoolGateway.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots);
    }

    @Override
    public CompletableFuture<TaskExecutorReportResponse> reportTasksExecutionStatus(ResourceID taskManagerId, List<TaskExecutionStatus> tasksExecutionStatus, @RpcTimeout Time timeout) {
        if (this.executionGraph.getState() != JobStatus.RECONCILING) {
            this.log.info("Refuse reported task status from {}", (Object)taskManagerId);
            TaskExecutorReportResponse.Decline response = new TaskExecutorReportResponse.Decline("The reported task exceeds the duration time, the job master is not in reconciling state now.");
            return CompletableFuture.completedFuture(response);
        }
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = this.registeredTaskManagers.get(taskManagerId);
        if (taskManager == null) {
            TaskExecutorReportResponse.Decline response = new TaskExecutorReportResponse.Decline("Unknown TaskManager " + taskManagerId);
            return CompletableFuture.completedFuture(response);
        }
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation)taskManager.f0;
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskManager.f1;
        RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, (JobMasterId)((Object)this.getFencingToken()));
        ArrayList<Integer> rejected = new ArrayList<Integer>();
        for (int i = 0; i < tasksExecutionStatus.size(); ++i) {
            TaskExecutionStatus executionStatus = tasksExecutionStatus.get(i);
            SlotOffer slotOffer = executionStatus.getBoundSlot();
            this.log.debug("Accept reported task execution {} with state {} from {}", new Object[]{executionStatus.getExecutionAttemptID(), executionStatus.getExecutionState(), taskManagerId});
            ExecutionJobVertex ejv = this.executionGraph.getJobVertex(executionStatus.getJobVertexID());
            SlotSharingGroup sharingGroup = ejv.getSlotSharingGroup();
            CoLocationConstraint coLocationConstraint = null;
            if (ejv.getCoLocationGroup() != null) {
                coLocationConstraint = ejv.getCoLocationGroup().getLocationConstraint(executionStatus.getIndexOfSubtask());
            }
            LogicalSlot singleLogicalSlot = null;
            try {
                singleLogicalSlot = this.slotPool.recoverSlot(executionStatus.getJobVertexID(), sharingGroup == null ? null : sharingGroup.getSlotSharingGroupId(), coLocationConstraint, slotOffer.getAllocationId(), taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), slotOffer.getTags(), rpcTaskManagerGateway);
            }
            catch (Exception e) {
                rejected.add(i);
                continue;
            }
            try {
                if (this.graphManager.reconcileExecutionVertex(executionStatus.getJobVertexID(), executionStatus.getIndexOfSubtask(), executionStatus.getExecutionState(), executionStatus.getExecutionAttemptID(), executionStatus.getAttemptNumber(), executionStatus.getCreateTimestamp(), executionStatus.getResultPartitionIDs(), executionStatus.getResultPartitionsConsumable(), executionStatus.getAssignedInputSplits(), singleLogicalSlot)) continue;
                this.slotPool.releaseSlot(singleLogicalSlot.getSlotRequestId(), singleLogicalSlot.getSlotSharingGroupId(), singleLogicalSlot.getCoLocationConstraint(), new Exception("Fail to recover the slot"));
                rejected.add(i);
                this.log.info("Fail to reconcile status for execution {} with state {} from {}", new Object[]{executionStatus.getExecutionAttemptID(), executionStatus.getExecutionState(), taskManagerId});
                continue;
            }
            catch (Exception ex) {
                this.log.warn("Fail to recover vertex {}_{} execution {}", new Object[]{executionStatus.getJobVertexID(), executionStatus.getIndexOfSubtask(), executionStatus.getExecutionAttemptID(), ex});
                rejected.add(i);
            }
        }
        TaskExecutorReportResponse.Success response = new TaskExecutorReportResponse.Success(rejected);
        return CompletableFuture.completedFuture(response);
    }

    @Override
    public void failSlot(ResourceID taskManagerId, AllocationID allocationId, Exception cause) {
        if (this.registeredTaskManagers.containsKey(taskManagerId)) {
            this.slotPoolGateway.failAllocation(allocationId, cause);
        } else {
            this.log.warn("Cannot fail slot " + (Object)((Object)allocationId) + " because the TaskManager " + taskManagerId + " is unknown.");
        }
    }

    @Override
    public CompletableFuture<RegistrationResponse> registerTaskManager(String taskManagerRpcAddress, TaskManagerLocation taskManagerLocation, Time timeout) {
        ResourceID taskManagerId = taskManagerLocation.getResourceID();
        if (this.registeredTaskManagers.containsKey(taskManagerId)) {
            JMTMRegistrationSuccess response = new JMTMRegistrationSuccess(this.resourceId);
            return CompletableFuture.completedFuture(response);
        }
        return this.getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class).handleAsync((taskExecutorGateway, throwable) -> {
            if (throwable != null) {
                return new RegistrationResponse.Decline(throwable.getMessage());
            }
            this.slotPoolGateway.registerTaskManager(taskManagerId);
            this.registeredTaskManagers.put(taskManagerId, (Tuple2<TaskManagerLocation, TaskExecutorGateway>)Tuple2.of((Object)taskManagerLocation, (Object)taskExecutorGateway));
            this.taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>(){

                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                    taskExecutorGateway.heartbeatFromJobManager(resourceID);
                }
            });
            return new JMTMRegistrationSuccess(this.resourceId);
        }, (Executor)this.getMainThreadExecutor());
    }

    @Override
    public void disconnectResourceManager(ResourceManagerId resourceManagerId, Exception cause) {
        if (this.isConnectingToResourceManager(resourceManagerId)) {
            this.reconnectToResourceManager(cause);
        }
    }

    private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
        return this.resourceManagerAddress != null && this.resourceManagerAddress.getResourceManagerId().equals((Object)resourceManagerId);
    }

    @Override
    public void heartbeatFromTaskManager(ResourceID resourceID, AccumulatorReport accumulatorReport) {
        this.taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
    }

    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        this.resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    @Override
    public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
        ExecutionGraph currentExecutionGraph = this.executionGraph;
        return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), this.scheduledExecutorService);
    }

    @Override
    public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
        return CompletableFuture.completedFuture(this.executionGraph.getState());
    }

    @Override
    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
        return CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(this.executionGraph));
    }

    @Override
    public CompletableFuture<EvictingBoundedList<ArchivedExecutionGraph>> requestJobHistories(Time timeout) {
        return CompletableFuture.completedFuture(this.executionGraphHistories);
    }

    @Override
    public CompletableFuture<Collection<JobPendingSlotRequestDetail>> requestPendingSlotRequestDetails(@RpcTimeout Time timeout) {
        return this.slotPoolGateway.requestPendingSlotRequests(timeout).thenApply(pendingSlotRequests -> {
            ArrayList<JobPendingSlotRequestDetail> pendingSlotRequestDetailList = new ArrayList<JobPendingSlotRequestDetail>();
            for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests) {
                List<PendingSlotRequest.PendingScheduledUnit> scheduledUnits = pendingSlotRequest.getPendingScheduledUnits();
                Preconditions.checkState((scheduledUnits.size() > 0 ? 1 : 0) != 0);
                ArrayList<JobPendingSlotRequestDetail.VertexTaskInfo> vertexTaskInfos = new ArrayList<JobPendingSlotRequestDetail.VertexTaskInfo>();
                for (PendingSlotRequest.PendingScheduledUnit scheduledUnit : scheduledUnits) {
                    vertexTaskInfos.add(new JobPendingSlotRequestDetail.VertexTaskInfo(scheduledUnit.getJobVertexId(), scheduledUnit.getTaskName(), scheduledUnit.getSubTaskIndex(), scheduledUnit.getSubTaskAttempt()));
                }
                PendingSlotRequest.PendingScheduledUnit anyScheduledUnit = scheduledUnits.iterator().next();
                pendingSlotRequestDetailList.add(new JobPendingSlotRequestDetail(pendingSlotRequest.getSlotRequestId(), pendingSlotRequest.getResourceProfile(), pendingSlotRequest.getStartTimestamp(), anyScheduledUnit.getSlotSharingGroupId(), anyScheduledUnit.getCoLocationGroupId(), vertexTaskInfos));
            }
            return pendingSlotRequestDetailList;
        });
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable String targetDirectory, boolean cancelJob, Time timeout) {
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            return FutureUtils.completedExceptionally(new IllegalStateException(String.format("Job %s is not a streaming job.", this.jobGraph.getJobID())));
        }
        if (cancelJob) {
            checkpointCoordinator.stopCheckpointScheduler();
        }
        return ((CompletableFuture)((CompletableFuture)checkpointCoordinator.triggerSavepoint(System.currentTimeMillis(), targetDirectory).thenApply(CompletedCheckpoint::getExternalPointer)).thenApplyAsync(path -> {
            if (cancelJob) {
                this.log.info("Savepoint stored in {}. Now cancelling {}.", path, (Object)this.jobGraph.getJobID());
                this.cancel(timeout);
            }
            return path;
        }, (Executor)this.getMainThreadExecutor())).exceptionally(throwable -> {
            if (cancelJob) {
                this.startCheckpointScheduler(checkpointCoordinator);
            }
            throw new CompletionException((Throwable)throwable);
        });
    }

    private void startCheckpointScheduler(CheckpointCoordinator checkpointCoordinator) {
        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            try {
                checkpointCoordinator.startCheckpointScheduler();
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
        }
    }

    @Override
    public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId) {
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexId);
        if (jobVertex == null) {
            return FutureUtils.completedExceptionally(new FlinkException("JobVertexID not found " + (Object)((Object)jobVertexId)));
        }
        Optional<OperatorBackPressureStats> operatorBackPressureStats = this.backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
        return CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(operatorBackPressureStats.orElse(null)));
    }

    @Override
    public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
        this.slotPoolGateway.failAllocation(allocationID, cause);
    }

    @Override
    public void commitPreAggregatedAccumulator(List<CommitAccumulator> commitAccumulators) {
        for (CommitAccumulator commitAccumulator : commitAccumulators) {
            this.accumulatorAggregationCoordinator.commitPreAggregatedAccumulator(this.executionGraph, commitAccumulator);
        }
    }

    @Override
    public <V, A extends Serializable> CompletableFuture<Accumulator<V, A>> queryPreAggregatedAccumulator(String name) {
        return this.accumulatorAggregationCoordinator.queryPreAggregatedAccumulator(name);
    }

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        this.validateRunsInMainThread();
        Preconditions.checkNotNull((Object)((Object)newJobMasterId), (String)"The new JobMasterId must not be null.");
        if (Objects.equals(this.getFencingToken(), (Object)newJobMasterId)) {
            this.log.info("Already started the job execution with JobMasterId {}.", (Object)newJobMasterId);
            return Acknowledge.get();
        }
        this.setNewFencingToken(newJobMasterId);
        this.startJobMasterServices();
        this.log.info("Starting execution of job {} ({})", (Object)this.jobGraph.getName(), (Object)this.jobGraph.getJobID());
        if (JobStatus.CREATED.equals((Object)this.executionGraph.getState()) && !this.graphManager.isReconciling()) {
            this.scheduleExecutionGraph();
        } else {
            this.executionGraph.getReconcileFuture().thenApplyAsync(reconcileFailedExecutions -> {
                this.graphManager.leaveReconcile();
                if (JobStatus.RUNNING.equals((Object)this.executionGraph.getState())) {
                    for (ExecutionAttemptID executionAttemptId : reconcileFailedExecutions) {
                        Execution execution;
                        if (executionAttemptId == null || (execution = this.executionGraph.getRegisteredExecutions().get((Object)executionAttemptId)) == null) continue;
                        this.updateTaskExecutionState(new TaskExecutionState(this.executionGraph.getJobID(), execution.getAttemptId(), ExecutionState.FAILED, new FlinkException("Fail to reconcile with master")));
                    }
                } else if (JobStatus.CREATED.equals((Object)this.executionGraph.getState())) {
                    this.scheduleExecutionGraph();
                } else {
                    this.log.error("When reconcile finished, the job is in {}, this is a logical error.", (Object)this.executionGraph.getState());
                }
                return null;
            }, (Executor)this.getMainThreadExecutor());
        }
        return Acknowledge.get();
    }

    private void startJobMasterServices() throws Exception {
        this.slotPool.start((JobMasterId)((Object)this.getFencingToken()), this.getAddress());
        this.reconnectToResourceManager((Exception)((Object)new FlinkException("Starting JobMaster component.")));
        this.resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }

    private void setNewFencingToken(JobMasterId newJobMasterId) {
        if (this.getFencingToken() != null) {
            this.log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", this.getFencingToken(), (Object)newJobMasterId);
        }
        this.setFencingToken(newJobMasterId);
    }

    private Acknowledge suspendExecution(Exception cause) {
        this.validateRunsInMainThread();
        if (this.getFencingToken() == null) {
            this.log.debug("Job has already been suspended or shutdown.");
            return Acknowledge.get();
        }
        this.setFencingToken(null);
        try {
            this.resourceManagerLeaderRetriever.stop();
        }
        catch (Throwable t) {
            this.log.warn("Failed to stop resource manager leader retriever when suspending.", t);
        }
        this.suspendAndClearExecutionGraphFields(cause);
        this.operationLogManager.stop();
        this.slotPoolGateway.suspend();
        this.closeResourceManagerConnection(cause);
        return Acknowledge.get();
    }

    private void assignExecutionGraph(ExecutionGraph newExecutionGraph, JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
        Preconditions.checkState((this.executionGraph == null || JobStatus.CREATED.equals((Object)this.executionGraph.getState()) || this.executionGraph.getState().isTerminalState() ? 1 : 0) != 0, (Object)("The job state is " + (this.executionGraph == null ? null : this.executionGraph.getState())));
        Preconditions.checkState((this.jobManagerJobMetricGroup == null ? 1 : 0) != 0);
        if (this.executionGraph != null) {
            this.executionGraphHistories.add(ArchivedExecutionGraph.createFrom(this.executionGraph));
            this.log.info("The old execution graph is added to execution graph history store({}/{}).", (Object)this.executionGraphHistories.size(), (Object)this.executionGraphHistories.getSizeLimit());
        }
        this.executionGraph = newExecutionGraph;
        this.jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
        Preconditions.checkState((this.jobStatusListener == null ? 1 : 0) != 0);
        this.jobStatusListener = new JobManagerJobStatusListener();
        this.executionGraph.registerJobStatusListener(this.jobStatusListener);
        this.setupGraphManager();
    }

    private void scheduleExecutionGraph() {
        try {
            this.operationLogManager.start();
            if (this.establishedResourceManagerConnection != null) {
                this.establishedResourceManagerConnection.getResourceManagerGateway().setPlacementConstraints(this.jobGraph.getJobID(), this.jobGraph.getPlacementConstraints(), this.rpcTimeout);
            }
            this.executionGraph.scheduleForExecution();
        }
        catch (JobException e) {
            this.jobCompletionActions.jobMasterFailed((Throwable)((Object)e));
        }
        catch (Throwable t) {
            this.executionGraph.failGlobal(t);
        }
    }

    private ExecutionGraph createAndRestoreExecutionGraph(JobGraph jobGraph, JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
        ExecutionGraph newExecutionGraph = this.createExecutionGraph(jobGraph, currentJobManagerJobMetricGroup);
        CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null && !checkpointCoordinator.restoreLatestCheckpointedState(newExecutionGraph.getAllVertices(), false, false)) {
            this.tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
        return newExecutionGraph;
    }

    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException {
        return ExecutionGraphBuilder.buildGraph(null, jobGraph, this.jobMasterConfiguration.getConfiguration(), this.scheduledExecutorService, this.scheduledExecutorService, this.slotPool.getSlotProvider(), this.userCodeLoader, this.highAvailabilityServices.getCheckpointRecoveryFactory(), this.rpcTimeout, this.restartStrategy, currentJobManagerJobMetricGroup, this.blobServer, this.resultPartitionLocationTrackerProxy, this.jobMasterConfiguration.getSlotRequestTimeout(), this.log);
    }

    private void suspendAndClearExecutionGraphFields(Exception cause) {
        this.suspendExecutionGraph(cause);
        this.clearExecutionGraphFields();
    }

    private void suspendExecutionGraph(Exception cause) {
        this.executionGraph.suspend(cause);
    }

    private void clearExecutionGraphFields() {
        if (this.jobManagerJobMetricGroup != null) {
            this.jobManagerJobMetricGroup.close();
            this.jobManagerJobMetricGroup = null;
        }
        if (this.jobStatusListener != null) {
            this.jobStatusListener.stop();
            this.jobStatusListener = null;
        }
    }

    private GraphManager createGraphManager() {
        Configuration schedulingConfig = this.jobGraph.getSchedulingConfiguration();
        GraphManagerPlugin graphManagerPlugin = GraphManagerPluginFactory.createGraphManagerPlugin(schedulingConfig, this.userCodeLoader);
        Configuration jobConfig = this.jobGraph.getJobConfiguration();
        Configuration configuration = new Configuration();
        if (jobConfig != null) {
            configuration.addAll(jobConfig);
        }
        if (schedulingConfig != null) {
            configuration.addAll(schedulingConfig);
        }
        GraphManager graphManager = new GraphManager(graphManagerPlugin, this.getSelfGateway(JobMasterGateway.class), this.operationLogManager, this.executionGraph);
        graphManager.open(this.jobGraph, new SchedulingConfig(configuration, this.userCodeLoader));
        return graphManager;
    }

    private void setupGraphManager() {
        if (this.graphManager != null) {
            this.graphManager.dispose();
        }
        this.graphManager = this.createGraphManager();
        this.operationLogManager.registerLogHandler(OperationLogType.GRAPH_MANAGER, this.graphManager);
        this.executionGraph.setGraphManager(this.graphManager);
        this.executionGraph.registerExecutionListener(this.graphManager);
    }

    private void disposeSavepoint(String savepointPath) {
        try {
            Checkpoints.disposeSavepoint(savepointPath, this.jobMasterConfiguration.getConfiguration(), this.userCodeLoader, this.log);
        }
        catch (IOException | FlinkException e) {
            this.log.info("Could not dispose temporary rescaling savepoint under {}.", (Object)savepointPath, (Object)e);
        }
    }

    private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
        CheckpointCoordinator checkpointCoordinator;
        if (savepointRestoreSettings.restoreSavepoint() && (checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator()) != null) {
            checkpointCoordinator.restoreSavepoint(savepointRestoreSettings.getRestorePath(), savepointRestoreSettings.allowNonRestoredState(), savepointRestoreSettings.resumeFromLatestCheckpoint(), executionGraphToRestore.getAllVertices(), this.userCodeLoader);
        }
    }

    private CompletableFuture<Acknowledge> updateJob(JobGraph newJobGraph) {
        if (this.inJobUpdate) {
            String msg = "Fail to update the job as a previous update is still in progress.";
            this.log.warn("Fail to update the job as a previous update is still in progress.");
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobModificationException("Fail to update the job as a previous update is still in progress.")));
        }
        this.inJobUpdate = true;
        try {
            ExecutionGraph newExecutionGraph;
            ExecutionGraph currentExecutionGraph = this.executionGraph;
            CheckpointCoordinator checkpointCoordinator = currentExecutionGraph.getCheckpointCoordinator();
            if (checkpointCoordinator != null) {
                checkpointCoordinator.stopCheckpointScheduler();
            }
            JobManagerJobMetricGroup newJobManagerJobMetricGroup = this.jobMetricGroupFactory.create(newJobGraph);
            try {
                newExecutionGraph = this.createAndRestoreExecutionGraph(newJobGraph, newJobManagerJobMetricGroup);
            }
            catch (Exception e) {
                if (checkpointCoordinator != null && checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
                    checkpointCoordinator.startCheckpointScheduler();
                }
                String msg = "Fail to create the new ExecutionGraph or restore states from the latest checkpoint.";
                this.log.warn("Fail to create the new ExecutionGraph or restore states from the latest checkpoint.");
                throw new JobModificationException("Fail to create the new ExecutionGraph or restore states from the latest checkpoint.", e);
            }
            this.jobGraph = newJobGraph;
            this.runAsync(() -> this.suspendExecutionGraph((Exception)((Object)new FlinkException("Suspend the old job to update it."))));
            CompletableFuture<JobStatus> terminationFuture = currentExecutionGraph.getTerminationFuture();
            CompletionStage suspendedFuture = terminationFuture.thenAccept(jobStatus -> {
                if (jobStatus != JobStatus.SUSPENDED) {
                    String msg = String.format("Job %s update failed because we could not suspend the execution graph.", this.jobGraph.getName());
                    this.log.warn(msg);
                    throw new CompletionException((Throwable)((Object)new JobModificationException(msg)));
                }
            });
            CompletionStage resumingFuture = ((CompletableFuture)suspendedFuture).thenAcceptAsync(ignore -> {
                if (this.executionGraph != currentExecutionGraph) {
                    String msg = "Detected unexpected concurrent modification of ExecutionGraph.";
                    this.log.warn("Detected unexpected concurrent modification of ExecutionGraph.");
                    throw new CompletionException((Throwable)((Object)new JobModificationException("Detected unexpected concurrent modification of ExecutionGraph.")));
                }
                this.clearExecutionGraphFields();
                Preconditions.checkState((boolean)this.executionGraph.getState().isTerminalState());
                this.optimizeNewExecutionGraph(newExecutionGraph, currentExecutionGraph);
                this.assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
                this.operationLogManager.clear();
                this.scheduleExecutionGraph();
            }, (Executor)this.getMainThreadExecutor());
            CompletionStage persistFuture = ((CompletableFuture)resumingFuture).thenApply(ignored -> {
                try {
                    this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(newJobGraph, null));
                    return Acknowledge.get();
                }
                catch (Exception e) {
                    String msg = "Failed to store job graph of job " + newJobGraph.getJobID();
                    this.log.warn(msg);
                    throw new CompletionException((Throwable)((Object)new JobModificationException(msg)));
                }
            });
            ((CompletableFuture)persistFuture).whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    String msg = "Unexpected error happens in job update.";
                    this.log.error("Unexpected error happens in job update.", throwable);
                    RuntimeException unexpectedError = new RuntimeException("Unexpected error happens in job update.", (Throwable)throwable);
                    this.handleJobMasterError(unexpectedError);
                }
                this.inJobUpdate = false;
            });
            return persistFuture;
        }
        catch (Throwable t) {
            this.inJobUpdate = false;
            return FutureUtils.completedExceptionally(t);
        }
    }

    private void optimizeNewExecutionGraph(ExecutionGraph newExecutionGraph, ExecutionGraph oldExecutionGraph) {
        Map<JobVertexID, ExecutionJobVertex> oldVertices = oldExecutionGraph.getAllVertices();
        Map<JobVertexID, ExecutionJobVertex> newVertices = newExecutionGraph.getAllVertices();
        for (JobVertexID jobVertexID : newVertices.keySet()) {
            ExecutionJobVertex oldEjv = oldVertices.get((Object)jobVertexID);
            if (oldEjv == null) continue;
            ExecutionJobVertex newEjv = newVertices.get((Object)jobVertexID);
            if (oldEjv.getParallelism() != newEjv.getParallelism()) continue;
            for (int i = 0; i < oldEjv.getParallelism(); ++i) {
                newEjv.getTaskVertices()[i].setLatestPriorLocation(oldEjv.getTaskVertices()[i].getCurrentAssignedResourceLocation());
            }
        }
    }

    private void handleJobMasterError(Throwable cause) {
        if (ExceptionUtils.isJvmFatalError((Throwable)cause)) {
            this.log.error("Fatal error occurred on JobManager.", cause);
            this.fatalErrorHandler.onFatalError(cause);
        } else {
            this.jobCompletionActions.jobMasterFailed(cause);
        }
    }

    private void jobStatusChanged(JobStatus newJobStatus, long timestamp, @Nullable Throwable error) {
        this.validateRunsInMainThread();
        if (newJobStatus.isGloballyTerminalState()) {
            ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFrom(this.executionGraph);
            this.operationLogManager.clear();
            this.scheduledExecutorService.execute(() -> this.jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
        }
    }

    private void notifyOfNewResourceManagerLeader(String newResourceManagerAddress, ResourceManagerId resourceManagerId) {
        this.resourceManagerAddress = this.createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
        this.reconnectToResourceManager((Exception)((Object)new FlinkException(String.format("ResourceManager leader changed to new address %s", this.resourceManagerAddress))));
    }

    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String newResourceManagerAddress, @Nullable ResourceManagerId resourceManagerId) {
        if (newResourceManagerAddress != null) {
            Preconditions.checkNotNull((Object)((Object)resourceManagerId));
            return new ResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
        }
        return null;
    }

    private void reconnectToResourceManager(Exception cause) {
        this.closeResourceManagerConnection(cause);
        this.tryConnectToResourceManager();
    }

    private void tryConnectToResourceManager() {
        if (this.resourceManagerAddress != null) {
            this.connectToResourceManager();
        }
    }

    private void connectToResourceManager() {
        assert (this.resourceManagerAddress != null);
        assert (this.resourceManagerConnection == null);
        assert (this.establishedResourceManagerConnection == null);
        this.log.info("Connecting to ResourceManager {}", (Object)this.resourceManagerAddress);
        this.resourceManagerConnection = new ResourceManagerConnection(this.log, this.jobGraph.getJobID(), this.resourceId, this.getAddress(), (JobMasterId)((Object)this.getFencingToken()), this.resourceManagerAddress.getAddress(), this.resourceManagerAddress.getResourceManagerId(), this.scheduledExecutorService);
        this.resourceManagerConnection.start();
    }

    private void establishResourceManagerConnection(JobMasterRegistrationSuccess success) {
        ResourceManagerId resourceManagerId = success.getResourceManagerId();
        if (this.resourceManagerConnection != null && Objects.equals(this.resourceManagerConnection.getTargetLeaderId(), (Object)resourceManagerId)) {
            this.log.info("JobManager successfully registered at ResourceManager, leader id: {}.", (Object)resourceManagerId);
            final ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
            ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
            this.establishedResourceManagerConnection = new EstablishedResourceManagerConnection(resourceManagerGateway, resourceManagerResourceId);
            resourceManagerGateway.setPlacementConstraints(this.jobGraph.getJobID(), this.jobGraph.getPlacementConstraints(), this.rpcTimeout).thenAccept(ignored -> this.slotPoolGateway.connectToResourceManager(resourceManagerGateway));
            this.resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>(){

                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    resourceManagerGateway.heartbeatFromJobManager(resourceID);
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                }
            });
        } else {
            this.log.debug("Ignoring resource manager connection to {} because its a duplicate or outdated.", (Object)resourceManagerId);
        }
    }

    private void closeResourceManagerConnection(Exception cause) {
        if (this.establishedResourceManagerConnection != null) {
            this.dissolveResourceManagerConnection(this.establishedResourceManagerConnection, cause);
            this.establishedResourceManagerConnection = null;
        }
        if (this.resourceManagerConnection != null) {
            this.resourceManagerConnection.close();
            this.resourceManagerConnection = null;
        }
    }

    private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception cause) {
        ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Close ResourceManager connection {}.", (Object)resourceManagerResourceID, (Object)cause);
        } else {
            this.log.info("Close ResourceManager connection {}: {}.", (Object)resourceManagerResourceID, (Object)cause.getMessage());
        }
        this.resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);
        ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
        resourceManagerGateway.disconnectJobManager(this.jobGraph.getJobID(), cause);
        this.slotPoolGateway.disconnectResourceManager();
    }

    private CompletableFuture<ExecutionGraph> restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph newExecutionGraph, CompletableFuture<String> savepointFuture) {
        return savepointFuture.thenApplyAsync(savepointPath -> {
            if (savepointPath != null) {
                try {
                    this.tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, SavepointRestoreSettings.forPath(savepointPath, false));
                }
                catch (Exception e) {
                    String message = String.format("Could not restore from temporary rescaling savepoint. This might indicate that the savepoint %s got corrupted. Deleting this savepoint as a precaution.", savepointPath);
                    this.log.info(message);
                    CompletableFuture.runAsync(() -> {
                        if (savepointPath.equals(this.lastInternalSavepoint)) {
                            this.lastInternalSavepoint = null;
                        }
                    }, this.getMainThreadExecutor()).thenRunAsync(() -> this.disposeSavepoint((String)savepointPath), this.scheduledExecutorService);
                    throw new CompletionException((Throwable)((Object)new JobModificationException(message, e)));
                }
            }
            try {
                this.tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, this.jobGraph.getSavepointRestoreSettings());
            }
            catch (Exception e) {
                String message = String.format("Could not restore from initial savepoint. This might indicate that the savepoint %s got corrupted.", this.jobGraph.getSavepointRestoreSettings().getRestorePath());
                this.log.info(message);
                throw new CompletionException((Throwable)((Object)new JobModificationException(message, e)));
            }
            return newExecutionGraph;
        }, (Executor)this.scheduledExecutorService);
    }

    private CompletableFuture<String> getJobModificationSavepoint(Time timeout) {
        return this.triggerSavepoint(null, false, timeout).handleAsync((savepointPath, throwable) -> {
            if (throwable != null) {
                Throwable strippedThrowable = ExceptionUtils.stripCompletionException((Throwable)throwable);
                if (strippedThrowable instanceof CheckpointTriggerException) {
                    CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException)((Object)((Object)strippedThrowable));
                    if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
                        return this.lastInternalSavepoint;
                    }
                    throw new CompletionException((Throwable)((Object)checkpointTriggerException));
                }
                throw new CompletionException(strippedThrowable);
            }
            String savepointToDispose = this.lastInternalSavepoint;
            this.lastInternalSavepoint = savepointPath;
            if (savepointToDispose != null) {
                CompletableFuture.runAsync(() -> this.disposeSavepoint(savepointToDispose), this.scheduledExecutorService);
            }
            return this.lastInternalSavepoint;
        }, (Executor)this.getMainThreadExecutor());
    }

    private void rescaleJobGraph(Collection<JobVertexID> operators, int newParallelism, RescalingBehaviour rescalingBehaviour) throws FlinkException {
        for (JobVertexID jobVertexId : operators) {
            JobVertex jobVertex = this.jobGraph.findVertexByID(jobVertexId);
            ExecutionJobVertex executionJobVertex = this.executionGraph.getJobVertex(jobVertexId);
            if (executionJobVertex != null) {
                jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
            }
            rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
        }
    }

    @VisibleForTesting
    public ExecutionGraph getExecutionGraph() {
        return this.executionGraph;
    }

    @VisibleForTesting
    SlotPool getSlotPool() {
        return this.slotPool;
    }

    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceId) {
            JobMaster.this.runAsync(() -> {
                JobMaster.this.log.info("The heartbeat of ResourceManager with id {} timed out.", (Object)resourceId);
                if (JobMaster.this.establishedResourceManagerConnection != null && JobMaster.this.establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
                    JobMaster.this.reconnectToResourceManager((Exception)((Object)new JobMasterException(String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId))));
                }
            });
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class TaskManagerHeartbeatListener
    implements HeartbeatListener<AccumulatorReport, Void> {
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = (JobMasterGateway)Preconditions.checkNotNull((Object)jobMasterGateway);
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            this.jobMasterGateway.disconnectTaskManager(resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
        }

        @Override
        public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
            for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) {
                JobMaster.this.executionGraph.updateAccumulators(snapshot);
            }
        }

        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class JobManagerJobStatusListener
    implements JobStatusListener {
        private volatile boolean running = true;

        private JobManagerJobStatusListener() {
        }

        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (this.running) {
                JobMaster.this.runAsync(() -> JobMaster.this.jobStatusChanged(newJobStatus, timestamp, error));
            }
        }

        private void stop() {
            this.running = false;
        }
    }

    private class ResourceManagerConnection
    extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> {
        private final JobID jobID;
        private final ResourceID jobManagerResourceID;
        private final String jobManagerRpcAddress;
        private final JobMasterId jobMasterId;

        ResourceManagerConnection(Logger log, JobID jobID, ResourceID jobManagerResourceID, String jobManagerRpcAddress, JobMasterId jobMasterId, String resourceManagerAddress, ResourceManagerId resourceManagerId, Executor executor) {
            super(log, resourceManagerAddress, resourceManagerId, executor);
            this.jobID = (JobID)Preconditions.checkNotNull((Object)jobID);
            this.jobManagerResourceID = (ResourceID)Preconditions.checkNotNull((Object)jobManagerResourceID);
            this.jobManagerRpcAddress = (String)Preconditions.checkNotNull((Object)jobManagerRpcAddress);
            this.jobMasterId = (JobMasterId)((Object)Preconditions.checkNotNull((Object)((Object)jobMasterId)));
        }

        @Override
        protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
            return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(this.log, JobMaster.this.getRpcService(), "ResourceManager", ResourceManagerGateway.class, this.getTargetAddress(), (ResourceManagerId)((Object)this.getTargetLeaderId())){

                @Override
                protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
                    Time timeout = Time.milliseconds((long)timeoutMillis);
                    return gateway.registerJobManager(ResourceManagerConnection.this.jobMasterId, ResourceManagerConnection.this.jobManagerResourceID, ResourceManagerConnection.this.jobManagerRpcAddress, ResourceManagerConnection.this.jobID, timeout);
                }
            };
        }

        @Override
        protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) {
            JobMaster.this.runAsync(() -> {
                if (this == JobMaster.this.resourceManagerConnection) {
                    JobMaster.this.establishResourceManagerConnection(success);
                }
            });
        }

        @Override
        protected void onRegistrationFailure(Throwable failure) {
            JobMaster.this.handleJobMasterError(failure);
        }
    }

    private class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            JobMaster.this.runAsync(() -> JobMaster.this.notifyOfNewResourceManagerLeader(leaderAddress, ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            JobMaster.this.handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception));
        }
    }
}

