package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.ResultPartitionLocationTrackerProxy;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.failover.OperationLogManager;
import org.apache.flink.runtime.jobmaster.failover.OperationLogStoreLoader;
import org.apache.flink.runtime.jobmaster.failover.OperationLogType;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.jobmaster.message.PendingSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.preaggregatedaccumulators.AccumulatorAggregationCoordinator;
import org.apache.flink.runtime.preaggregatedaccumulators.CommitAccumulator;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rest.messages.job.JobPendingSlotRequestDetail;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.schedule.GraphManagerPlugin;
import org.apache.flink.runtime.schedule.GraphManagerPluginFactory;
import org.apache.flink.runtime.schedule.SchedulingConfig;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutionStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorReportResponse;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.update.JobUpdateRequest;
import org.apache.flink.runtime.update.action.JobGraphReplaceAction;
import org.apache.flink.runtime.update.action.JobGraphUpdateAction;
import org.apache.flink.runtime.update.action.JobUpdateAction;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster.class */
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";
    private final JobMasterConfiguration jobMasterConfiguration;
    private final ResourceID resourceId;
    private JobGraph jobGraph;
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobServer blobServer;
    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
    private final AccumulatorAggregationCoordinator accumulatorAggregationCoordinator;
    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;
    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    private final ScheduledExecutorService scheduledExecutorService;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler fatalErrorHandler;
    private final ClassLoader userCodeLoader;
    private final SubmittedJobGraphStore submittedJobGraphStore;
    private final SlotPool slotPool;
    private final SlotPoolGateway slotPoolGateway;
    private final RestartStrategy restartStrategy;
    private final ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy;
    private final OperationLogManager operationLogManager;
    private final BackPressureStatsTracker backPressureStatsTracker;
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
    private ExecutionGraph executionGraph;
    private boolean inJobUpdate;
    private final EvictingBoundedList<ArchivedExecutionGraph> executionGraphHistories;
    private GraphManager graphManager;

    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private String lastInternalSavepoint;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$JobManagerJobStatusListener.class */
    public class JobManagerJobStatusListener implements JobStatusListener {
        private volatile boolean running;

        private JobManagerJobStatusListener() {
            this.running = true;
        }

        @Override // org.apache.flink.runtime.executiongraph.JobStatusListener
        public void jobStatusChanges(JobID jobID, JobStatus jobStatus, long j, Throwable th) {
            if (this.running) {
                JobMaster.this.runAsync(() -> {
                    JobMaster.this.jobStatusChanged(jobStatus, j, th);
                });
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stop() {
            this.running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$ResourceManagerConnection.class */
    public class ResourceManagerConnection extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> {
        private final JobID jobID;
        private final ResourceID jobManagerResourceID;
        private final String jobManagerRpcAddress;
        private final JobMasterId jobMasterId;

        /* JADX WARN: Multi-variable type inference failed */
        ResourceManagerConnection(Logger logger, JobID jobID, ResourceID resourceID, String str, JobMasterId jobMasterId, String str2, ResourceManagerId resourceManagerId, Executor executor) {
            super(logger, str2, resourceManagerId, executor);
            this.jobID = (JobID) Preconditions.checkNotNull(jobID);
            this.jobManagerResourceID = (ResourceID) Preconditions.checkNotNull(resourceID);
            this.jobManagerRpcAddress = (String) Preconditions.checkNotNull(str);
            this.jobMasterId = (JobMasterId) Preconditions.checkNotNull(jobMasterId);
        }

        @Override // org.apache.flink.runtime.registration.RegisteredRpcConnection
        protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
            return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(this.log, JobMaster.this.getRpcService(), "ResourceManager", ResourceManagerGateway.class, getTargetAddress(), getTargetLeaderId()) { // from class: org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerConnection.1
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.apache.flink.runtime.registration.RetryingRegistration
                public CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway resourceManagerGateway, ResourceManagerId resourceManagerId, long j) {
                    return resourceManagerGateway.registerJobManager(ResourceManagerConnection.this.jobMasterId, ResourceManagerConnection.this.jobManagerResourceID, ResourceManagerConnection.this.jobManagerRpcAddress, ResourceManagerConnection.this.jobID, Time.milliseconds(j));
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.runtime.registration.RegisteredRpcConnection
        public void onRegistrationSuccess(JobMasterRegistrationSuccess jobMasterRegistrationSuccess) {
            JobMaster.this.runAsync(() -> {
                if (this == JobMaster.this.resourceManagerConnection) {
                    JobMaster.this.establishResourceManagerConnection(jobMasterRegistrationSuccess);
                }
            });
        }

        @Override // org.apache.flink.runtime.registration.RegisteredRpcConnection
        protected void onRegistrationFailure(Throwable th) {
            JobMaster.this.handleJobMasterError(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$ResourceManagerHeartbeatListener.class */
    private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            JobMaster.this.runAsync(() -> {
                JobMaster.this.log.info("The heartbeat of ResourceManager with id {} timed out.", resourceID);
                if (JobMaster.this.establishedResourceManagerConnection == null || !JobMaster.this.establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceID)) {
                    return;
                }
                JobMaster.this.reconnectToResourceManager(new JobMasterException(String.format("The heartbeat of ResourceManager with id %s timed out.", resourceID)));
            });
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void reportPayload(ResourceID resourceID, Void r3) {
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$ResourceManagerLeaderListener.class */
    public class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
        public void notifyLeaderAddress(String str, UUID uuid) {
            JobMaster.this.runAsync(() -> {
                JobMaster.this.notifyOfNewResourceManagerLeader(str, ResourceManagerId.fromUuidOrNull(uuid));
            });
        }

        @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
        public void handleError(Exception exc) {
            JobMaster.this.handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exc));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$TaskManagerHeartbeatListener.class */
    private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = (JobMasterGateway) Preconditions.checkNotNull(jobMasterGateway);
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            this.jobMasterGateway.disconnectTaskManager(resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void reportPayload(ResourceID resourceID, AccumulatorReport accumulatorReport) {
            Iterator<AccumulatorSnapshot> it = accumulatorReport.getAccumulatorSnapshots().iterator();
            while (it.hasNext()) {
                JobMaster.this.executionGraph.updateAccumulators(it.next());
            }
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    public JobMaster(RpcService rpcService, JobMasterConfiguration jobMasterConfiguration, ResourceID resourceID, JobGraph jobGraph, HighAvailabilityServices highAvailabilityServices, SlotPoolFactory slotPoolFactory, JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler, ClassLoader classLoader, SubmittedJobGraphStore submittedJobGraphStore) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName("jobmanager"));
        this.inJobUpdate = false;
        JobMasterGateway jobMasterGateway = (JobMasterGateway) getSelfGateway(JobMasterGateway.class);
        this.jobMasterConfiguration = (JobMasterConfiguration) Preconditions.checkNotNull(jobMasterConfiguration);
        this.resourceId = (ResourceID) Preconditions.checkNotNull(resourceID);
        this.jobGraph = (JobGraph) Preconditions.checkNotNull(jobGraph);
        this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
        this.highAvailabilityServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices);
        this.blobServer = (BlobServer) Preconditions.checkNotNull(blobServer);
        this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
        this.jobCompletionActions = (OnCompletionActions) Preconditions.checkNotNull(onCompletionActions);
        this.fatalErrorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.userCodeLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.jobMetricGroupFactory = (JobManagerJobMetricGroupFactory) Preconditions.checkNotNull(jobManagerJobMetricGroupFactory);
        this.submittedJobGraphStore = (SubmittedJobGraphStore) Preconditions.checkNotNull(submittedJobGraphStore);
        this.accumulatorAggregationCoordinator = new AccumulatorAggregationCoordinator();
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceID, new TaskManagerHeartbeatListener(jobMasterGateway), rpcService.getScheduledExecutor(), this.log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(resourceID, new ResourceManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        String name = jobGraph.getName();
        JobID jobID = jobGraph.getJobID();
        this.log.info("Initializing job {} ({}).", name, jobID);
        RestartStrategies.RestartStrategyConfiguration restartStrategy = ((ExecutionConfig) jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader)).getRestartStrategy();
        this.restartStrategy = restartStrategy != null ? RestartStrategyFactory.createRestartStrategy(restartStrategy) : jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
        this.resultPartitionLocationTrackerProxy = new ResultPartitionLocationTrackerProxy(jobMasterConfiguration.getConfiguration());
        this.log.info("Using restart strategy {} for {} ({}).", new Object[]{this.restartStrategy, name, jobID});
        this.resourceManagerLeaderRetriever = this.highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.slotPool = ((SlotPoolFactory) Preconditions.checkNotNull(slotPoolFactory)).createSlotPool(jobGraph.getJobID());
        this.slotPoolGateway = (SlotPoolGateway) this.slotPool.getSelfGateway(SlotPoolGateway.class);
        this.registeredTaskManagers = new HashMap(4);
        this.backPressureStatsTracker = (BackPressureStatsTracker) Preconditions.checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
        this.lastInternalSavepoint = null;
        this.operationLogManager = new OperationLogManager(OperationLogStoreLoader.loadOperationLogStore(jobGraph.getJobID(), jobMasterConfiguration.getConfiguration()));
        this.jobStatusListener = null;
        this.resourceManagerConnection = null;
        this.establishedResourceManagerConnection = null;
        JobManagerJobMetricGroup create = jobManagerJobMetricGroupFactory.create(jobGraph);
        ExecutionGraph createAndRestoreExecutionGraph = createAndRestoreExecutionGraph(jobGraph, create);
        this.executionGraphHistories = new EvictingBoundedList<>(jobMasterConfiguration.getConfiguration().getInteger(JobManagerOptions.MAX_EXECUTION_GRAPH_HISTORY_SIZE));
        assignExecutionGraph(createAndRestoreExecutionGraph, create);
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void start() {
        throw new UnsupportedOperationException("Should never call start() without leader ID");
    }

    public CompletableFuture<Acknowledge> start(JobMasterId jobMasterId, Time time) throws Exception {
        super.start();
        return callAsyncWithoutFencing(() -> {
            return startJobExecution(jobMasterId);
        }, time);
    }

    public CompletableFuture<Acknowledge> suspend(Exception exc, Time time) {
        CompletableFuture callAsyncWithoutFencing = callAsyncWithoutFencing(() -> {
            return suspendExecution(exc);
        }, time);
        stop();
        return callAsyncWithoutFencing;
    }

    public void reconcile() throws Exception {
        if (this.executionGraph.getState() != JobStatus.CREATED) {
            clearExecutionGraphFields();
            JobManagerJobMetricGroup create = this.jobMetricGroupFactory.create(this.jobGraph);
            assignExecutionGraph(createAndRestoreExecutionGraph(this.jobGraph, create), create);
        }
        this.graphManager.enterReconcile();
        try {
            this.operationLogManager.replay();
            this.log.info("Job master replay log finish.");
            this.operationLogManager.start();
            this.executionGraph.reconcile();
        } catch (Throwable th) {
            this.log.warn("Fail to replay log for {}, will start the job as a new one.", this.executionGraph.getJobID(), th);
            this.operationLogManager.clear();
            clearExecutionGraphFields();
            JobManagerJobMetricGroup create2 = this.jobMetricGroupFactory.create(this.jobGraph);
            assignExecutionGraph(createAndRestoreExecutionGraph(this.jobGraph, create2), create2);
        }
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public CompletableFuture<Void> postStop() {
        this.log.info("Stopping the JobMaster for job {}({}).", this.jobGraph.getName(), this.jobGraph.getJobID());
        if (this.graphManager != null) {
            this.graphManager.dispose();
        }
        HashSet hashSet = new HashSet(this.registeredTaskManagers.keySet());
        FlinkException flinkException = new FlinkException("Stopping JobMaster for job " + this.jobGraph.getName() + '(' + this.jobGraph.getJobID() + ").");
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            disconnectTaskManager((ResourceID) it.next(), flinkException);
        }
        this.taskManagerHeartbeatManager.stop();
        this.resourceManagerHeartbeatManager.stop();
        suspendExecution(new FlinkException("JobManager is shutting down."));
        this.slotPool.shutDown();
        CompletableFuture<Void> runAsync = this.lastInternalSavepoint != null ? CompletableFuture.runAsync(() -> {
            disposeSavepoint(this.lastInternalSavepoint);
        }) : CompletableFuture.completedFuture(null);
        this.accumulatorAggregationCoordinator.clear();
        return FutureUtils.completeAll(Arrays.asList(runAsync, this.slotPool.getTerminationFuture()));
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> cancel(Time time) {
        this.executionGraph.cancel();
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> stop(Time time) {
        try {
            this.executionGraph.stop();
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (StoppingException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<JobGraph> requestJobGraph(Time time) {
        this.log.debug("Requesting job graph {}.", this.jobGraph);
        return CompletableFuture.completedFuture(this.jobGraph);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> updateJob(JobUpdateRequest jobUpdateRequest, Time time) {
        try {
            JobGraph jobGraph = (JobGraph) InstantiationUtil.clone(this.jobGraph);
            for (JobUpdateAction jobUpdateAction : jobUpdateRequest.getJobUpdateActions()) {
                if (jobUpdateAction instanceof JobGraphUpdateAction) {
                    ((JobGraphUpdateAction) jobUpdateAction).updateJobGraph(jobGraph);
                } else {
                    if (!(jobUpdateAction instanceof JobGraphReplaceAction)) {
                        return FutureUtils.completedExceptionally(new IllegalArgumentException("Unknown job update action: " + jobUpdateAction));
                    }
                    jobGraph = ((JobGraphReplaceAction) jobUpdateAction).getNewJobGraph();
                    jobGraph.setJobVersion(this.jobGraph.getJobVersion());
                }
            }
            jobGraph.setJobVersion(jobGraph.getJobVersion() + 1);
            if (jobUpdateRequest.shouldTriggerJobReload()) {
                this.log.debug("Update original job graph {} with new job graph {}.", this.jobGraph, jobGraph);
                return updateJob(jobGraph);
            }
            try {
                this.jobGraph = jobGraph;
                this.log.info("Persisting the new JobGraph...");
                this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
                this.log.info("Job update finished successfully without restarting the job.");
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Throwable th) {
                String str = "Failed to persist the new job graph of job " + jobGraph.getJobID();
                this.log.warn(str, th);
                return FutureUtils.completedExceptionally(new JobModificationException(str, th));
            }
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new JobModificationException("Failed to make a copy of current job graph " + this.jobGraph.getJobID(), e));
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> rescaleJob(int i, RescalingBehaviour rescalingBehaviour, Time time) {
        ArrayList arrayList = new ArrayList(this.jobGraph.getNumberOfVertices());
        Iterator<JobVertex> it = this.jobGraph.getVertices().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getID());
        }
        return rescaleOperators(arrayList, i, rescalingBehaviour, time);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> rescaleOperators(Collection<JobVertexID> collection, int i, RescalingBehaviour rescalingBehaviour, Time time) {
        if (i <= 0) {
            return FutureUtils.completedExceptionally(new JobModificationException("The target parallelism of a rescaling operation must be larger than 0."));
        }
        try {
            rescaleJobGraph(collection, i, rescalingBehaviour);
            ExecutionGraph executionGraph = this.executionGraph;
            JobManagerJobMetricGroup create = this.jobMetricGroupFactory.create(this.jobGraph);
            try {
                ExecutionGraph createExecutionGraph = createExecutionGraph(this.jobGraph, create);
                CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
                checkpointCoordinator.stopCheckpointScheduler();
                CompletableFuture<U> handleAsync = restoreExecutionGraphFromRescalingSavepoint(createExecutionGraph, getJobModificationSavepoint(time)).handleAsync((executionGraph2, th) -> {
                    if (th == null) {
                        return executionGraph2;
                    }
                    if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
                        checkpointCoordinator.startCheckpointScheduler();
                    }
                    throw new CompletionException(ExceptionUtils.stripCompletionException(th));
                }, (Executor) getMainThreadExecutor());
                CompletableFuture thenCombineAsync = handleAsync.thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) executionGraph3 -> {
                    suspendExecutionGraph(new FlinkException("Job is being rescaled."));
                    return executionGraph.getTerminationFuture();
                }, (Executor) getMainThreadExecutor()).thenAccept(jobStatus -> {
                    if (jobStatus != JobStatus.SUSPENDED) {
                        String format = String.format("Job %s rescaling failed because we could not suspend the execution graph.", this.jobGraph.getName());
                        this.log.info(format);
                        throw new CompletionException((Throwable) new JobModificationException(format));
                    }
                }).thenCombineAsync((CompletionStage) handleAsync, (r9, executionGraph4) -> {
                    if (this.executionGraph != executionGraph) {
                        throw new CompletionException((Throwable) new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the rescaling."));
                    }
                    clearExecutionGraphFields();
                    Preconditions.checkState(this.executionGraph.getState().isTerminalState());
                    assignExecutionGraph(executionGraph4, create);
                    this.operationLogManager.clear();
                    scheduleExecutionGraph();
                    return Acknowledge.get();
                }, (Executor) getMainThreadExecutor());
                thenCombineAsync.whenComplete((acknowledge, th2) -> {
                    if (th2 != null) {
                        createExecutionGraph.failGlobal(new SuppressRestartsException(new FlinkException(String.format("Failed to rescale the job %s.", this.jobGraph.getJobID()), th2)));
                    }
                });
                return thenCombineAsync;
            } catch (JobException | JobExecutionException e) {
                return FutureUtils.completedExceptionally(new JobModificationException("Could not create rescaled ExecutionGraph.", e));
            }
        } catch (FlinkException e2) {
            String format = String.format("Cannot rescale job %s.", this.jobGraph.getName());
            this.log.info(format, e2);
            return FutureUtils.completedExceptionally(new JobModificationException(format, e2));
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        Preconditions.checkNotNull(taskExecutionState, "taskExecutionState");
        return this.graphManager.isReconciling() ? FutureUtils.completedExceptionally(new JobMasterException("The job master is reconciling with task executors.")) : this.executionGraph.updateState(taskExecutionState) ? CompletableFuture.completedFuture(Acknowledge.get()) : FutureUtils.completedExceptionally(new ExecutionGraphException("The execution attempt " + taskExecutionState.getID() + " was not found."));
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(JobVertexID jobVertexID, OperatorID operatorID, ExecutionAttemptID executionAttemptID) {
        Execution execution = this.executionGraph.getRegisteredExecutions().get(executionAttemptID);
        if (execution == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Can not find Execution for attempt {}.", executionAttemptID);
            }
            return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttemptID));
        }
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexID);
        if (jobVertex == null) {
            this.log.error("Cannot find execution vertex for vertex ID {}.", jobVertexID);
            return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + jobVertexID));
        }
        ExecutionVertex vertex = execution.getVertex();
        InputSplit nextInputSplitFromAssgined = vertex.getNextInputSplitFromAssgined(operatorID);
        if (nextInputSplitFromAssgined == null && !vertex.isOverInputSplitsLimit(operatorID)) {
            InputSplitAssigner splitAssigner = jobVertex.getSplitAssigner(operatorID);
            if (splitAssigner == null) {
                this.log.error("No InputSplitAssigner for vertexID {}, operatorID {}.", jobVertexID, operatorID);
                return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertexID " + jobVertexID + ", operatorID " + operatorID));
            }
            LogicalSlot assignedResource = execution.getAssignedResource();
            nextInputSplitFromAssgined = splitAssigner.getNextInputSplit(assignedResource != null ? assignedResource.getTaskManagerLocation().getHostname() : null, execution.getVertex().getParallelSubtaskIndex());
            if (nextInputSplitFromAssgined != null) {
                vertex.inputSplitAssigned(operatorID, nextInputSplitFromAssgined);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Send next input split {}.", nextInputSplitFromAssgined);
            }
        }
        try {
            return CompletableFuture.completedFuture(new SerializedInputSplit(InstantiationUtil.serializeObject(nextInputSplitFromAssgined)));
        } catch (Exception e) {
            this.log.error("Could not serialize the next input split of class {}.", nextInputSplitFromAssgined.getClass(), e);
            IOException iOException = new IOException("Could not serialize the next input split of class " + nextInputSplitFromAssgined.getClass() + ScopeFormat.SCOPE_SEPARATOR, e);
            vertex.fail(iOException);
            return FutureUtils.completedExceptionally(iOException);
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<ExecutionState> requestPartitionState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) {
        Execution execution = this.executionGraph.getRegisteredExecutions().get(resultPartitionID.getProducerId());
        if (execution != null) {
            return CompletableFuture.completedFuture(execution.getState());
        }
        IntermediateResult intermediateResult = this.executionGraph.getAllIntermediateResults().get(intermediateDataSetID);
        if (intermediateResult == null) {
            return FutureUtils.completedExceptionally(new IllegalArgumentException("Intermediate data set with ID " + intermediateDataSetID + " not found."));
        }
        Execution currentExecutionAttempt = intermediateResult.getPartitionById(resultPartitionID.getPartitionId()).getProducer().getCurrentExecutionAttempt();
        return currentExecutionAttempt.getAttemptId().equals(resultPartitionID.getProducerId()) ? CompletableFuture.completedFuture(currentExecutionAttempt.getState()) : FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionID));
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(ResultPartitionID resultPartitionID, Time time) {
        if (this.graphManager.isReconciling()) {
            return FutureUtils.completedExceptionally(new JobMasterException("The job master is reconciling with task executors."));
        }
        try {
            this.executionGraph.scheduleOrUpdateConsumers(resultPartitionID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception exc) {
        this.log.info("Disconnect TaskExecutor {} because: {}", resourceID, exc.getMessage());
        this.taskManagerHeartbeatManager.unmonitorTarget(resourceID);
        CompletableFuture<Acknowledge> releaseTaskManager = this.slotPoolGateway.releaseTaskManager(resourceID, exc);
        Tuple2<TaskManagerLocation, TaskExecutorGateway> remove = this.registeredTaskManagers.remove(resourceID);
        if (remove != null) {
            ((TaskExecutorGateway) remove.f1).disconnectJobManager(this.jobGraph.getJobID(), exc);
        }
        return releaseTaskManager;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobID, executionAttemptID, j, checkpointMetrics, taskStateSnapshot);
        if (checkpointCoordinator != null) {
            getRpcService().execute(() -> {
                try {
                    checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                } catch (Throwable th) {
                    this.log.warn("Error while processing checkpoint acknowledgement message");
                }
            });
        } else {
            this.log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", this.jobGraph.getJobID());
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway
    public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, Throwable th) {
        DeclineCheckpoint declineCheckpoint = new DeclineCheckpoint(jobID, executionAttemptID, j, th);
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            getRpcService().execute(() -> {
                try {
                    checkpointCoordinator.receiveDeclineMessage(declineCheckpoint);
                } catch (Exception e) {
                    this.log.error("Error in CheckpointCoordinator while processing {}", declineCheckpoint, e);
                }
            });
        } else {
            this.log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", this.jobGraph.getJobID());
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.KvStateLocationOracle
    public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobID, String str) {
        if (!this.jobGraph.getJobID().equals(jobID)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Request of key-value state location for unknown job {} received.", jobID);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Lookup key-value state for job {} with registration name {}.", this.jobGraph.getJobID(), str);
        }
        KvStateLocation kvStateLocation = this.executionGraph.getKvStateLocationRegistry().getKvStateLocation(str);
        return kvStateLocation != null ? CompletableFuture.completedFuture(kvStateLocation) : FutureUtils.completedExceptionally(new UnknownKvStateLocation(str));
    }

    @Override // org.apache.flink.runtime.jobmaster.KvStateRegistryGateway
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID, InetSocketAddress inetSocketAddress) {
        if (!this.jobGraph.getJobID().equals(jobID)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Notification about key-value state registration for unknown job {} received.", jobID);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state registered for job {} under name {}.", this.jobGraph.getJobID(), str);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(jobVertexID, keyGroupRange, str, kvStateID, inetSocketAddress);
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (Exception e) {
            this.log.error("Failed to notify KvStateRegistry about registration {}.", str);
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.KvStateRegistryGateway
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) {
        if (!this.jobGraph.getJobID().equals(jobID)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Notification about key-value state deregistration for unknown job {} received.", jobID);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state unregistered for job {} under name {}.", this.jobGraph.getJobID(), str);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(jobVertexID, keyGroupRange, str);
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (Exception e) {
            this.log.error("Failed to notify KvStateRegistry about registration {}.", str, e);
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
        return CompletableFuture.completedFuture(new ClassloadingProps(this.blobServer.getPort(), this.executionGraph.getRequiredJarFiles(), this.executionGraph.getRequiredClasspaths()));
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID resourceID, Collection<SlotOffer> collection, Time time) {
        Tuple2<TaskManagerLocation, TaskExecutorGateway> tuple2 = this.registeredTaskManagers.get(resourceID);
        if (tuple2 == null) {
            return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + resourceID));
        }
        return this.slotPoolGateway.offerSlots((TaskManagerLocation) tuple2.f0, new RpcTaskManagerGateway((TaskExecutorGateway) tuple2.f1, getFencingToken()), collection);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<TaskExecutorReportResponse> reportTasksExecutionStatus(ResourceID resourceID, List<TaskExecutionStatus> list, @RpcTimeout Time time) {
        if (this.executionGraph.getState() != JobStatus.RECONCILING) {
            this.log.info("Refuse reported task status from {}", resourceID);
            return CompletableFuture.completedFuture(new TaskExecutorReportResponse.Decline("The reported task exceeds the duration time, the job master is not in reconciling state now."));
        }
        Tuple2<TaskManagerLocation, TaskExecutorGateway> tuple2 = this.registeredTaskManagers.get(resourceID);
        if (tuple2 == null) {
            return CompletableFuture.completedFuture(new TaskExecutorReportResponse.Decline("Unknown TaskManager " + resourceID));
        }
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation) tuple2.f0;
        RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway((TaskExecutorGateway) tuple2.f1, getFencingToken());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            TaskExecutionStatus taskExecutionStatus = list.get(i);
            SlotOffer boundSlot = taskExecutionStatus.getBoundSlot();
            this.log.debug("Accept reported task execution {} with state {} from {}", new Object[]{taskExecutionStatus.getExecutionAttemptID(), taskExecutionStatus.getExecutionState(), resourceID});
            ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(taskExecutionStatus.getJobVertexID());
            SlotSharingGroup slotSharingGroup = jobVertex.getSlotSharingGroup();
            try {
                LogicalSlot recoverSlot = this.slotPool.recoverSlot(taskExecutionStatus.getJobVertexID(), slotSharingGroup == null ? null : slotSharingGroup.getSlotSharingGroupId(), jobVertex.getCoLocationGroup() != null ? jobVertex.getCoLocationGroup().getLocationConstraint(taskExecutionStatus.getIndexOfSubtask()) : null, boundSlot.getAllocationId(), taskManagerLocation, boundSlot.getSlotIndex(), boundSlot.getResourceProfile(), boundSlot.getTags(), rpcTaskManagerGateway);
                try {
                    if (!this.graphManager.reconcileExecutionVertex(taskExecutionStatus.getJobVertexID(), taskExecutionStatus.getIndexOfSubtask(), taskExecutionStatus.getExecutionState(), taskExecutionStatus.getExecutionAttemptID(), taskExecutionStatus.getAttemptNumber(), taskExecutionStatus.getCreateTimestamp(), taskExecutionStatus.getResultPartitionIDs(), taskExecutionStatus.getResultPartitionsConsumable(), taskExecutionStatus.getAssignedInputSplits(), recoverSlot)) {
                        this.slotPool.releaseSlot(recoverSlot.getSlotRequestId(), recoverSlot.getSlotSharingGroupId(), recoverSlot.getCoLocationConstraint(), new Exception("Fail to recover the slot"));
                        arrayList.add(Integer.valueOf(i));
                        this.log.info("Fail to reconcile status for execution {} with state {} from {}", new Object[]{taskExecutionStatus.getExecutionAttemptID(), taskExecutionStatus.getExecutionState(), resourceID});
                    }
                } catch (Exception e) {
                    this.log.warn("Fail to recover vertex {}_{} execution {}", new Object[]{taskExecutionStatus.getJobVertexID(), Integer.valueOf(taskExecutionStatus.getIndexOfSubtask()), taskExecutionStatus.getExecutionAttemptID(), e});
                    arrayList.add(Integer.valueOf(i));
                }
            } catch (Exception e2) {
                arrayList.add(Integer.valueOf(i));
            }
        }
        return CompletableFuture.completedFuture(new TaskExecutorReportResponse.Success(arrayList));
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void failSlot(ResourceID resourceID, AllocationID allocationID, Exception exc) {
        if (this.registeredTaskManagers.containsKey(resourceID)) {
            this.slotPoolGateway.failAllocation(allocationID, exc);
        } else {
            this.log.warn("Cannot fail slot " + allocationID + " because the TaskManager " + resourceID + " is unknown.");
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<RegistrationResponse> registerTaskManager(String str, TaskManagerLocation taskManagerLocation, Time time) {
        ResourceID resourceID = taskManagerLocation.getResourceID();
        return this.registeredTaskManagers.containsKey(resourceID) ? CompletableFuture.completedFuture(new JMTMRegistrationSuccess(this.resourceId)) : getRpcService().connect(str, TaskExecutorGateway.class).handleAsync((taskExecutorGateway, th) -> {
            if (th != null) {
                return new RegistrationResponse.Decline(th.getMessage());
            }
            this.slotPoolGateway.registerTaskManager(resourceID);
            this.registeredTaskManagers.put(resourceID, Tuple2.of(taskManagerLocation, taskExecutorGateway));
            this.taskManagerHeartbeatManager.monitorTarget(resourceID, new HeartbeatTarget<Void>() { // from class: org.apache.flink.runtime.jobmaster.JobMaster.1
                @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
                public void receiveHeartbeat(ResourceID resourceID2, Void r3) {
                }

                @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
                public void requestHeartbeat(ResourceID resourceID2, Void r5) {
                    taskExecutorGateway.heartbeatFromJobManager(resourceID2);
                }
            });
            return new JMTMRegistrationSuccess(this.resourceId);
        }, (Executor) getMainThreadExecutor());
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void disconnectResourceManager(ResourceManagerId resourceManagerId, Exception exc) {
        if (isConnectingToResourceManager(resourceManagerId)) {
            reconnectToResourceManager(exc);
        }
    }

    private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
        return this.resourceManagerAddress != null && this.resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void heartbeatFromTaskManager(ResourceID resourceID, AccumulatorReport accumulatorReport) {
        this.taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        this.resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time time) {
        ExecutionGraph executionGraph = this.executionGraph;
        return CompletableFuture.supplyAsync(() -> {
            return WebMonitorUtils.createDetailsForJob(executionGraph);
        }, this.scheduledExecutorService);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<JobStatus> requestJobStatus(Time time) {
        return CompletableFuture.completedFuture(this.executionGraph.getState());
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time time) {
        return CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(this.executionGraph));
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<EvictingBoundedList<ArchivedExecutionGraph>> requestJobHistories(Time time) {
        return CompletableFuture.completedFuture(this.executionGraphHistories);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Collection<JobPendingSlotRequestDetail>> requestPendingSlotRequestDetails(@RpcTimeout Time time) {
        return this.slotPoolGateway.requestPendingSlotRequests(time).thenApply(collection -> {
            ArrayList arrayList = new ArrayList();
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                PendingSlotRequest pendingSlotRequest = (PendingSlotRequest) it.next();
                List<PendingSlotRequest.PendingScheduledUnit> pendingScheduledUnits = pendingSlotRequest.getPendingScheduledUnits();
                Preconditions.checkState(pendingScheduledUnits.size() > 0);
                ArrayList arrayList2 = new ArrayList();
                for (PendingSlotRequest.PendingScheduledUnit pendingScheduledUnit : pendingScheduledUnits) {
                    arrayList2.add(new JobPendingSlotRequestDetail.VertexTaskInfo(pendingScheduledUnit.getJobVertexId(), pendingScheduledUnit.getTaskName(), pendingScheduledUnit.getSubTaskIndex(), pendingScheduledUnit.getSubTaskAttempt()));
                }
                PendingSlotRequest.PendingScheduledUnit next = pendingScheduledUnits.iterator().next();
                arrayList.add(new JobPendingSlotRequestDetail(pendingSlotRequest.getSlotRequestId(), pendingSlotRequest.getResourceProfile(), pendingSlotRequest.getStartTimestamp(), next.getSlotSharingGroupId(), next.getCoLocationGroupId(), arrayList2));
            }
            return arrayList;
        });
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<String> triggerSavepoint(@Nullable String str, boolean z, Time time) {
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            return FutureUtils.completedExceptionally(new IllegalStateException(String.format("Job %s is not a streaming job.", this.jobGraph.getJobID())));
        }
        if (z) {
            checkpointCoordinator.stopCheckpointScheduler();
        }
        return checkpointCoordinator.triggerSavepoint(System.currentTimeMillis(), str).thenApply((v0) -> {
            return v0.getExternalPointer();
        }).thenApplyAsync((Function<? super U, ? extends U>) str2 -> {
            if (z) {
                this.log.info("Savepoint stored in {}. Now cancelling {}.", str2, this.jobGraph.getJobID());
                cancel(time);
            }
            return str2;
        }, (Executor) getMainThreadExecutor()).exceptionally(th -> {
            if (z) {
                startCheckpointScheduler(checkpointCoordinator);
            }
            throw new CompletionException(th);
        });
    }

    private void startCheckpointScheduler(CheckpointCoordinator checkpointCoordinator) {
        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            try {
                checkpointCoordinator.startCheckpointScheduler();
            } catch (IllegalStateException e) {
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexID) {
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexID);
        return jobVertex == null ? FutureUtils.completedExceptionally(new FlinkException("JobVertexID not found " + jobVertexID)) : CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(this.backPressureStatsTracker.getOperatorBackPressureStats(jobVertex).orElse(null)));
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void notifyAllocationFailure(AllocationID allocationID, Exception exc) {
        this.slotPoolGateway.failAllocation(allocationID, exc);
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void commitPreAggregatedAccumulator(List<CommitAccumulator> list) {
        Iterator<CommitAccumulator> it = list.iterator();
        while (it.hasNext()) {
            this.accumulatorAggregationCoordinator.commitPreAggregatedAccumulator(this.executionGraph, it.next());
        }
    }

    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public <V, A extends Serializable> CompletableFuture<Accumulator<V, A>> queryPreAggregatedAccumulator(String str) {
        return this.accumulatorAggregationCoordinator.queryPreAggregatedAccumulator(str);
    }

    private Acknowledge startJobExecution(JobMasterId jobMasterId) throws Exception {
        validateRunsInMainThread();
        Preconditions.checkNotNull(jobMasterId, "The new JobMasterId must not be null.");
        if (Objects.equals(getFencingToken(), jobMasterId)) {
            this.log.info("Already started the job execution with JobMasterId {}.", jobMasterId);
            return Acknowledge.get();
        }
        setNewFencingToken(jobMasterId);
        startJobMasterServices();
        this.log.info("Starting execution of job {} ({})", this.jobGraph.getName(), this.jobGraph.getJobID());
        if (!JobStatus.CREATED.equals(this.executionGraph.getState()) || this.graphManager.isReconciling()) {
            this.executionGraph.getReconcileFuture().thenApplyAsync(collection -> {
                Execution execution;
                this.graphManager.leaveReconcile();
                if (!JobStatus.RUNNING.equals(this.executionGraph.getState())) {
                    if (JobStatus.CREATED.equals(this.executionGraph.getState())) {
                        scheduleExecutionGraph();
                        return null;
                    }
                    this.log.error("When reconcile finished, the job is in {}, this is a logical error.", this.executionGraph.getState());
                    return null;
                }
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    ExecutionAttemptID executionAttemptID = (ExecutionAttemptID) it.next();
                    if (executionAttemptID != null && (execution = this.executionGraph.getRegisteredExecutions().get(executionAttemptID)) != null) {
                        updateTaskExecutionState(new TaskExecutionState(this.executionGraph.getJobID(), execution.getAttemptId(), ExecutionState.FAILED, new FlinkException("Fail to reconcile with master")));
                    }
                }
                return null;
            }, (Executor) getMainThreadExecutor());
        } else {
            scheduleExecutionGraph();
        }
        return Acknowledge.get();
    }

    private void startJobMasterServices() throws Exception {
        this.slotPool.start(getFencingToken(), getAddress());
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
        this.resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }

    private void setNewFencingToken(JobMasterId jobMasterId) {
        if (getFencingToken() != null) {
            this.log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(), jobMasterId);
        }
        setFencingToken(jobMasterId);
    }

    private Acknowledge suspendExecution(Exception exc) {
        validateRunsInMainThread();
        if (getFencingToken() == null) {
            this.log.debug("Job has already been suspended or shutdown.");
            return Acknowledge.get();
        }
        setFencingToken(null);
        try {
            this.resourceManagerLeaderRetriever.stop();
        } catch (Throwable th) {
            this.log.warn("Failed to stop resource manager leader retriever when suspending.", th);
        }
        suspendAndClearExecutionGraphFields(exc);
        this.operationLogManager.stop();
        this.slotPoolGateway.suspend();
        closeResourceManagerConnection(exc);
        return Acknowledge.get();
    }

    private void assignExecutionGraph(ExecutionGraph executionGraph, JobManagerJobMetricGroup jobManagerJobMetricGroup) {
        Preconditions.checkState(this.executionGraph == null || JobStatus.CREATED.equals(this.executionGraph.getState()) || this.executionGraph.getState().isTerminalState(), "The job state is " + (this.executionGraph == null ? null : this.executionGraph.getState()));
        Preconditions.checkState(this.jobManagerJobMetricGroup == null);
        if (this.executionGraph != null) {
            this.executionGraphHistories.add(ArchivedExecutionGraph.createFrom(this.executionGraph));
            this.log.info("The old execution graph is added to execution graph history store({}/{}).", Integer.valueOf(this.executionGraphHistories.size()), Integer.valueOf(this.executionGraphHistories.getSizeLimit()));
        }
        this.executionGraph = executionGraph;
        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
        Preconditions.checkState(this.jobStatusListener == null);
        this.jobStatusListener = new JobManagerJobStatusListener();
        this.executionGraph.registerJobStatusListener(this.jobStatusListener);
        setupGraphManager();
    }

    private void scheduleExecutionGraph() {
        try {
            this.operationLogManager.start();
            if (this.establishedResourceManagerConnection != null) {
                this.establishedResourceManagerConnection.getResourceManagerGateway().setPlacementConstraints(this.jobGraph.getJobID(), this.jobGraph.getPlacementConstraints(), this.rpcTimeout);
            }
            this.executionGraph.scheduleForExecution();
        } catch (JobException e) {
            this.jobCompletionActions.jobMasterFailed(e);
        } catch (Throwable th) {
            this.executionGraph.failGlobal(th);
        }
    }

    private ExecutionGraph createAndRestoreExecutionGraph(JobGraph jobGraph, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph(jobGraph, jobManagerJobMetricGroup);
        CheckpointCoordinator checkpointCoordinator = createExecutionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null && !checkpointCoordinator.restoreLatestCheckpointedState(createExecutionGraph.getAllVertices(), false, false)) {
            tryRestoreExecutionGraphFromSavepoint(createExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
        return createExecutionGraph;
    }

    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws JobExecutionException, JobException {
        return ExecutionGraphBuilder.buildGraph(null, jobGraph, this.jobMasterConfiguration.getConfiguration(), this.scheduledExecutorService, this.scheduledExecutorService, this.slotPool.getSlotProvider(), this.userCodeLoader, this.highAvailabilityServices.getCheckpointRecoveryFactory(), this.rpcTimeout, this.restartStrategy, jobManagerJobMetricGroup, this.blobServer, this.resultPartitionLocationTrackerProxy, this.jobMasterConfiguration.getSlotRequestTimeout(), this.log);
    }

    private void suspendAndClearExecutionGraphFields(Exception exc) {
        suspendExecutionGraph(exc);
        clearExecutionGraphFields();
    }

    private void suspendExecutionGraph(Exception exc) {
        this.executionGraph.suspend(exc);
    }

    private void clearExecutionGraphFields() {
        if (this.jobManagerJobMetricGroup != null) {
            this.jobManagerJobMetricGroup.close();
            this.jobManagerJobMetricGroup = null;
        }
        if (this.jobStatusListener != null) {
            this.jobStatusListener.stop();
            this.jobStatusListener = null;
        }
    }

    private GraphManager createGraphManager() {
        Configuration schedulingConfiguration = this.jobGraph.getSchedulingConfiguration();
        GraphManagerPlugin createGraphManagerPlugin = GraphManagerPluginFactory.createGraphManagerPlugin(schedulingConfiguration, this.userCodeLoader);
        Configuration jobConfiguration = this.jobGraph.getJobConfiguration();
        Configuration configuration = new Configuration();
        if (jobConfiguration != null) {
            configuration.addAll(jobConfiguration);
        }
        if (schedulingConfiguration != null) {
            configuration.addAll(schedulingConfiguration);
        }
        GraphManager graphManager = new GraphManager(createGraphManagerPlugin, (JobMasterGateway) getSelfGateway(JobMasterGateway.class), this.operationLogManager, this.executionGraph);
        graphManager.open(this.jobGraph, new SchedulingConfig(configuration, this.userCodeLoader));
        return graphManager;
    }

    private void setupGraphManager() {
        if (this.graphManager != null) {
            this.graphManager.dispose();
        }
        this.graphManager = createGraphManager();
        this.operationLogManager.registerLogHandler(OperationLogType.GRAPH_MANAGER, this.graphManager);
        this.executionGraph.setGraphManager(this.graphManager);
        this.executionGraph.registerExecutionListener(this.graphManager);
    }

    private void disposeSavepoint(String str) {
        try {
            Checkpoints.disposeSavepoint(str, this.jobMasterConfiguration.getConfiguration(), this.userCodeLoader, this.log);
        } catch (FlinkException | IOException e) {
            this.log.info("Could not dispose temporary rescaling savepoint under {}.", str, e);
        }
    }

    private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraph, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
        CheckpointCoordinator checkpointCoordinator;
        if (!savepointRestoreSettings.restoreSavepoint() || (checkpointCoordinator = executionGraph.getCheckpointCoordinator()) == null) {
            return;
        }
        checkpointCoordinator.restoreSavepoint(savepointRestoreSettings.getRestorePath(), savepointRestoreSettings.allowNonRestoredState(), savepointRestoreSettings.resumeFromLatestCheckpoint(), executionGraph.getAllVertices(), this.userCodeLoader);
    }

    private CompletableFuture<Acknowledge> updateJob(JobGraph jobGraph) {
        if (this.inJobUpdate) {
            this.log.warn("Fail to update the job as a previous update is still in progress.");
            return FutureUtils.completedExceptionally(new JobModificationException("Fail to update the job as a previous update is still in progress."));
        }
        this.inJobUpdate = true;
        try {
            ExecutionGraph executionGraph = this.executionGraph;
            CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
            if (checkpointCoordinator != null) {
                checkpointCoordinator.stopCheckpointScheduler();
            }
            JobManagerJobMetricGroup create = this.jobMetricGroupFactory.create(jobGraph);
            try {
                ExecutionGraph createAndRestoreExecutionGraph = createAndRestoreExecutionGraph(jobGraph, create);
                this.jobGraph = jobGraph;
                runAsync(() -> {
                    suspendExecutionGraph(new FlinkException("Suspend the old job to update it."));
                });
                CompletableFuture thenApply = executionGraph.getTerminationFuture().thenAccept(jobStatus -> {
                    if (jobStatus != JobStatus.SUSPENDED) {
                        String format = String.format("Job %s update failed because we could not suspend the execution graph.", this.jobGraph.getName());
                        this.log.warn(format);
                        throw new CompletionException((Throwable) new JobModificationException(format));
                    }
                }).thenAcceptAsync(r10 -> {
                    if (this.executionGraph != executionGraph) {
                        this.log.warn("Detected unexpected concurrent modification of ExecutionGraph.");
                        throw new CompletionException((Throwable) new JobModificationException("Detected unexpected concurrent modification of ExecutionGraph."));
                    }
                    clearExecutionGraphFields();
                    Preconditions.checkState(this.executionGraph.getState().isTerminalState());
                    optimizeNewExecutionGraph(createAndRestoreExecutionGraph, executionGraph);
                    assignExecutionGraph(createAndRestoreExecutionGraph, create);
                    this.operationLogManager.clear();
                    scheduleExecutionGraph();
                }, (Executor) getMainThreadExecutor()).thenApply(r8 -> {
                    try {
                        this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
                        return Acknowledge.get();
                    } catch (Exception e) {
                        String str = "Failed to store job graph of job " + jobGraph.getJobID();
                        this.log.warn(str);
                        throw new CompletionException((Throwable) new JobModificationException(str));
                    }
                });
                thenApply.whenComplete((acknowledge, th) -> {
                    if (th != null) {
                        this.log.error("Unexpected error happens in job update.", th);
                        handleJobMasterError(new RuntimeException("Unexpected error happens in job update.", th));
                    }
                    this.inJobUpdate = false;
                });
                return thenApply;
            } catch (Exception e) {
                if (checkpointCoordinator != null && checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
                    checkpointCoordinator.startCheckpointScheduler();
                }
                this.log.warn("Fail to create the new ExecutionGraph or restore states from the latest checkpoint.");
                throw new JobModificationException("Fail to create the new ExecutionGraph or restore states from the latest checkpoint.", e);
            }
        } catch (Throwable th2) {
            this.inJobUpdate = false;
            return FutureUtils.completedExceptionally(th2);
        }
    }

    private void optimizeNewExecutionGraph(ExecutionGraph executionGraph, ExecutionGraph executionGraph2) {
        Map<JobVertexID, ExecutionJobVertex> allVertices = executionGraph2.getAllVertices();
        Map<JobVertexID, ExecutionJobVertex> allVertices2 = executionGraph.getAllVertices();
        for (JobVertexID jobVertexID : allVertices2.keySet()) {
            ExecutionJobVertex executionJobVertex = allVertices.get(jobVertexID);
            if (executionJobVertex != null) {
                ExecutionJobVertex executionJobVertex2 = allVertices2.get(jobVertexID);
                if (executionJobVertex.getParallelism() == executionJobVertex2.getParallelism()) {
                    for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
                        executionJobVertex2.getTaskVertices()[i].setLatestPriorLocation(executionJobVertex.getTaskVertices()[i].getCurrentAssignedResourceLocation());
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleJobMasterError(Throwable th) {
        if (!ExceptionUtils.isJvmFatalError(th)) {
            this.jobCompletionActions.jobMasterFailed(th);
        } else {
            this.log.error("Fatal error occurred on JobManager.", th);
            this.fatalErrorHandler.onFatalError(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void jobStatusChanged(JobStatus jobStatus, long j, @Nullable Throwable th) {
        validateRunsInMainThread();
        if (jobStatus.isGloballyTerminalState()) {
            ArchivedExecutionGraph createFrom = ArchivedExecutionGraph.createFrom(this.executionGraph);
            this.operationLogManager.clear();
            this.scheduledExecutorService.execute(() -> {
                this.jobCompletionActions.jobReachedGloballyTerminalState(createFrom);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyOfNewResourceManagerLeader(String str, ResourceManagerId resourceManagerId) {
        this.resourceManagerAddress = createResourceManagerAddress(str, resourceManagerId);
        reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", this.resourceManagerAddress)));
    }

    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String str, @Nullable ResourceManagerId resourceManagerId) {
        if (str == null) {
            return null;
        }
        Preconditions.checkNotNull(resourceManagerId);
        return new ResourceManagerAddress(str, resourceManagerId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconnectToResourceManager(Exception exc) {
        closeResourceManagerConnection(exc);
        tryConnectToResourceManager();
    }

    private void tryConnectToResourceManager() {
        if (this.resourceManagerAddress != null) {
            connectToResourceManager();
        }
    }

    private void connectToResourceManager() {
        if (!$assertionsDisabled && this.resourceManagerAddress == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.resourceManagerConnection != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.establishedResourceManagerConnection != null) {
            throw new AssertionError();
        }
        this.log.info("Connecting to ResourceManager {}", this.resourceManagerAddress);
        this.resourceManagerConnection = new ResourceManagerConnection(this.log, this.jobGraph.getJobID(), this.resourceId, getAddress(), getFencingToken(), this.resourceManagerAddress.getAddress(), this.resourceManagerAddress.getResourceManagerId(), this.scheduledExecutorService);
        this.resourceManagerConnection.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void establishResourceManagerConnection(JobMasterRegistrationSuccess jobMasterRegistrationSuccess) {
        ResourceManagerId resourceManagerId = jobMasterRegistrationSuccess.getResourceManagerId();
        if (this.resourceManagerConnection == null || !Objects.equals(this.resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
            this.log.debug("Ignoring resource manager connection to {} because its a duplicate or outdated.", resourceManagerId);
            return;
        }
        this.log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId);
        final ResourceManagerGateway targetGateway = this.resourceManagerConnection.getTargetGateway();
        ResourceID resourceManagerResourceId = jobMasterRegistrationSuccess.getResourceManagerResourceId();
        this.establishedResourceManagerConnection = new EstablishedResourceManagerConnection(targetGateway, resourceManagerResourceId);
        targetGateway.setPlacementConstraints(this.jobGraph.getJobID(), this.jobGraph.getPlacementConstraints(), this.rpcTimeout).thenAccept(acknowledge -> {
            this.slotPoolGateway.connectToResourceManager(targetGateway);
        });
        this.resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() { // from class: org.apache.flink.runtime.jobmaster.JobMaster.2
            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void receiveHeartbeat(ResourceID resourceID, Void r5) {
                targetGateway.heartbeatFromJobManager(resourceID);
            }

            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void requestHeartbeat(ResourceID resourceID, Void r3) {
            }
        });
    }

    private void closeResourceManagerConnection(Exception exc) {
        if (this.establishedResourceManagerConnection != null) {
            dissolveResourceManagerConnection(this.establishedResourceManagerConnection, exc);
            this.establishedResourceManagerConnection = null;
        }
        if (this.resourceManagerConnection != null) {
            this.resourceManagerConnection.close();
            this.resourceManagerConnection = null;
        }
    }

    private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception exc) {
        ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Close ResourceManager connection {}.", resourceManagerResourceID, exc);
        } else {
            this.log.info("Close ResourceManager connection {}: {}.", resourceManagerResourceID, exc.getMessage());
        }
        this.resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);
        establishedResourceManagerConnection.getResourceManagerGateway().disconnectJobManager(this.jobGraph.getJobID(), exc);
        this.slotPoolGateway.disconnectResourceManager();
    }

    private CompletableFuture<ExecutionGraph> restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph executionGraph, CompletableFuture<String> completableFuture) {
        return completableFuture.thenApplyAsync(str -> {
            if (str != null) {
                try {
                    tryRestoreExecutionGraphFromSavepoint(executionGraph, SavepointRestoreSettings.forPath(str, false));
                } catch (Exception e) {
                    String format = String.format("Could not restore from temporary rescaling savepoint. This might indicate that the savepoint %s got corrupted. Deleting this savepoint as a precaution.", str);
                    this.log.info(format);
                    CompletableFuture.runAsync(() -> {
                        if (str.equals(this.lastInternalSavepoint)) {
                            this.lastInternalSavepoint = null;
                        }
                    }, getMainThreadExecutor()).thenRunAsync(() -> {
                        disposeSavepoint(str);
                    }, (Executor) this.scheduledExecutorService);
                    throw new CompletionException((Throwable) new JobModificationException(format, e));
                }
            } else {
                try {
                    tryRestoreExecutionGraphFromSavepoint(executionGraph, this.jobGraph.getSavepointRestoreSettings());
                } catch (Exception e2) {
                    String format2 = String.format("Could not restore from initial savepoint. This might indicate that the savepoint %s got corrupted.", this.jobGraph.getSavepointRestoreSettings().getRestorePath());
                    this.log.info(format2);
                    throw new CompletionException((Throwable) new JobModificationException(format2, e2));
                }
            }
            return executionGraph;
        }, (Executor) this.scheduledExecutorService);
    }

    private CompletableFuture<String> getJobModificationSavepoint(Time time) {
        return triggerSavepoint(null, false, time).handleAsync((str, th) -> {
            if (th == null) {
                String str = this.lastInternalSavepoint;
                this.lastInternalSavepoint = str;
                if (str != null) {
                    CompletableFuture.runAsync(() -> {
                        disposeSavepoint(str);
                    }, this.scheduledExecutorService);
                }
                return this.lastInternalSavepoint;
            }
            Throwable stripCompletionException = ExceptionUtils.stripCompletionException(th);
            if (!(stripCompletionException instanceof CheckpointTriggerException)) {
                throw new CompletionException(stripCompletionException);
            }
            ?? r0 = (CheckpointTriggerException) stripCompletionException;
            if (r0.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
                return this.lastInternalSavepoint;
            }
            throw new CompletionException((Throwable) r0);
        }, (Executor) getMainThreadExecutor());
    }

    private void rescaleJobGraph(Collection<JobVertexID> collection, int i, RescalingBehaviour rescalingBehaviour) throws FlinkException {
        for (JobVertexID jobVertexID : collection) {
            JobVertex findVertexByID = this.jobGraph.findVertexByID(jobVertexID);
            ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexID);
            if (jobVertex != null) {
                findVertexByID.setMaxParallelism(jobVertex.getMaxParallelism());
            }
            rescalingBehaviour.acceptWithException(findVertexByID, Integer.valueOf(i));
        }
    }

    @VisibleForTesting
    public ExecutionGraph getExecutionGraph() {
        return this.executionGraph;
    }

    @VisibleForTesting
    SlotPool getSlotPool() {
        return this.slotPool;
    }

    static {
        $assertionsDisabled = !JobMaster.class.desiredAssertionStatus();
    }
}
