/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.TaskManagerResourceDescription;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.InfoMessageListenerRpcGateway;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdActions;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.resourcemanager.placementconstraint.PlacementConstraint;
import org.apache.flink.runtime.resourcemanager.placementconstraint.SlotTag;
import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.FileOffsetRange;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
extends FencedRpcEndpoint<ResourceManagerId>
implements ResourceManagerGateway,
LeaderContender {
    /** Name constant for the resource manager. */
    public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    /** Identity of this resource manager; handed to the heartbeat managers and returned in registration responses. */
    private final ResourceID resourceId;
    /** Configuration; this class reads the heartbeat interval and the total CPU/memory limits from it. */
    private final ResourceManagerConfiguration resourceManagerConfiguration;
    /** Currently registered job managers, keyed by job id. */
    private final Map<JobID, JobManagerRegistration> jobManagerRegistrations;
    /** The same job manager registrations, keyed by the job manager's resource id. */
    private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations;
    /** Service used to track jobs and to look up the leader id of a job's master. */
    private final JobLeaderIdService jobLeaderIdService;
    /** Currently registered task executors, keyed by their resource id. */
    private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
    /** Source of the leader election service obtained in {@code start()}. */
    private final HighAvailabilityServices highAvailabilityServices;
    /** Heartbeat manager for task managers; incoming heartbeats carry a {@link SlotReport} payload. */
    private final HeartbeatManager<SlotReport, Void> taskManagerHeartbeatManager;
    /** Heartbeat manager for job managers; heartbeats carry no payload. */
    private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
    private final MetricRegistry metricRegistry;
    private final FatalErrorHandler fatalErrorHandler;
    /** Slot bookkeeping; closed in {@code postStop()}. */
    protected final SlotManager slotManager;
    /** Cluster information returned to task executors on successful registration. */
    private final ClusterInformation clusterInformation;
    /** Assigned in {@code start()}; {@code null} until then. */
    private LeaderElectionService leaderElectionService;
    /** Connected info message listeners, keyed by the address they registered with. */
    private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
    /** Value of {@code ResourceManagerConfiguration#getMaxTotalCpuCore()} at construction time. */
    protected final double maxTotalCpuCore;
    /** Value of {@code ResourceManagerConfiguration#getMaxTotalMemoryMb()} at construction time. */
    protected final int maxTotalMemoryMb;
    /** Returned as-is by {@code requestTotalResourceLimitExceptions}. NOTE(review): the Long key looks like a timestamp or sequence number - confirm with the code that populates it. */
    protected ConcurrentHashMap<Long, Exception> tryAllocateExceedLimitExceptions;
    /** Returned as-is by {@code requestTaskManagerExceptions}. NOTE(review): meaning of the Long key is not visible here - confirm with the code that populates it. */
    protected ConcurrentHashMap<Long, Tuple2<ResourceID, Exception>> taskManagerExceptions;

    /**
     * Creates the resource manager and wires up its collaborators.
     *
     * <p>Every service argument is required and is rejected with a
     * {@link NullPointerException} when {@code null}. Two heartbeat managers are created
     * from {@code heartbeatServices}: one for task managers and one for job managers.
     * The registration maps and exception histories start out empty.
     *
     * @param rpcService RPC service this endpoint runs on; also supplies the scheduled executor for heartbeats
     * @param resourceManagerEndpointId endpoint id passed to the RPC endpoint base class
     * @param resourceId identity of this resource manager
     * @param resourceManagerConfiguration configuration (heartbeat interval, total CPU/memory limits)
     * @param highAvailabilityServices provides the leader election service used in {@code start()}
     * @param heartbeatServices factory for the two heartbeat managers
     * @param slotManager slot bookkeeping component
     * @param metricRegistry metric registry
     * @param jobLeaderIdService service tracking the leader id per job
     * @param clusterInformation information handed to task executors on registration
     * @param fatalErrorHandler handler for unrecoverable errors
     */
    public ResourceManager(RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler) {
        super(rpcService, resourceManagerEndpointId);
        this.resourceId = Preconditions.checkNotNull(resourceId);
        this.resourceManagerConfiguration = Preconditions.checkNotNull(resourceManagerConfiguration);
        this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
        this.slotManager = Preconditions.checkNotNull(slotManager);
        this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
        this.jobLeaderIdService = Preconditions.checkNotNull(jobLeaderIdService);
        this.clusterInformation = Preconditions.checkNotNull(clusterInformation);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);

        // Validate explicitly, like every other collaborator, instead of failing on the first dereference.
        Preconditions.checkNotNull(heartbeatServices);
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId, new JobManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);

        this.jobManagerRegistrations = new HashMap<JobID, JobManagerRegistration>(4);
        this.jmResourceIdRegistrations = new HashMap<ResourceID, JobManagerRegistration>(4);
        this.taskExecutors = new HashMap<ResourceID, WorkerRegistration<WorkerType>>(8);
        this.infoMessageListeners = new ConcurrentHashMap<String, InfoMessageListenerRpcGateway>(8);

        this.maxTotalCpuCore = resourceManagerConfiguration.getMaxTotalCpuCore();
        this.maxTotalMemoryMb = resourceManagerConfiguration.getMaxTotalMemoryMb();

        // Parameterized instead of raw types so the assignments are checked by the compiler.
        this.tryAllocateExceedLimitExceptions = new ConcurrentHashMap<Long, Exception>();
        this.taskManagerExceptions = new ConcurrentHashMap<Long, Tuple2<ResourceID, Exception>>();
    }

    /**
     * Starts the endpoint, then the leader election service, then the job leader id
     * service, and finally calls {@code initialize()}.
     *
     * @throws ResourceManagerException if either service fails to start
     * @throws Exception if the base endpoint or {@code initialize()} fails
     */
    @Override
    public void start() throws Exception {
        super.start();

        leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
        try {
            leaderElectionService.start(this);
        } catch (Exception electionFailure) {
            throw new ResourceManagerException("Could not start the leader election service.", electionFailure);
        }

        try {
            jobLeaderIdService.start(new JobLeaderIdActionsImpl());
        } catch (Exception leaderIdFailure) {
            throw new ResourceManagerException("Could not start the job leader id service.", leaderIdFailure);
        }

        initialize();
    }

    /**
     * Shuts down the heartbeat managers, the slot manager, the leader election service
     * and the job leader id service, then clears all registration state.
     *
     * <p>Each service is stopped even if an earlier one failed; the first failure is kept
     * and later ones are attached to it as suppressed exceptions.
     *
     * @return a completed future, or an exceptionally completed one wrapping the collected
     *     shutdown failure in a {@link FlinkException}
     */
    @Override
    public CompletableFuture<Void> postStop() {
        taskManagerHeartbeatManager.stop();
        jobManagerHeartbeatManager.stop();

        Exception shutdownFailure = null;

        try {
            slotManager.close();
        } catch (Exception slotManagerFailure) {
            shutdownFailure = ExceptionUtils.firstOrSuppressed(slotManagerFailure, shutdownFailure);
        }

        try {
            leaderElectionService.stop();
        } catch (Exception electionFailure) {
            shutdownFailure = ExceptionUtils.firstOrSuppressed(electionFailure, shutdownFailure);
        }

        try {
            jobLeaderIdService.stop();
        } catch (Exception leaderIdFailure) {
            shutdownFailure = ExceptionUtils.firstOrSuppressed(leaderIdFailure, shutdownFailure);
        }

        clearState();

        if (shutdownFailure == null) {
            return CompletableFuture.completedFuture(null);
        }
        return FutureUtils.completedExceptionally(new FlinkException("Could not properly shut down the ResourceManager.", shutdownFailure));
    }

    /**
     * Registers a job manager for the given job.
     *
     * <p>The job is added to the job leader id service if it is not yet known. The
     * registration only succeeds when the leader id reported by that service equals
     * {@code jobMasterId}; otherwise, or when connecting to the job manager fails, the
     * returned future completes with a {@link RegistrationResponse.Decline}.
     *
     * @param jobMasterId fencing token of the registering job master
     * @param jobManagerResourceId resource id of the job manager
     * @param jobManagerAddress RPC address of the job manager
     * @param jobId job the job manager is responsible for
     * @param timeout RPC timeout (not used by this implementation)
     * @return future with the registration response; completed exceptionally only when the
     *     job leader id service itself fails, which is also reported as a fatal error
     */
    @Override
    public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobManagerResourceId, String jobManagerAddress, JobID jobId, Time timeout) {
        Preconditions.checkNotNull(jobMasterId);
        Preconditions.checkNotNull(jobManagerResourceId);
        Preconditions.checkNotNull(jobManagerAddress);
        Preconditions.checkNotNull(jobId);

        if (!this.jobLeaderIdService.containsJob(jobId)) {
            try {
                this.jobLeaderIdService.addJob(jobId);
            } catch (Exception e) {
                ResourceManagerException exception = new ResourceManagerException("Could not add the job " + jobId + " to the job id leader service.", e);
                this.onFatalError(exception);
                this.log.error("Could not add job {} to job leader id service.", jobId, e);
                return FutureUtils.completedExceptionally(exception);
            }
        }

        this.log.info("Registering job manager {}@{} for job {}.", jobMasterId, jobManagerAddress, jobId);

        CompletableFuture<JobMasterId> jobMasterIdFuture;
        try {
            jobMasterIdFuture = this.jobLeaderIdService.getLeaderId(jobId);
        } catch (Exception e) {
            ResourceManagerException exception = new ResourceManagerException("Cannot obtain the job leader id future to verify the correct job leader.", e);
            this.onFatalError(exception);
            // Include the cause so the debug log is actually useful for diagnosis.
            this.log.debug("Could not obtain the job leader id future to verify the correct job leader.", e);
            return FutureUtils.completedExceptionally(exception);
        }

        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);

        // Runs on the main thread executor because it mutates the registration maps.
        CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
            jobMasterIdFuture,
            (JobMasterGateway jobMasterGateway, JobMasterId currentJobMasterId) -> {
                if (Objects.equals(currentJobMasterId, jobMasterId)) {
                    return this.registerJobMasterInternal(jobMasterGateway, jobId, jobManagerAddress, jobManagerResourceId);
                }
                // Argument order follows the message: current leader id first, received id second.
                this.log.debug("The current JobMaster leader id {} did not match the received JobMaster id {}.", currentJobMasterId, jobMasterId);
                return new RegistrationResponse.Decline("Job manager leader id did not match.");
            },
            this.getMainThreadExecutor());

        // Any failure along the way is turned into a decline instead of failing the RPC.
        return registrationResponseFuture.handleAsync(
            (RegistrationResponse registrationResponse, Throwable throwable) -> {
                if (throwable != null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress, throwable);
                    } else {
                        this.log.info("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress);
                    }
                    return new RegistrationResponse.Decline(throwable.getMessage());
                }
                return registrationResponse;
            },
            this.getRpcService().getExecutor());
    }

    /**
     * Connects to the task executor at the given address and registers it.
     *
     * @param taskExecutorAddress RPC address of the task executor
     * @param taskExecutorResourceId resource id of the task executor
     * @param dataPort data port reported by the task executor
     * @param hardwareDescription hardware description reported by the task executor
     * @param timeout RPC timeout (not used by this implementation)
     * @return future with the registration response; a failed connection attempt yields a
     *     {@link RegistrationResponse.Decline} carrying the failure message
     */
    @Override
    public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID taskExecutorResourceId, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
        return getRpcService()
            .connect(taskExecutorAddress, TaskExecutorGateway.class)
            .handleAsync(
                (gateway, connectFailure) -> {
                    if (connectFailure == null) {
                        return registerTaskExecutorInternal(gateway, taskExecutorAddress, taskExecutorResourceId, dataPort, hardwareDescription);
                    }
                    return new RegistrationResponse.Decline(connectFailure.getMessage());
                },
                getMainThreadExecutor());
    }

    /**
     * Accepts the initial slot report of a registered task manager and hands it to the
     * slot manager.
     *
     * @param taskManagerResourceId resource id of the reporting task manager
     * @param taskManagerRegistrationId instance id the task manager received on registration
     * @param slotReport slots offered by the task manager
     * @param timeout RPC timeout (not used by this implementation)
     * @return acknowledge on success; exceptionally completed with
     *     {@link UnknownTaskExecutorException} when no task manager is registered under
     *     {@code taskManagerResourceId}, or with {@link ResourceManagerException} when the
     *     registration id does not match the current registration
     */
    @Override
    public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
        WorkerRegistration<WorkerType> workerTypeWorkerRegistration = this.taskExecutors.get(taskManagerResourceId);

        // The task manager may have been removed (or never registered); fail the future
        // instead of throwing a NullPointerException.
        if (workerTypeWorkerRegistration == null) {
            this.log.debug("Received slot report from unregistered TaskExecutor {}.", taskManagerResourceId);
            return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerResourceId));
        }

        if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
            this.slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
    }

    /**
     * Forwards a task manager heartbeat, with its slot report payload, to the task manager
     * heartbeat manager.
     *
     * @param resourceID resource id of the task manager that sent the heartbeat
     * @param slotReport slot report attached to the heartbeat
     */
    @Override
    public void heartbeatFromTaskManager(ResourceID resourceID, SlotReport slotReport) {
        taskManagerHeartbeatManager.receiveHeartbeat(resourceID, slotReport);
    }

    /**
     * Forwards a job manager heartbeat to the job manager heartbeat manager.
     *
     * @param resourceID resource id of the job manager that sent the heartbeat
     */
    @Override
    public void heartbeatFromJobManager(ResourceID resourceID) {
        // Job manager heartbeats carry no payload.
        final Void noPayload = null;
        jobManagerHeartbeatManager.receiveHeartbeat(resourceID, noPayload);
    }

    /**
     * Closes the connection to the given task manager.
     *
     * @param resourceId resource id of the task manager to disconnect
     * @param cause reason for the disconnect
     */
    @Override
    public void disconnectTaskManager(ResourceID resourceId, Exception cause) {
        // The boolean result of closeTaskManagerConnection is not needed here.
        closeTaskManagerConnection(resourceId, cause);
    }

    /**
     * Closes the connection to the job manager registered for the given job.
     *
     * @param jobId job whose job manager should be disconnected
     * @param cause reason for the disconnect
     */
    @Override
    public void disconnectJobManager(JobID jobId, Exception cause) {
        closeJobManagerConnection(jobId, cause);
    }

    /**
     * Passes the placement constraints of a job on to the slot manager.
     *
     * @param jobId job the constraints belong to
     * @param constraints placement constraints to apply
     * @param timeout RPC timeout (not used by this implementation)
     * @return an already completed acknowledge
     */
    @Override
    public CompletableFuture<Acknowledge> setPlacementConstraints(JobID jobId, List<PlacementConstraint> constraints, @RpcTimeout Time timeout) {
        slotManager.setJobConstraints(jobId, constraints);
        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Registers a slot request with the slot manager on behalf of a job master.
     *
     * @param jobMasterId fencing token of the requesting job master
     * @param slotRequest the slot request
     * @param timeout RPC timeout (not used by this implementation)
     * @return acknowledge on success; exceptionally completed when no job manager is
     *     registered for the job, when {@code jobMasterId} differs from the registered
     *     one, or when the slot manager rejects the request
     */
    @Override
    public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, Time timeout) {
        final JobID jobId = slotRequest.getJobId();
        final JobManagerRegistration registration = jobManagerRegistrations.get(jobId);

        if (registration == null) {
            return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
        }

        final JobMasterId registeredId = registration.getJobMasterId();
        if (!Objects.equals(jobMasterId, registeredId)) {
            return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " + registeredId + " does not match the received id " + jobMasterId + '.'));
        }

        log.info("Request slot with profile {} for job {} with allocation id {}.", slotRequest.getResourceProfile(), slotRequest.getJobId(), slotRequest.getAllocationId());

        try {
            slotManager.registerSlotRequest(slotRequest);
        } catch (SlotManagerException rejected) {
            return FutureUtils.completedExceptionally(rejected);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Withdraws a previously registered slot request from the slot manager.
     *
     * @param allocationID allocation id identifying the slot request
     */
    @Override
    public void cancelSlotRequest(AllocationID allocationID) {
        // The return value of unregisterSlotRequest is intentionally ignored.
        slotManager.unregisterSlotRequest(allocationID);
    }

    /**
     * Frees a slot in the slot manager after a task executor reported it as available.
     *
     * <p>The message is dropped (with a debug log) when the task executor is not registered
     * or when {@code instanceID} differs from the instance id of its current registration.
     *
     * @param instanceID instance id the sender was registered under
     * @param slotId slot that became available
     * @param allocationId allocation the slot was previously bound to
     */
    @Override
    public void notifySlotAvailable(InstanceID instanceID, SlotID slotId, AllocationID allocationId) {
        final ResourceID owner = slotId.getResourceID();
        final WorkerRegistration<WorkerType> registration = taskExecutors.get(owner);

        if (registration == null) {
            log.debug("Could not find registration for resource id {}. Discarding the slot availablemessage {}.", owner, slotId);
            return;
        }

        if (!Objects.equals(registration.getInstanceID(), instanceID)) {
            log.debug("Invalid registration id for slot available message. This indicates an outdated request.");
            return;
        }

        slotManager.freeSlot(slotId, allocationId);
    }

    /**
     * Registers an info message listener reachable at the given address.
     *
     * <p>A listener already registered under this address is not replaced; a warning is
     * logged instead. Otherwise a connection is attempted and the gateway is stored once
     * the connection succeeds.
     *
     * @param address RPC address of the listener
     */
    @Override
    public void registerInfoMessageListener(String address) {
        if (infoMessageListeners.containsKey(address)) {
            log.warn("Receive a duplicate registration from info message listener on ({})", address);
            return;
        }

        getRpcService()
            .connect(address, InfoMessageListenerRpcGateway.class)
            .whenCompleteAsync(
                (listenerGateway, connectFailure) -> {
                    if (connectFailure == null) {
                        log.info("Receive a registration from info message listener on ({})", address);
                        infoMessageListeners.put(address, listenerGateway);
                    } else {
                        log.warn("Receive a registration from unreachable info message listener on ({})", address);
                    }
                },
                getMainThreadExecutor());
    }

    /**
     * Removes the info message listener registered under the given address, if any.
     *
     * @param address RPC address the listener registered with
     */
    @Override
    public void unRegisterInfoMessageListener(String address) {
        infoMessageListeners.remove(address);
    }

    /**
     * Deregisters the application via {@code internalDeregisterApplication}.
     *
     * <p>A {@link ResourceManagerException} from the deregistration is only logged as a
     * warning; the call is acknowledged either way.
     *
     * @param finalStatus final status of the application
     * @param diagnostics optional diagnostics message
     * @return an already completed acknowledge
     */
    @Override
    public CompletableFuture<Acknowledge> deregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) {
        log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics);

        try {
            internalDeregisterApplication(finalStatus, diagnostics);
        } catch (ResourceManagerException deregistrationFailure) {
            log.warn("Could not properly shutdown the application.", deregistrationFailure);
        }

        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Reports how many task executors are currently registered.
     *
     * @return an already completed future holding the size of the task executor map
     */
    @Override
    public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
        final int registeredCount = taskExecutors.size();
        return CompletableFuture.completedFuture(registeredCount);
    }

    /**
     * Builds a {@link TaskManagerInfo} for every registered task executor.
     *
     * @param timeout RPC timeout (not used by this implementation)
     * @return an already completed future holding one info object per registered task executor
     */
    @Override
    public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
        final Collection<TaskManagerInfo> infos = new ArrayList<TaskManagerInfo>(taskExecutors.size());

        taskExecutors.forEach((tmId, registration) -> {
            final InstanceID instanceId = registration.getInstanceID();
            infos.add(
                new TaskManagerInfo(
                    tmId,
                    registration.getTaskExecutorGateway().getAddress(),
                    registration.getDataPort(),
                    taskManagerHeartbeatManager.getLastHeartbeatFrom(tmId),
                    slotManager.getNumberRegisteredSlotsOf(instanceId),
                    slotManager.getNumberFreeSlotsOf(instanceId),
                    registration.getHardwareDescription(),
                    TaskManagerResourceDescription.fromResourceProfile(slotManager.getTotalResourceOf(tmId)),
                    TaskManagerResourceDescription.fromResourceProfile(slotManager.getAvailableResourceOf(tmId))));
        });

        return CompletableFuture.completedFuture(infos);
    }

    /**
     * Builds the {@link TaskManagerInfo} of a single task executor.
     *
     * @param resourceId resource id of the task executor
     * @param timeout RPC timeout (not used by this implementation)
     * @return future with the info; exceptionally completed with
     *     {@link UnknownTaskExecutorException} when the task executor is not registered
     */
    @Override
    public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID resourceId, Time timeout) {
        final WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);
        if (registration == null) {
            return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(resourceId));
        }

        final InstanceID instanceId = registration.getInstanceID();
        return CompletableFuture.completedFuture(
            new TaskManagerInfo(
                resourceId,
                registration.getTaskExecutorGateway().getAddress(),
                registration.getDataPort(),
                taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
                slotManager.getNumberRegisteredSlotsOf(instanceId),
                slotManager.getNumberFreeSlotsOf(instanceId),
                registration.getHardwareDescription(),
                TaskManagerResourceDescription.fromResourceProfile(slotManager.getTotalResourceOf(resourceId)),
                TaskManagerResourceDescription.fromResourceProfile(slotManager.getAvailableResourceOf(resourceId))));
    }

    /**
     * Summarizes the cluster resources: number of task executors, registered and free
     * slots, and the total and available resources reported by the slot manager.
     *
     * @param timeout RPC timeout (not used by this implementation)
     * @return an already completed future holding the overview
     */
    @Override
    public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
        final int registeredSlots = slotManager.getNumberRegisteredSlots();
        final int freeSlots = slotManager.getNumberFreeSlots();
        final ResourceOverview overview = new ResourceOverview(
            taskExecutors.size(),
            registeredSlots,
            freeSlots,
            slotManager.getTotalResource(),
            slotManager.getAvailableResource());
        return CompletableFuture.completedFuture(overview);
    }

    /**
     * Derives the metric query service path of every registered task executor.
     *
     * <p>The path is the task executor's gateway address up to and including its last
     * {@code '/'}, followed by {@code "MetricQueryService_"} and the resource id string.
     * If the address contains no {@code '/'}, the prefix is empty.
     *
     * @param timeout RPC timeout (not used by this implementation)
     * @return an already completed future holding (resource id, path) pairs
     */
    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
        // Fully parameterized so the collection matches the declared return type without raw types.
        Collection<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<Tuple2<ResourceID, String>>(this.taskExecutors.size());

        for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : this.taskExecutors.entrySet()) {
            ResourceID tmResourceId = workerRegistrationEntry.getKey();
            WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue();
            String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress();

            // Keep everything up to and including the last path separator of the address.
            String addressPrefix = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1);
            String tmMetricQueryServicePath = addressPrefix + "MetricQueryService" + '_' + tmResourceId.getResourceIdString();

            metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath));
        }

        return CompletableFuture.completedFuture(metricQueryServicePaths);
    }

    /**
     * Asks a task executor to upload (a range of) a file and returns the resulting blob key.
     *
     * @param taskManagerId resource id of the task executor
     * @param filename name of the file to upload
     * @param fileOffsetRange range of the file to upload
     * @param timeout timeout passed on to the task executor call
     * @return future from the task executor; exceptionally completed with
     *     {@link UnknownTaskExecutorException} when the task executor is not registered
     */
    @Override
    public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, String filename, FileOffsetRange fileOffsetRange, Time timeout) {
        log.debug("Request file {} with {} upload from TaskExecutor {}.", filename, fileOffsetRange, taskManagerId);

        final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
        if (registration != null) {
            return registration.getTaskExecutorGateway().requestFileUpload(filename, fileOffsetRange, timeout);
        }

        log.debug("Requested file {} upload from unregistered TaskExecutor {}.", filename, taskManagerId);
        return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
    }

    /**
     * Asks a task executor to upload (a range of) a file; the task executor's answer pairs
     * the blob key with a {@code Long}.
     *
     * @param taskManagerId resource id of the task executor
     * @param filename name of the file to upload
     * @param fileOffsetRange range of the file to upload
     * @param timeout timeout passed on to the task executor call
     * @return future from the task executor; exceptionally completed with
     *     {@link UnknownTaskExecutorException} when the task executor is not registered
     */
    @Override
    public CompletableFuture<Tuple2<TransientBlobKey, Long>> requestTaskManagerFileUploadReturnLength(ResourceID taskManagerId, String filename, FileOffsetRange fileOffsetRange, Time timeout) {
        log.debug("Request file {} with {} upload from TaskExecutor {}.", filename, fileOffsetRange, taskManagerId);

        final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
        if (registration != null) {
            return registration.getTaskExecutorGateway().requestTaskManagerFileUploadReturnLength(filename, fileOffsetRange, timeout);
        }

        log.debug("Requested file {} upload from unregistered TaskExecutor {}.", filename, taskManagerId);
        return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
    }

    /**
     * Requests the log list of a task executor.
     *
     * @param taskManagerId resource id of the task executor
     * @param timeout timeout passed on to the task executor call
     * @return future from the task executor; exceptionally completed with
     *     {@link UnknownTaskExecutorException} when the task executor is not registered
     */
    @Override
    public CompletableFuture<Collection<Tuple2<String, Long>>> requestTaskManagerLogList(ResourceID taskManagerId, Time timeout) {
        final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
        if (registration != null) {
            return registration.getTaskExecutorGateway().requestLogList(timeout);
        }

        log.debug("Requested historical loglist from unregistered TaskExecutor {}.", taskManagerId);
        return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
    }

    /**
     * Requests the JMX information of a task executor.
     *
     * @param taskManagerId resource id of the task executor
     * @param timeout timeout passed on to the task executor call
     * @return future from the task executor; exceptionally completed with
     *     {@link UnknownTaskExecutorException} when the task executor is not registered
     */
    @Override
    public CompletableFuture<Tuple2<String, Long>> requestJmx(ResourceID taskManagerId, Time timeout) {
        final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
        if (registration != null) {
            return registration.getTaskExecutorGateway().requestJmx(timeout);
        }

        log.debug("Requested jmx server information for TaskExecutor {}.", taskManagerId);
        return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
    }

    /**
     * Requests the log and stdout file names of a task executor.
     *
     * @param taskManagerId resource id of the task executor
     * @param timeout timeout passed on to the task executor call
     * @return future from the task executor; exceptionally completed with
     *     {@link UnknownTaskExecutorException} when the task executor is not registered
     */
    @Override
    public CompletableFuture<Tuple2<String, String>> requestTmLogAndStdoutFileName(ResourceID taskManagerId, Time timeout) {
        final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
        if (registration != null) {
            return registration.getTaskExecutorGateway().requestTmLogAndStdoutFileName(timeout);
        }

        log.debug("Requested log and stdout file name for TaskExecutor {}.", taskManagerId);
        return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
    }

    /**
     * Returns the recorded exceptions held in {@code tryAllocateExceedLimitExceptions}.
     *
     * <p>The future holds the internal map itself, not a copy.
     *
     * @param timeout RPC timeout (not used by this implementation)
     * @return an already completed future holding the map
     */
    @Override
    public CompletableFuture<Map<Long, Exception>> requestTotalResourceLimitExceptions(Time timeout) {
        final Map<Long, Exception> recorded = tryAllocateExceedLimitExceptions;
        return CompletableFuture.completedFuture(recorded);
    }

    /**
     * Returns the recorded exceptions held in {@code taskManagerExceptions}.
     *
     * <p>The future holds the internal map itself, not a copy.
     *
     * @param timeout RPC timeout (not used by this implementation)
     * @return an already completed future holding the map
     */
    @Override
    public CompletableFuture<Map<Long, Tuple2<ResourceID, Exception>>> requestTaskManagerExceptions(Time timeout) {
        final Map<Long, Tuple2<ResourceID, Exception>> recorded = taskManagerExceptions;
        return CompletableFuture.completedFuture(recorded);
    }

    /**
     * Records the job master as the registered job manager of {@code jobId} and starts
     * heartbeat monitoring for it.
     *
     * <p>If a job manager with the same fencing token is already registered for the job,
     * the existing registration is kept. If one with a different token is registered, it is
     * disconnected first and then replaced.
     *
     * @param jobMasterGateway gateway to the job master
     * @param jobId job the job master is responsible for
     * @param jobManagerAddress address of the job manager, used for logging
     * @param jobManagerResourceId resource id of the job manager
     * @return a {@link JobMasterRegistrationSuccess} carrying the heartbeat interval, this
     *     resource manager's fencing token and its resource id
     */
    private RegistrationResponse registerJobMasterInternal(final JobMasterGateway jobMasterGateway, JobID jobId, String jobManagerAddress, ResourceID jobManagerResourceId) {
        boolean needsNewRegistration = true;

        if (jobManagerRegistrations.containsKey(jobId)) {
            final JobManagerRegistration previous = jobManagerRegistrations.get(jobId);
            if (Objects.equals(previous.getJobMasterId(), jobMasterGateway.getFencingToken())) {
                log.debug("Job manager {}@{} was already registered.", jobMasterGateway.getFencingToken(), jobManagerAddress);
                needsNewRegistration = false;
            } else {
                // A different leader took over: drop the old connection before registering the new one.
                disconnectJobManager(previous.getJobID(), new Exception("New job leader for job " + jobId + " found."));
            }
        }

        if (needsNewRegistration) {
            final JobManagerRegistration fresh = new JobManagerRegistration(jobId, jobManagerResourceId, jobMasterGateway);
            jobManagerRegistrations.put(jobId, fresh);
            jmResourceIdRegistrations.put(jobManagerResourceId, fresh);
        }

        log.info("Registered job manager {}@{} for job {}.", jobMasterGateway.getFencingToken(), jobManagerAddress, jobId);

        jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {

            @Override
            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                // Nothing to do here.
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, Void payload) {
                jobMasterGateway.heartbeatFromResourceManager(resourceID);
            }
        });

        final long heartbeatIntervalMillis = resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds();
        return new JobMasterRegistrationSuccess(heartbeatIntervalMillis, getFencingToken(), this.resourceId);
    }

    /**
     * Registers a task executor once its gateway is connected.
     *
     * <p>The registration is declined when {@code workerStarted} does not recognize the
     * resource id. An existing registration is reused when the heartbeat manager reports a
     * last heartbeat {@code >= 0} for it; otherwise a new registration is stored and
     * heartbeat monitoring is started.
     *
     * @param taskExecutorGateway gateway to the task executor
     * @param taskExecutorAddress address of the task executor, used for logging
     * @param taskExecutorResourceId resource id of the task executor
     * @param dataPort data port reported by the task executor
     * @param hardwareDescription hardware description reported by the task executor
     * @return a {@link TaskExecutorRegistrationSuccess}, or a
     *     {@link RegistrationResponse.Decline} for an unrecognized task executor
     */
    private RegistrationResponse registerTaskExecutorInternal(final TaskExecutorGateway taskExecutorGateway, String taskExecutorAddress, ResourceID taskExecutorResourceId, int dataPort, HardwareDescription hardwareDescription) {
        final WorkerType startedWorker = workerStarted(taskExecutorResourceId);
        if (startedWorker == null) {
            log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did not recognize it", taskExecutorResourceId, taskExecutorAddress);
            return new RegistrationResponse.Decline("unrecognized TaskExecutor");
        }

        WorkerRegistration<WorkerType> registration = taskExecutors.get(taskExecutorResourceId);
        final boolean stillAlive = registration != null
            && taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorResourceId) >= 0L;

        if (!stillAlive) {
            registration = new WorkerRegistration<WorkerType>(taskExecutorGateway, startedWorker, dataPort, hardwareDescription);
            taskExecutors.put(taskExecutorResourceId, registration);

            taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {

                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    // Nothing to do here.
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                    taskExecutorGateway.heartbeatFromResourceManager(resourceID);
                }
            });
        } else {
            log.info("The TaskExecutor {} has already registered and kept heartbeat, so will ignore and use original instance id {}", taskExecutorResourceId, registration.getInstanceID());
        }

        final long heartbeatIntervalMillis = resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds();
        return new TaskExecutorRegistrationSuccess(registration.getInstanceID(), this.resourceId, heartbeatIntervalMillis, clusterInformation);
    }

    /**
     * Drops all job manager and task executor registrations and clears the job leader id
     * service. A failure while clearing that service is reported as a fatal error.
     */
    private void clearState() {
        jobManagerRegistrations.clear();
        jmResourceIdRegistrations.clear();
        taskExecutors.clear();

        try {
            jobLeaderIdService.clear();
        } catch (Exception clearFailure) {
            final ResourceManagerException fatal = new ResourceManagerException("Could not properly clear the job leader id service.", clearFailure);
            onFatalError(fatal);
        }
    }

    /**
     * Removes the job manager registration for the given job, stops monitoring its heartbeat
     * and tells the job master that the ResourceManager has disconnected.
     *
     * <p>Does nothing apart from a debug log entry when no job manager is registered for the job.
     *
     * @param jobId job whose job manager connection is closed
     * @param cause reason forwarded to the job master on disconnect
     */
    protected void closeJobManagerConnection(JobID jobId, Exception cause) {
        final JobManagerRegistration removed = this.jobManagerRegistrations.remove(jobId);
        if (removed == null) {
            this.log.debug("There was no registered job manager for job {}.", jobId);
            return;
        }

        final ResourceID jmResourceId = removed.getJobManagerResourceID();
        final JobMasterGateway gateway = removed.getJobManagerGateway();
        final JobMasterId masterId = removed.getJobMasterId();

        this.log.info("Disconnect job manager {}@{} for job {} from the resource manager.", masterId, gateway.getAddress(), jobId);

        this.jobManagerHeartbeatManager.unmonitorTarget(jmResourceId);
        this.jmResourceIdRegistrations.remove(jmResourceId);
        gateway.disconnectResourceManager((ResourceManagerId) this.getFencingToken(), cause);
    }

    /**
     * Stops heartbeat monitoring for the given TaskExecutor and, if it was registered, removes
     * it from the slot manager and notifies the TaskExecutor and every registered job manager
     * about the disconnect.
     *
     * @param resourceID resource id of the TaskExecutor
     * @param cause reason for closing the connection; its message is logged and the exception is
     *     forwarded to the notified gateways
     * @return {@code true} if a registration existed and was closed, {@code false} otherwise
     */
    protected boolean closeTaskManagerConnection(ResourceID resourceID, Exception cause) {
        // Heartbeat monitoring is stopped regardless of whether a registration is found.
        this.taskManagerHeartbeatManager.unmonitorTarget(resourceID);

        final WorkerRegistration<WorkerType> removed = this.taskExecutors.remove(resourceID);
        if (removed == null) {
            this.log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID);
            return false;
        }

        this.log.info("Closing TaskExecutor connection {} because: {}", resourceID, cause.getMessage());
        this.slotManager.unregisterTaskManager(removed.getInstanceID());
        removed.getTaskExecutorGateway().disconnectResourceManager(cause);
        this.jobManagerRegistrations.values().forEach(
                jmRegistration -> jmRegistration.getJobManagerGateway().disconnectTaskManager(resourceID, cause));
        return true;
    }

    /**
     * Removes the given job from the job leader id service and disconnects its job manager if
     * one is currently registered.
     *
     * <p>A failure to remove the job from the job leader id service is only logged; the job
     * manager is still disconnected afterwards.
     *
     * @param jobId job to remove
     */
    protected void removeJob(JobID jobId) {
        try {
            this.jobLeaderIdService.removeJob(jobId);
        }
        catch (Exception e) {
            // The trailing exception argument is not bound to a placeholder, so it is logged as the cause.
            this.log.warn("Could not properly remove the job {} from the job leader id service.", jobId, e);
        }

        if (this.jobManagerRegistrations.containsKey(jobId)) {
            // Note the leading space in " was removed": without it the job id runs into the text.
            this.disconnectJobManager(jobId, new Exception("Job " + jobId + " was removed"));
        }
    }

    /**
     * Reacts to a job leader losing leadership by disconnecting the registered job manager,
     * but only if the registered job master id still equals the one that lost leadership.
     *
     * @param jobId job whose leader lost leadership
     * @param oldJobMasterId id of the job master that lost leadership
     */
    protected void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) {
        if (!this.jobManagerRegistrations.containsKey(jobId)) {
            this.log.debug("Discard job leader lost leadership for outdated leader {} for job {}.", oldJobMasterId, jobId);
            return;
        }

        final JobMasterId registeredMasterId = this.jobManagerRegistrations.get(jobId).getJobMasterId();
        if (!Objects.equals(registeredMasterId, oldJobMasterId)) {
            // A different job master is registered by now, so the notification is stale.
            this.log.debug("Discarding job leader lost leadership, because a new job leader was found for job {}. ", jobId);
            return;
        }

        this.disconnectJobManager(jobId, new Exception("Job leader lost leadership."));
    }

    /**
     * Releases the worker that is registered under the given instance id.
     *
     * <p>If a matching registration is found, the worker is stopped via
     * {@link #stopWorker} and, when stopping succeeds, its TaskExecutor connection is closed.
     * If no registration matches, the instance is still unregistered from the slot manager.
     *
     * @param instanceId instance id of the TaskExecutor to release
     * @param cause reason forwarded when closing the TaskExecutor connection
     */
    protected void releaseResource(InstanceID instanceId, Exception cause) {
        // Typed as WorkerType (not ResourceIDRetrievable) so it can be passed to stopWorker(WorkerType).
        WorkerType worker = null;
        // Linear scan: registrations are keyed by ResourceID, not by InstanceID.
        for (WorkerRegistration<WorkerType> registration : this.taskExecutors.values()) {
            if (registration.getInstanceID().equals(instanceId)) {
                worker = registration.getWorker();
                break;
            }
        }

        if (worker == null) {
            this.slotManager.unregisterTaskManager(instanceId);
            return;
        }

        if (this.stopWorker(worker)) {
            this.closeTaskManagerConnection(worker.getResourceID(), cause);
        } else {
            this.log.debug("Worker {} could not be stopped.", worker.getResourceID());
        }
    }

    /**
     * Tells whether a TaskExecutor with the given resource id is currently registered.
     *
     * @param taskExecutorId resource id to look up
     * @return {@code true} if a registration exists for the id
     */
    protected boolean taskExecutorRegistered(ResourceID taskExecutorId) {
        final boolean registered = taskExecutors.containsKey(taskExecutorId);
        return registered;
    }

    /**
     * Sends the given text as an {@code InfoMessage} to all registered info message listeners.
     * The notification is handed to the RPC service's executor rather than run by the caller.
     *
     * @param message text of the info message
     */
    public void sendInfoMessage(final String message) {
        // Explicitly typed as Runnable so the call cannot be mistaken for another execute overload.
        final Runnable notifyListeners = () -> {
            final InfoMessage infoMessage = new InfoMessage(message);
            for (InfoMessageListenerRpcGateway listener : this.infoMessageListeners.values()) {
                listener.notifyInfoMessage(infoMessage);
            }
        };
        this.getRpcService().execute(notifyListeners);
    }

    /**
     * Logs the given error and passes it to the fatal error handler.
     *
     * @param t the fatal error
     */
    protected void onFatalError(Throwable t) {
        try {
            log.error("Fatal error occurred in ResourceManager.", t);
        }
        catch (Throwable ignored) {
            // Deliberately swallowed: a failure while logging must not keep the
            // fatal error handler below from being notified.
        }
        fatalErrorHandler.onFatalError(t);
    }

    /**
     * Callback for being granted leadership. The work is scheduled via
     * {@code runAsyncWithoutFencing}: derive the new fencing token from the session id, clear
     * existing state if a fencing token is already set, install the new token, start the slot
     * manager and finally confirm the leader session id on the RPC service's executor.
     *
     * @param newLeaderSessionID session id of the granted leadership
     */
    @Override
    public void grantLeadership(UUID newLeaderSessionID) {
        this.runAsyncWithoutFencing(() -> {
            final ResourceManagerId grantedId = ResourceManagerId.fromUuid(newLeaderSessionID);
            log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), grantedId);

            // Only wipe registrations when a fencing token is already in place.
            if (getFencingToken() != null) {
                clearState();
            }

            setFencingToken(grantedId);
            slotManager.start((ResourceManagerId) getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
            getRpcService().execute(() -> this.leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
        });
    }

    /**
     * Callback for losing leadership. Scheduled via {@code runAsyncWithoutFencing}, it clears
     * all registrations, resets the fencing token to {@code null} and suspends the slot manager.
     */
    @Override
    public void revokeLeadership() {
        this.runAsyncWithoutFencing(() -> {
            log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());

            clearState();
            setFencingToken(null);
            slotManager.suspend();
        });
    }

    /**
     * Treats the given error as fatal: it is wrapped in a {@code ResourceManagerException}
     * and passed to {@link #onFatalError(Throwable)}.
     *
     * @param exception the reported error
     */
    @Override
    public void handleError(Exception exception) {
        final ResourceManagerException wrapped =
                new ResourceManagerException("Received an error from the LeaderElectionService.", exception);
        onFatalError(wrapped);
    }

    /**
     * Initialization hook to be implemented by concrete ResourceManager subclasses.
     *
     * <p>NOTE(review): the call site is outside this section; presumably invoked once while the
     * ResourceManager starts up. Confirm against the start logic.
     *
     * @throws ResourceManagerException declared failure type of the hook
     */
    protected abstract void initialize() throws ResourceManagerException;

    /**
     * Framework-specific application deregistration to be implemented by subclasses.
     *
     * <p>NOTE(review): parameter names were lost in decompilation. The second argument is
     * presumably an optional diagnostics message; confirm against the caller.
     *
     * @param var1 application status to deregister with
     * @param var2 optional string, may be {@code null}
     * @throws ResourceManagerException declared failure type of the hook
     */
    protected abstract void internalDeregisterApplication(ApplicationStatus var1, @Nullable String var2) throws ResourceManagerException;

    /**
     * Requests a new worker for the given resource profile. Called when the slot manager asks
     * for a resource allocation (see {@code ResourceActionsImpl#allocateResource}) and by the
     * tag-aware overload of this method.
     *
     * @param var1 resource profile for the worker to start
     */
    @VisibleForTesting
    public abstract void startNewWorker(ResourceProfile var1);

    /**
     * Tag-aware variant of {@link #startNewWorker(ResourceProfile)}. This default
     * implementation does not use the tags and simply delegates to the single-argument method.
     *
     * @param resourceProfile resource profile for the worker to start
     * @param tags slot tags; not used by this implementation
     */
    public void startNewWorker(ResourceProfile resourceProfile, Set<SlotTag> tags) {
        // Tags are intentionally not forwarded here.
        startNewWorker(resourceProfile);
    }

    /**
     * Looks up the worker that belongs to a registering TaskExecutor.
     *
     * @param var1 resource id of the TaskExecutor that is registering
     * @return the worker for the given resource id, or {@code null} if it is not recognized;
     *     TaskExecutor registration is declined for a {@code null} result
     */
    protected abstract WorkerType workerStarted(ResourceID var1);

    /**
     * Stops the given worker.
     *
     * @param var1 worker to stop
     * @return {@code true} if the worker was stopped; {@code releaseResource} closes the
     *     TaskExecutor connection only in that case and logs "could not be stopped" otherwise
     */
    public abstract boolean stopWorker(WorkerType var1);

    /**
     * Cancels a previously requested worker for the given resource profile. Called when the
     * slot manager cancels a resource allocation (see
     * {@code ResourceActionsImpl#cancelResourceAllocation}) and by the tag-aware overload.
     *
     * @param var1 resource profile of the worker request to cancel
     */
    @VisibleForTesting
    public abstract void cancelNewWorker(ResourceProfile var1);

    /**
     * Tag-aware variant of {@link #cancelNewWorker(ResourceProfile)}. This default
     * implementation does not use the tags and simply delegates to the single-argument method.
     *
     * @param resourceProfile resource profile of the worker request to cancel
     * @param tags slot tags; not used by this implementation
     */
    @VisibleForTesting
    public void cancelNewWorker(ResourceProfile resourceProfile, Set<SlotTag> tags) {
        // Tags are intentionally not forwarded here.
        cancelNewWorker(resourceProfile);
    }

    /**
     * Reports a worker count maintained by the concrete subclass.
     *
     * <p>NOTE(review): judging by the name this is the number of workers currently allocated;
     * the exact counting rules are defined by the subclass and are not visible here.
     *
     * @return the subclass-defined number of allocated workers
     */
    @VisibleForTesting
    protected abstract int getNumberAllocatedWorkers();

    /**
     * Returns the number of pending slot requests as reported by the slot manager.
     *
     * @return the slot manager's pending slot request count
     */
    protected int getNumberPendingSlotRequests() {
        final int pendingRequests = slotManager.getNumberPendingSlotRequests();
        return pendingRequests;
    }

    /**
     * Heartbeat listener for job managers. A heartbeat timeout closes the connection to the
     * job manager registered under the timed-out resource id; payloads are not used.
     */
    private class JobManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private JobManagerHeartbeatListener() {
        }

        /**
         * Schedules, via {@code runAsync}, the closing of the job manager connection that
         * belongs to the timed-out resource id. Nothing happens if no registration is found.
         */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            ResourceManager.this.runAsync(() -> {
                ResourceManager.this.log.info("The heartbeat of JobManager with id {} timed out.", resourceID);

                final JobManagerRegistration timedOut =
                        (JobManagerRegistration) ResourceManager.this.jmResourceIdRegistrations.get(resourceID);
                if (timedOut == null) {
                    return;
                }

                ResourceManager.this.closeJobManagerConnection(
                        timedOut.getJobID(),
                        new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
            });
        }

        /** Job managers send no heartbeat payload, so there is nothing to process. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /** No payload is sent to job managers; returns an already completed future holding {@code null}. */
        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Heartbeat listener for TaskExecutors. A heartbeat timeout stops the worker and closes
     * its connection; heartbeat payloads are slot reports that are forwarded to the slot manager.
     */
    private class TaskManagerHeartbeatListener
    implements HeartbeatListener<SlotReport, Void> {
        private TaskManagerHeartbeatListener() {
        }

        /**
         * Schedules, via {@code runAsync}, the handling of a TaskExecutor heartbeat timeout:
         * the worker is stopped and its connection is closed.
         *
         * <p>The timeout is ignored when the TaskExecutor is no longer registered by the time
         * the scheduled action runs, or when its registration carries no worker.
         */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            ResourceManager.this.runAsync(() -> {
                ResourceManager.this.log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);

                final WorkerRegistration<WorkerType> registration = ResourceManager.this.taskExecutors.get(resourceID);
                if (registration == null) {
                    // The registration may have been removed between the timeout firing and this
                    // action running; dereferencing it unconditionally would throw a NullPointerException.
                    ResourceManager.this.log.debug("Ignoring heartbeat timeout of TaskManager {} because it is no longer registered.", resourceID);
                    return;
                }

                final WorkerType worker = registration.getWorker();
                if (worker != null) {
                    ResourceManager.this.stopWorker(worker);
                    ResourceManager.this.closeTaskManagerConnection(resourceID, new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
                }
            });
        }

        /**
         * Schedules, via {@code runAsync}, the forwarding of a slot report to the slot manager
         * under the instance id of the reporting TaskExecutor. Reports from TaskExecutors that
         * are no longer registered are dropped with a debug log entry.
         */
        @Override
        public void reportPayload(final ResourceID resourceID, final SlotReport slotReport) {
            ResourceManager.this.runAsync(() -> {
                ResourceManager.this.log.debug("Received new slot report from TaskManager {}.", resourceID);

                final WorkerRegistration<WorkerType> registration = ResourceManager.this.taskExecutors.get(resourceID);
                if (registration == null) {
                    ResourceManager.this.log.debug("Received slot report from TaskManager {} which is no longer registered.", resourceID);
                    return;
                }

                final InstanceID instanceId = registration.getInstanceID();
                ResourceManager.this.slotManager.reportSlotStatus(instanceId, slotReport);
            });
        }

        /** No payload is sent to TaskExecutors; returns an already completed future holding {@code null}. */
        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Bridges callbacks from the job leader id service to the enclosing ResourceManager.
     * Leadership-loss and job-timeout callbacks are rescheduled through {@code runAsync};
     * errors are passed directly to the fatal error path.
     */
    private class JobLeaderIdActionsImpl
    implements JobLeaderIdActions {
        private JobLeaderIdActionsImpl() {
        }

        /** Schedules the ResourceManager's own leadership-loss handling for the given job. */
        @Override
        public void jobLeaderLostLeadership(final JobID jobId, final JobMasterId oldJobMasterId) {
            // Qualified call: the unqualified name would resolve to this callback itself.
            ResourceManager.this.runAsync(
                    () -> ResourceManager.this.jobLeaderLostLeadership(jobId, oldJobMasterId));
        }

        /**
         * Schedules removal of the job, which is carried out only if the job leader id service
         * still considers the timeout id valid when the scheduled action runs.
         */
        @Override
        public void notifyJobTimeout(final JobID jobId, final UUID timeoutId) {
            ResourceManager.this.runAsync(() -> {
                final boolean stillValid = ResourceManager.this.jobLeaderIdService.isValidTimeout(jobId, timeoutId);
                if (stillValid) {
                    ResourceManager.this.removeJob(jobId);
                }
            });
        }

        /** Forwards the error to the ResourceManager's fatal error handling. */
        @Override
        public void handleError(Throwable error) {
            ResourceManager.this.onFatalError(error);
        }
    }

    /**
     * Resource actions handed to the slot manager. Every action first validates that it runs
     * in the ResourceManager's main thread and then delegates to the enclosing ResourceManager.
     */
    private class ResourceActionsImpl
    implements ResourceActions {
        private ResourceActionsImpl() {
        }

        /** Releases the resource registered under the given instance id. */
        @Override
        public void releaseResource(InstanceID instanceId, Exception cause) {
            ResourceManager.this.validateRunsInMainThread();
            // Qualified call: the unqualified name would resolve to this method itself.
            ResourceManager.this.releaseResource(instanceId, cause);
        }

        /** Requests a new worker for the given resource profile. */
        @Override
        public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
            ResourceManager.this.validateRunsInMainThread();
            ResourceManager.this.startNewWorker(resourceProfile);
        }

        /** Requests a new worker for the given resource profile and slot tags. */
        @Override
        public void allocateResource(ResourceProfile resourceProfile, Set<SlotTag> tags) throws ResourceManagerException {
            ResourceManager.this.validateRunsInMainThread();
            ResourceManager.this.startNewWorker(resourceProfile, tags);
        }

        /**
         * Notifies the job manager registered for the job about a failed allocation. Does
         * nothing if no job manager is registered for the job.
         */
        @Override
        public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
            ResourceManager.this.validateRunsInMainThread();

            final JobManagerRegistration target =
                    (JobManagerRegistration) ResourceManager.this.jobManagerRegistrations.get(jobId);
            if (target == null) {
                return;
            }
            target.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
        }

        /** Cancels a pending worker request for the given resource profile. */
        @Override
        public void cancelResourceAllocation(ResourceProfile resourceProfile) {
            ResourceManager.this.validateRunsInMainThread();
            ResourceManager.this.cancelNewWorker(resourceProfile);
        }

        /** Cancels a pending worker request for the given resource profile and slot tags. */
        @Override
        public void cancelResourceAllocation(ResourceProfile resourceProfile, Set<SlotTag> tags) {
            ResourceManager.this.validateRunsInMainThread();
            ResourceManager.this.cancelNewWorker(resourceProfile, tags);
        }
    }
}

