/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import akka.actor.ActorSystem;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.DriverProgram;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.JobListener;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public abstract class ClusterClient<T> {
    /** Logger named after the concrete runtime class of this client. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Optimizer used to compile plans submitted through this client; built in the constructor. */
    final Optimizer compiler;
    /** Lazily starts (and later shuts down) the client-side actor system. */
    protected final LazyActorSystemLoader actorSystemLoader;
    /** Configuration this client was created with. */
    protected final Configuration flinkConfig;
    /** Client timeout derived from the configuration via {@code AkkaUtils.getClientTimeout}. */
    protected final FiniteDuration timeout;
    /** Lookup timeout derived from the configuration via {@code AkkaUtils.getLookupTimeout}. */
    private final FiniteDuration lookupTimeout;
    /** High-availability services used for leader retrieval. */
    protected final HighAvailabilityServices highAvailabilityServices;
    /** When {@code true}, {@link #shutdown()} does not close {@link #highAvailabilityServices}. */
    private final boolean sharedHaServices;
    /** Whether status messages are additionally printed to {@code System.out}. */
    private boolean printStatusDuringExecution = true;
    /** Result of the most recent blocking job execution started through this client, if any. */
    protected JobExecutionResult lastJobExecutionResult;
    /** Whether jobs are submitted in detached mode. */
    private boolean detachedJobSubmission = false;
    /** Listeners set via {@link #setJobListeners(List)}; not read anywhere in this class. */
    protected List<JobListener> jobListeners;
    /** NOTE(review): presumably the value {@link #getMaxSlots()} reports when the slot count is unknown — confirm. */
    public static final int MAX_SLOTS_UNKNOWN = -1;

    /**
     * Creates a client that builds its own high-availability services from the configuration.
     *
     * <p>The services are created with a direct executor and {@code TRY_ADDRESS_RESOLUTION} and
     * are registered as not shared, so {@link #shutdown()} closes them.
     *
     * @param flinkConfig configuration used for the client and for creating the HA services
     * @throws Exception if the high-availability services cannot be created
     */
    public ClusterClient(Configuration flinkConfig) throws Exception {
        this(
            flinkConfig,
            HighAvailabilityServicesUtils.createHighAvailabilityServices(
                flinkConfig,
                Executors.directExecutor(),
                HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
            false);
    }

    public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
        this.flinkConfig = (Configuration)Preconditions.checkNotNull((Object)flinkConfig);
        this.compiler = new Optimizer(new DataStatistics(), (CostEstimator)new DefaultCostEstimator(), flinkConfig);
        this.timeout = AkkaUtils.getClientTimeout((Configuration)flinkConfig);
        this.lookupTimeout = AkkaUtils.getLookupTimeout((Configuration)flinkConfig);
        this.actorSystemLoader = new LazyActorSystemLoader(highAvailabilityServices, Time.milliseconds((long)this.lookupTimeout.toMillis()), flinkConfig, this.log);
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityServices);
        this.sharedHaServices = sharedHaServices;
    }

    /**
     * Shuts down the client: stops the lazily started actor system and, unless the
     * high-availability services are shared, closes them as well.
     *
     * <p>Both steps are always attempted. If stopping the actor system fails, the HA services are
     * still closed so that they do not leak; the first failure is rethrown and a later one is
     * attached to it as a suppressed exception.
     *
     * @throws Exception the first failure encountered while shutting down
     */
    public void shutdown() throws Exception {
        synchronized (this) {
            Exception failure = null;

            try {
                this.actorSystemLoader.shutdown();
            } catch (Exception e) {
                failure = e;
            }

            if (!this.sharedHaServices && this.highAvailabilityServices != null) {
                try {
                    this.highAvailabilityServices.close();
                } catch (Exception e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }

            if (failure != null) {
                throw failure;
            }
        }
    }

    /**
     * Hook for terminating the cluster this client talks to.
     *
     * <p>The base implementation does nothing. NOTE(review): presumably overridden by clients
     * that own their cluster — confirm against the subclasses.
     *
     * @throws Exception declared for overriding implementations; never thrown here
     */
    public void killCluster() throws Exception {
    }

    /**
     * Sets whether status messages are printed to {@code System.out} in addition to the log.
     * The flag is also passed on to {@code JobClient} when submitting or attaching to jobs.
     *
     * @param print {@code true} to print status messages
     */
    public void setPrintStatusDuringExecution(boolean print) {
        this.printStatusDuringExecution = print;
    }

    /**
     * Stores the given listeners on this client. The list is kept by reference (no copy is made).
     *
     * @param jobListeners listeners to store; may be null as no validation is performed
     */
    public void setJobListeners(List<JobListener> jobListeners) {
        this.jobListeners = jobListeners;
    }

    /**
     * @return whether status messages are printed to {@code System.out} during execution
     */
    public boolean getPrintStatusDuringExecution() {
        return this.printStatusDuringExecution;
    }

    /**
     * Retrieves the connection information of the current leader for the default job id,
     * waiting at most the client timeout.
     *
     * @return connection information of the leading JobManager
     * @throws LeaderRetrievalException if the leader could not be retrieved
     */
    public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException {
        final LeaderRetrievalService leaderRetriever =
            highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
        return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(leaderRetriever, timeout);
    }

    /**
     * Optimizes the given program and renders the resulting plan as JSON.
     *
     * @param compiler optimizer used to compile the program
     * @param prog program to optimize
     * @param parallelism parallelism to apply; see {@link #getOptimizedPlan(Optimizer, PackagedProgram, int)}
     * @return JSON dump of the optimized plan
     * @throws ClassCastException if the optimized plan is not an {@link OptimizedPlan}
     */
    public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        final PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
        final OptimizedPlan optimizedPlan = (OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism);
        return dumper.getOptimizerPlanAsJSON(optimizedPlan);
    }

    /**
     * Produces the optimized plan for a packaged program.
     *
     * <p>Side effect: the calling thread's context class loader is set to the program's user code
     * class loader and is not restored afterwards.
     *
     * @param compiler optimizer used to compile the program
     * @param prog program to optimize
     * @param parallelism parallelism to apply; in interactive mode it is only set when positive
     * @return the optimized plan
     * @throws RuntimeException if the program uses neither entry-point nor interactive mode
     */
    public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());

        if (prog.isUsingProgramEntryPoint()) {
            return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
        } else if (prog.isUsingInteractiveMode()) {
            // Interactive programs are executed inside a plan-capturing environment.
            final OptimizerPlanEnvironment planEnvironment = new OptimizerPlanEnvironment(compiler);
            if (parallelism > 0) {
                planEnvironment.setParallelism(parallelism);
            }
            return planEnvironment.getOptimizedPlan(prog);
        } else {
            throw new RuntimeException("Couldn't determine program mode.");
        }
    }

    /**
     * Compiles the given plan with the optimizer.
     *
     * <p>If {@code parallelism} is positive and the plan has no positive default parallelism,
     * the plan's default parallelism is set to {@code parallelism} before compiling.
     *
     * @param compiler optimizer used to compile the plan
     * @param p plan to compile; its default parallelism may be modified
     * @param parallelism fallback parallelism; ignored when not positive
     * @return the optimized plan
     */
    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
        final Logger logger = LoggerFactory.getLogger(ClusterClient.class);

        final boolean applyFallback = parallelism > 0 && p.getDefaultParallelism() <= 0;
        if (applyFallback) {
            logger.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
            p.setDefaultParallelism(parallelism);
        }

        logger.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
        return compiler.compile(p);
    }

    /**
     * Runs a packaged program, either through its program entry point or in interactive mode.
     *
     * <p>Side effect: the calling thread's context class loader is set to the program's user code
     * class loader and is not restored afterwards.
     *
     * <p>In interactive mode a {@link ContextEnvironmentFactory} is installed as the execution
     * context while the program's main method runs, and is always removed again afterwards,
     * together with the driver-mode context. User jars are not shipped when
     * {@link #hasUserJarsInClassPath(List)} reports that they are already on the class path.
     *
     * @param prog program to run
     * @param parallelism parallelism to use for the execution
     * @return the submission result in detached mode, otherwise the last job execution result
     * @throws ProgramMissingJobException if the interactive program did not define a Flink job
     * @throws ProgramInvocationException if the program has no valid invocation mode or fails
     */
    public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());

        if (prog.isUsingProgramEntryPoint()) {
            final JobWithJars jobWithJars = this.hasUserJarsInClassPath(prog.getAllLibraries())
                ? prog.getPlanWithoutJars()
                : prog.getPlanWithJars();
            return this.run(jobWithJars, parallelism, prog.getSavepointSettings(), false);
        }

        if (prog.isUsingInteractiveMode()) {
            this.log.info("Starting program in interactive mode (detached: {})", this.isDetached());

            // Typed as List<URL>: both branches and the factory below deal in library URLs.
            final List<URL> libraries = this.hasUserJarsInClassPath(prog.getAllLibraries())
                ? Collections.<URL>emptyList()
                : prog.getAllLibraries();

            // May switch this client to detached mode, so it must run before the factory is built.
            this.setDriverModeContext(prog);
            final ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, prog.getClasspaths(), prog.getLibjars(), prog.getFiles(), prog.getUserCodeClassLoader(), parallelism, this.isDetached(), prog.getSavepointSettings());
            ContextEnvironment.setAsContext(factory);
            try {
                prog.invokeInteractiveModeForExecution();
                if (this.lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                    throw new ProgramMissingJobException("The program didn't contain a Flink job.");
                }
                if (this.isDetached()) {
                    // Detached: the user code only defined the job; it is actually submitted here.
                    return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
                }
                // Blocking: the jobs already ran while the user code executed.
                return this.lastJobExecutionResult;
            } finally {
                ContextEnvironment.unsetContext();
                this.unsetDriverModeContext(prog);
            }
        }

        throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
    }

    /**
     * Runs the given program without any savepoint restore settings.
     *
     * @param program program plan together with its jars
     * @param parallelism parallelism to use for the execution
     * @param detached whether to submit the job in detached mode
     * @return the result of the submission
     */
    public JobSubmissionResult run(JobWithJars program, int parallelism, boolean detached) throws ProgramInvocationException {
        final SavepointRestoreSettings noRestore = SavepointRestoreSettings.none();
        return run(program, parallelism, noRestore, detached);
    }

    /**
     * Prepares driver mode if the program's main class is a {@link DriverProgram}; otherwise does
     * nothing.
     *
     * <p>A fresh instance of the main class is created and given the program, the leading
     * JobManager's host address and port, and the user job configuration extended with the
     * program's savepoint restore settings. The program's own savepoint settings are then reset
     * to none and this client is switched to detached mode.
     *
     * @param prog program whose main class is inspected
     * @throws RuntimeException if resolving the cluster address or configuring the driver fails
     */
    private void setDriverModeContext(PackagedProgram prog) {
        final boolean driverMode = prog.getMainClass() != null && DriverProgram.class.isAssignableFrom(prog.getMainClass());
        if (!driverMode) {
            return;
        }

        final DriverProgram driver = (DriverProgram) InstantiationUtil.instantiate(prog.getMainClass().asSubclass(DriverProgram.class), DriverProgram.class);
        driver.setParameter((Object) prog);
        try {
            final String leaderAddress = getClusterConnectionInfo().getAddress();
            final InetSocketAddress jobManagerAddress = AkkaUtils.getInetSocketAddressFromAkkaURL(leaderAddress);
            driver.setClusterInfo(jobManagerAddress.getAddress().getHostAddress(), jobManagerAddress.getPort());

            // Hand the savepoint settings to the driver through the job configuration.
            final Configuration jobConf = prog.getUserJobConf();
            final SavepointRestoreSettings restoreSettings = prog.getSavepointSettings();
            if (restoreSettings.getRestorePath() != null) {
                jobConf.setString("flink.driver.savepointrestoresettings.path", restoreSettings.getRestorePath());
            }
            jobConf.setBoolean("flink.driver.savepointrestoresettings.allowNonRestoredState", restoreSettings.allowNonRestoredState());
            jobConf.setBoolean("flink.driver.savepointrestoresettings.resumeFromLatestCheckpoint", restoreSettings.resumeFromLatestCheckpoint());
            driver.setConfiguration(jobConf);

            prog.setSavepointRestoreSettings(SavepointRestoreSettings.none());
        } catch (Exception e) {
            throw new RuntimeException("check driver mode fails.", e);
        }
        setDetached(true);
    }

    /**
     * Clears driver-mode state if the program's main class is a {@link DriverProgram}; otherwise
     * does nothing.
     *
     * <p>A new instance of the main class is created and its parameter and cluster info are
     * reset. NOTE(review): this only has an effect if {@code DriverProgram} keeps that state
     * outside the instance (e.g. statically) — confirm.
     *
     * @param prog program whose main class is inspected
     */
    private void unsetDriverModeContext(PackagedProgram prog) {
        final boolean driverMode = prog.getMainClass() != null && DriverProgram.class.isAssignableFrom(prog.getMainClass());
        if (!driverMode) {
            return;
        }

        final DriverProgram driver = (DriverProgram) InstantiationUtil.instantiate(prog.getMainClass().asSubclass(DriverProgram.class), DriverProgram.class);
        driver.setParameter(null);
        driver.setClusterInfo(null, -1);
    }

    /**
     * Optimizes the plan of the given program and runs it together with its jars, class paths,
     * libjars and files.
     *
     * @param jobWithJars program plan and associated artifacts; must provide a user code class loader
     * @param parallelism parallelism passed to the optimizer
     * @param savepointSettings savepoint restore settings for the job
     * @param detached whether to submit the job in detached mode
     * @return the result of the submission
     * @throws IllegalArgumentException if the program provides no user code class loader
     */
    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings, boolean detached) throws CompilerException, ProgramInvocationException {
        final ClassLoader userCodeClassLoader = jobWithJars.getUserCodeClassLoader();
        if (userCodeClassLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }

        final FlinkPlan optimizedPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
        return run(
            optimizedPlan,
            jobWithJars.getJarFiles(),
            jobWithJars.getClasspaths(),
            jobWithJars.getLibjars(),
            jobWithJars.getFiles(),
            userCodeClassLoader,
            savepointSettings,
            detached);
    }

    /**
     * Runs a compiled plan without additional libjars or files.
     *
     * @param compiledPlan plan to translate into a job graph
     * @param libraries jar files to attach to the job
     * @param classpaths class paths to set on the job
     * @param classLoader user code class loader
     * @param savepointSettings savepoint restore settings for the job
     * @param detached whether to submit the job in detached mode
     * @return the result of the submission
     */
    public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings, boolean detached) throws ProgramInvocationException {
        final List<URI> noLibjars = Collections.emptyList();
        final List<URI> noFiles = Collections.emptyList();
        return run(compiledPlan, libraries, classpaths, noLibjars, noFiles, classLoader, savepointSettings, detached);
    }

    /**
     * Translates a compiled plan into a job graph and submits it via
     * {@link #submitJob(JobGraph, ClassLoader, boolean)}.
     *
     * @param compiledPlan plan to translate into a job graph
     * @param libraries jar files to attach to the job
     * @param classpaths class paths to set on the job
     * @param libjars additional jars, given as URIs, to attach to the job
     * @param files files to register as user artifacts
     * @param classLoader user code class loader
     * @param savepointSettings savepoint restore settings for the job
     * @param detached whether to submit the job in detached mode
     * @return the result of the submission
     */
    public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, List<URI> libjars, List<URI> files, ClassLoader classLoader, SavepointRestoreSettings savepointSettings, boolean detached) throws ProgramInvocationException {
        final JobGraph jobGraph = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, libjars, files, savepointSettings);
        return submitJob(jobGraph, classLoader, detached);
    }

    /**
     * Submits a job graph and blocks until the job has finished.
     *
     * <p>Waits for the cluster to be ready first. The result is also stored in
     * {@link #lastJobExecutionResult}.
     *
     * @param jobGraph job to execute
     * @param classLoader user code class loader
     * @return the result of the job execution
     * @throws ProgramInvocationException if the actor system cannot be started or the job fails
     */
    public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        waitForClusterToBeReady();

        final ActorSystem clientActorSystem;
        try {
            clientActorSystem = actorSystemLoader.get();
        } catch (FlinkException fe) {
            throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the JobManager.", fe);
        }

        try {
            final String announcement = "Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.";
            logAndSysout(announcement);
            lastJobExecutionResult = JobClient.submitJobAndWait(
                clientActorSystem,
                flinkConfig,
                highAvailabilityServices,
                jobGraph,
                timeout,
                printStatusDuringExecution,
                classLoader);
            return lastJobExecutionResult;
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

    /**
     * Submits a job graph and returns right after the submission, without waiting for the job.
     *
     * <p>Waits for the cluster to be ready first.
     *
     * @param jobGraph job to submit
     * @param classLoader user code class loader
     * @return a submission result carrying the job's id
     * @throws ProgramInvocationException if the JobManager gateway cannot be retrieved or the
     *     submission fails
     */
    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        waitForClusterToBeReady();

        final ActorGateway gateway;
        try {
            gateway = getJobManagerGateway();
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
        }

        try {
            final String announcement = "Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.";
            logAndSysout(announcement);
            final Time submissionTimeout = Time.milliseconds(timeout.toMillis());
            JobClient.submitJobDetached(new AkkaJobManagerGateway(gateway), flinkConfig, jobGraph, submissionTimeout, classLoader);
            return new JobSubmissionResult(jobGraph.getJobID());
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

    /**
     * Attaches to a running job and blocks until its result is available.
     *
     * @param jobID id of the job to attach to
     * @return the result of the job execution
     * @throws JobExecutionException if the actor system cannot be started or the job fails
     */
    public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
        final ActorSystem clientActorSystem;
        try {
            clientActorSystem = actorSystemLoader.get();
        } catch (FlinkException fe) {
            throw new JobExecutionException(jobID, "Could not start the ActorSystem needed to talk to the JobManager.", fe);
        }

        final JobListeningContext context = JobClient.attachToRunningJob(
            jobID,
            flinkConfig,
            clientActorSystem,
            highAvailabilityServices,
            timeout,
            printStatusDuringExecution);
        return JobClient.awaitJobResult(context);
    }

    /**
     * Attaches to a running job and returns the listening context without waiting for the result.
     *
     * @param jobID id of the job to attach to
     * @return the listening context for the job
     * @throws JobExecutionException if the actor system cannot be started or attaching fails
     */
    public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
        final ActorSystem clientActorSystem;
        try {
            clientActorSystem = actorSystemLoader.get();
        } catch (FlinkException fe) {
            throw new JobExecutionException(jobID, "Could not start the ActorSystem needed to talk to the JobManager.", fe);
        }

        return JobClient.attachToRunningJob(
            jobID,
            flinkConfig,
            clientActorSystem,
            highAvailabilityServices,
            timeout,
            printStatusDuringExecution);
    }

    /**
     * Asynchronously requests the status of the given job from the JobManager.
     *
     * <p>The returned future completes exceptionally with an {@link IllegalStateException} if the
     * job is unknown to the JobManager or the response has an unexpected type.
     *
     * @param jobId id of the job to query
     * @return future holding the job's status
     * @throws RuntimeException if the JobManager gateway cannot be retrieved (thrown
     *     synchronously, not through the future)
     */
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
        final ActorGateway jobManager;
        try {
            jobManager = this.getJobManagerGateway();
        } catch (FlinkException e) {
            throw new RuntimeException("Could not retrieve JobManager gateway.", e);
        }

        final Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), this.timeout);
        final CompletableFuture<Object> javaFuture = FutureUtils.toJava(response);

        return javaFuture.thenApply(responseMessage -> {
            if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) {
                return ((JobManagerMessages.CurrentJobStatus) responseMessage).status();
            }
            if (responseMessage instanceof JobManagerMessages.JobNotFound) {
                throw new CompletionException(new IllegalStateException("Could not find job with JobId " + jobId));
            }
            throw new CompletionException(new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
        });
    }

    /**
     * Cancels the given job and blocks until the JobManager has answered or the client timeout
     * has elapsed.
     *
     * @param jobId id of the job to cancel
     * @throws Exception if the JobManager reports a cancellation failure, or if the gateway
     *     lookup or waiting for the answer fails
     * @throws IllegalStateException if the JobManager answers with an unexpected message
     */
    public void cancel(JobID jobId) throws Exception {
        final ActorGateway jobManager = getJobManagerGateway();
        final JobManagerMessages.CancelJob cancelRequest = new JobManagerMessages.CancelJob(jobId);
        final Future<Object> pendingAnswer = jobManager.ask(cancelRequest, timeout);
        final Object answer = Await.result(pendingAnswer, timeout);

        if (answer instanceof JobManagerMessages.CancellationSuccess) {
            return;
        }
        if (answer instanceof JobManagerMessages.CancellationFailure) {
            throw new Exception("Canceling the job with ID " + jobId + " failed.", ((JobManagerMessages.CancellationFailure) answer).cause());
        }
        throw new IllegalStateException("Unexpected response: " + answer);
    }

    /**
     * Cancels the given job after taking a savepoint and blocks until the JobManager has
     * answered or the client timeout has elapsed.
     *
     * @param jobId id of the job to cancel
     * @param savepointDirectory target directory for the savepoint; may be null
     * @return the savepoint path reported by the JobManager
     * @throws Exception if the JobManager reports a failure, or if the gateway lookup or waiting
     *     for the answer fails
     * @throws IllegalStateException if the JobManager answers with an unexpected message
     */
    public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
        final ActorGateway jobManager = getJobManagerGateway();
        final JobManagerMessages.CancelJobWithSavepoint cancelRequest = new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointDirectory);
        final Future<Object> pendingAnswer = jobManager.ask(cancelRequest, timeout);
        final Object answer = Await.result(pendingAnswer, timeout);

        if (answer instanceof JobManagerMessages.CancellationSuccess) {
            return ((JobManagerMessages.CancellationSuccess) answer).savepointPath();
        }
        if (answer instanceof JobManagerMessages.CancellationFailure) {
            throw new Exception("Cancel & savepoint for the job with ID " + jobId + " failed.", ((JobManagerMessages.CancellationFailure) answer).cause());
        }
        throw new IllegalStateException("Unexpected response: " + answer);
    }

    /**
     * Stops the given job and blocks until the JobManager has answered or the client timeout
     * has elapsed.
     *
     * @param jobId id of the job to stop
     * @throws Exception if the JobManager reports a stopping failure, or if the gateway lookup
     *     or waiting for the answer fails
     * @throws IllegalStateException if the JobManager answers with an unexpected message
     */
    public void stop(JobID jobId) throws Exception {
        final ActorGateway jobManager = getJobManagerGateway();
        final JobManagerMessages.StopJob stopRequest = new JobManagerMessages.StopJob(jobId);
        final Future<Object> pendingAnswer = jobManager.ask(stopRequest, timeout);
        final Object answer = Await.result(pendingAnswer, timeout);

        if (answer instanceof JobManagerMessages.StoppingSuccess) {
            return;
        }
        if (answer instanceof JobManagerMessages.StoppingFailure) {
            throw new Exception("Stopping the job with ID " + jobId + " failed.", ((JobManagerMessages.StoppingFailure) answer).cause());
        }
        throw new IllegalStateException("Unexpected response: " + answer);
    }

    /**
     * Asynchronously triggers a savepoint for the given job.
     *
     * <p>The request uses a fixed ask timeout of one hour rather than the client timeout. The
     * returned future completes exceptionally with the reported cause on failure, or with an
     * {@link IllegalStateException} for an unexpected response type.
     *
     * @param jobId id of the job to take a savepoint of
     * @param savepointDirectory target directory for the savepoint; may be null
     * @return future holding the path of the created savepoint
     * @throws FlinkException if the JobManager gateway cannot be retrieved
     */
    public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws FlinkException {
        final ActorGateway jobManager = getJobManagerGateway();
        final FiniteDuration savepointTimeout = new FiniteDuration(1L, TimeUnit.HOURS);
        final Future<Object> pendingAnswer = jobManager.ask(
            new JobManagerMessages.TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
            savepointTimeout);
        final CompletableFuture<Object> answerFuture = FutureUtils.toJava(pendingAnswer);

        return answerFuture.thenApply(answer -> {
            if (answer instanceof JobManagerMessages.TriggerSavepointSuccess) {
                return ((JobManagerMessages.TriggerSavepointSuccess) answer).savepointPath();
            }
            if (answer instanceof JobManagerMessages.TriggerSavepointFailure) {
                throw new CompletionException(((JobManagerMessages.TriggerSavepointFailure) answer).cause());
            }
            throw new CompletionException(new IllegalStateException("Unknown JobManager response of type " + answer.getClass()));
        });
    }

    /**
     * Asynchronously asks the JobManager to dispose the savepoint at the given path.
     *
     * <p>On failure the returned future completes exceptionally with the reported cause. A
     * {@link ClassNotFoundException} cause is replaced by one with a hint to supply the program
     * jar; an unexpected response type yields a {@link FlinkRuntimeException}.
     *
     * @param savepointPath path of the savepoint to dispose
     * @return future completing with an acknowledgement once the savepoint is disposed
     * @throws FlinkException if the JobManager gateway cannot be retrieved
     */
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException {
        final ActorGateway jobManager = getJobManagerGateway();
        final JobManagerMessages.DisposeSavepoint disposeRequest = new JobManagerMessages.DisposeSavepoint(savepointPath);
        final CompletableFuture<Object> answerFuture = FutureUtils.toJava(jobManager.ask(disposeRequest, timeout));

        return answerFuture.thenApply(answer -> {
            if (answer instanceof JobManagerMessages.DisposeSavepointSuccess$) {
                return Acknowledge.get();
            }
            if (!(answer instanceof JobManagerMessages.DisposeSavepointFailure)) {
                throw new CompletionException(new FlinkRuntimeException("Unknown response type " + answer.getClass().getSimpleName() + '.'));
            }

            final Throwable cause = ((JobManagerMessages.DisposeSavepointFailure) answer).cause();
            if (cause instanceof ClassNotFoundException) {
                throw new CompletionException(new ClassNotFoundException("Savepoint disposal failed, because of a missing class. This is most likely caused by a custom state instance, which cannot be disposed without the user code class loader. Please provide the program jar with which you have created the savepoint via -j <JAR> for disposal.", cause.getCause()));
            }
            throw new CompletionException(cause);
        });
    }

    /**
     * Asynchronously lists the jobs known to the JobManager.
     *
     * <p>Each job detail reported by the JobManager is converted into a
     * {@link JobStatusMessage} carrying its id, name, status and start time. The returned future
     * completes exceptionally with an {@link IllegalStateException} for an unexpected response
     * type.
     *
     * @return future holding one status message per reported job
     * @throws Exception if the JobManager gateway cannot be retrieved
     */
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
        final ActorGateway jobManager = this.getJobManagerGateway();
        final Future<Object> response = jobManager.ask(new RequestJobDetails(true, false), this.timeout);
        final CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);

        return responseFuture.thenApply(responseMessage -> {
            if (responseMessage instanceof MultipleJobsDetails) {
                final MultipleJobsDetails details = (MultipleJobsDetails) responseMessage;
                // Iterating the typed collection lets the lambda parameter be inferred as the
                // job-detail type instead of Object.
                final Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(details.getJobs().size());
                details.getJobs().forEach(detail ->
                    flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
                return flattenedDetails;
            }
            throw new CompletionException(new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
        });
    }

    /**
     * Fetches the accumulators of the given job, deserializing them with the system class loader.
     *
     * @param jobID id of the job whose accumulators are requested
     * @return accumulator results keyed by name
     * @throws Exception if the accumulators cannot be fetched
     */
    public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws Exception {
        final ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
        return getAccumulators(jobID, systemLoader);
    }

    /**
     * Fetches the accumulators of the given job from the JobManager and deserializes them with
     * the given class loader. Blocks until the JobManager has answered or the client timeout has
     * elapsed.
     *
     * @param jobID id of the job whose accumulators are requested
     * @param loader class loader used to deserialize the accumulator values
     * @return accumulator results keyed by name
     * @throws Exception if the request fails, the JobManager reports an error (its cause is
     *     rethrown), or the response has an unexpected type
     */
    public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
        final ActorGateway gateway = getJobManagerGateway();

        final Future<Object> pendingAnswer;
        try {
            pendingAnswer = gateway.ask(new RequestAccumulatorResults(jobID), timeout);
        } catch (Exception e) {
            throw new Exception("Failed to query the job manager gateway for accumulators.", e);
        }

        final Object answer = Await.result(pendingAnswer, timeout);
        if (answer instanceof AccumulatorResultsErroneous) {
            throw ((AccumulatorResultsErroneous) answer).cause();
        }
        if (!(answer instanceof AccumulatorResultsFound)) {
            throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
        }
        return AccumulatorHelper.deserializeAccumulators(((AccumulatorResultsFound) answer).result(), loader);
    }

    /**
     * Tells the JobManager to end the session of the given job.
     *
     * @param jobId id of the job whose session should end; must not be null
     * @throws IllegalArgumentException if {@code jobId} is null
     * @throws Exception if the JobManager gateway cannot be retrieved
     */
    public void endSession(JobID jobId) throws Exception {
        if (jobId != null) {
            endSessions(Collections.singletonList(jobId));
            return;
        }
        throw new IllegalArgumentException("The JobID must not be null.");
    }

    /**
     * Tells the JobManager to end the sessions of the given jobs by sending one
     * {@code RemoveCachedJob} message per id. Null entries in the list are skipped; no answer is
     * awaited.
     *
     * @param jobIds ids of the jobs whose sessions should end; must not be null
     * @throws IllegalArgumentException if {@code jobIds} is null
     * @throws Exception if the JobManager gateway cannot be retrieved
     */
    public void endSessions(List<JobID> jobIds) throws Exception {
        if (jobIds == null) {
            throw new IllegalArgumentException("The JobIDs must not be null");
        }

        final ActorGateway gateway = getJobManagerGateway();
        for (JobID sessionId : jobIds) {
            if (sessionId != null) {
                log.info("Telling job manager to end the session {}.", sessionId);
                gateway.tell(new JobManagerMessages.RemoveCachedJob(sessionId));
            }
        }
    }

    /**
     * Compiles the plan contained in the given {@link JobWithJars}.
     *
     * @param compiler optimizer used to compile the plan
     * @param prog program whose plan is compiled
     * @param parallelism fallback parallelism; see {@link #getOptimizedPlan(Optimizer, Plan, int)}
     * @return the optimized plan
     */
    private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
        final Plan plan = prog.getPlan();
        return getOptimizedPlan(compiler, plan, parallelism);
    }

    /**
     * Builds a job graph for an optimized plan, taking libraries, class paths, libjars and files
     * from the packaged program.
     *
     * @param flinkConfig configuration used for job graph generation
     * @param prog program supplying the artifacts to attach
     * @param optPlan optimized plan to translate
     * @param savepointSettings savepoint restore settings for the job
     * @return the generated job graph
     */
    public static JobGraph getJobGraph(Configuration flinkConfig, PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
        return getJobGraph(
            flinkConfig,
            optPlan,
            prog.getAllLibraries(),
            prog.getClasspaths(),
            prog.getLibjars(),
            prog.getFiles(),
            savepointSettings);
    }

    /**
     * Builds a job graph for an optimized plan without additional libjars or files.
     *
     * @param flinkConfig configuration used for job graph generation
     * @param optPlan optimized plan to translate
     * @param jarFiles jar files to attach to the job
     * @param classpaths class paths to set on the job
     * @param savepointSettings savepoint restore settings for the job
     * @return the generated job graph
     */
    public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
        final List<URI> noLibjars = Collections.emptyList();
        final List<URI> noFiles = Collections.emptyList();
        return getJobGraph(flinkConfig, optPlan, jarFiles, classpaths, noLibjars, noFiles, savepointSettings);
    }

    /**
     * Builds a job graph for an optimized plan and attaches the given artifacts.
     *
     * <p>For a {@link StreamingPlan} the job graph comes from the plan itself and receives the
     * savepoint settings; any other plan is compiled as an {@link OptimizedPlan} and the
     * savepoint settings are not applied. Jars and libjars are added as jars; each file is
     * registered as a user artifact keyed by its URI fragment, or by its file name when there is
     * no fragment.
     *
     * @param flinkConfig configuration used for job graph generation
     * @param optPlan optimized plan to translate
     * @param jarFiles jar files to attach to the job
     * @param classpaths class paths to set on the job
     * @param libjars additional jars, given as URIs, to attach to the job
     * @param files files to register as user artifacts
     * @param savepointSettings savepoint restore settings; only used for streaming plans
     * @return the generated job graph
     * @throws RuntimeException if a jar URL cannot be converted to a URI
     */
    public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, List<URI> libjars, List<URI> files, SavepointRestoreSettings savepointSettings) {
        final JobGraph jobGraph;
        if (!(optPlan instanceof StreamingPlan)) {
            jobGraph = new JobGraphGenerator(flinkConfig).compileJobGraph((OptimizedPlan) optPlan);
        } else {
            jobGraph = ((StreamingPlan) optPlan).getJobGraph(flinkConfig);
            jobGraph.setSavepointRestoreSettings(savepointSettings);
        }

        for (URL jarUrl : jarFiles) {
            final Path jarPath;
            try {
                jarPath = new Path(jarUrl.toURI());
            } catch (URISyntaxException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
            jobGraph.addJar(jarPath);
        }

        for (URI libjarUri : libjars) {
            jobGraph.addJar(new Path(libjarUri));
        }

        for (URI fileUri : files) {
            final String artifactName;
            if (fileUri.getFragment() != null) {
                artifactName = fileUri.getFragment();
            } else {
                artifactName = new Path(fileUri).getName();
            }
            // The artifact path is the URI text up to the last '#'.
            final String artifactPath = StringUtils.substringBeforeLast(fileUri.toString(), "#");
            jobGraph.addUserArtifact(artifactName, new DistributedCache.DistributedCacheEntry(artifactPath, false, false));
        }

        jobGraph.setClasspaths(classpaths);
        return jobGraph;
    }

    /**
     * Looks up the gateway of the leading JobManager for the default job id, starting the client
     * actor system if necessary and waiting at most the lookup timeout.
     *
     * @return gateway to the leading JobManager
     * @throws FlinkException if the actor system cannot be started or the leader cannot be
     *     retrieved
     */
    public ActorGateway getJobManagerGateway() throws FlinkException {
        log.debug("Looking up JobManager");
        try {
            final LeaderRetrievalService leaderRetriever =
                highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
            final ActorSystem clientActorSystem = actorSystemLoader.get();
            return LeaderRetrievalUtils.retrieveLeaderGateway(leaderRetriever, clientActorSystem, lookupTimeout);
        } catch (LeaderRetrievalException lre) {
            throw new FlinkException("Could not connect to the leading JobManager. Please check that the JobManager is running.", lre);
        }
    }

    /**
     * Logs the message at info level and, when status printing is enabled, also prints it to
     * {@code System.out}.
     *
     * @param message message to log and possibly print
     */
    protected void logAndSysout(String message) {
        log.info(message);
        if (!printStatusDuringExecution) {
            return;
        }
        System.out.println(message);
    }

    /**
     * Invoked by {@link #run(JobGraph, ClassLoader)} and {@link #runDetached(JobGraph, ClassLoader)}
     * before a job is submitted. NOTE(review): presumably blocks until the cluster can accept
     * jobs — confirm against the implementations.
     */
    public abstract void waitForClusterToBeReady();

    /**
     * @return the URL of the cluster's web interface; the exact format is defined by the
     *     implementation and is not visible in this class
     */
    public abstract String getWebInterfaceURL();

    /**
     * @return the current cluster status as reported by the implementation
     */
    public abstract GetClusterStatusResponse getClusterStatus();

    /**
     * @return messages from the cluster; NOTE(review): presumably only those not returned by an
     *     earlier call — confirm against the implementations
     */
    public abstract List<String> getNewMessages();

    /**
     * @return the identifier of the cluster this client is connected to, of the
     *     implementation-specific type {@code T}
     */
    public abstract T getClusterId();

    /**
     * Sets whether jobs are submitted in detached mode.
     *
     * @param isDetached {@code true} for detached submission
     */
    public void setDetached(boolean isDetached) {
        this.detachedJobSubmission = isDetached;
    }

    /**
     * @return whether jobs are submitted in detached mode
     */
    public boolean isDetached() {
        return this.detachedJobSubmission;
    }

    /**
     * @return a clone of this client's configuration, so changes made by the caller do not
     *     affect the client
     */
    public Configuration getFlinkConfiguration() {
        return this.flinkConfig.clone();
    }

    /**
     * @return the maximum number of slots of the cluster; NOTE(review): presumably
     *     {@link #MAX_SLOTS_UNKNOWN} when the number is not known — confirm
     */
    public abstract int getMaxSlots();

    /**
     * Reports whether the given user jars are already on the class path. When this returns
     * {@code true}, {@link #run(PackagedProgram, int)} uses the plan without jars (entry-point
     * mode) or an empty library list (interactive mode).
     *
     * @param var1 URLs of the user jars to check
     * @return {@code true} if the jars do not need to be shipped with the job
     */
    public abstract boolean hasUserJarsInClassPath(List<URL> var1);

    /**
     * Submits a job graph to the cluster. Within this class it is called with the job graph, the
     * user code class loader and the detached flag, in that order.
     *
     * @param var1 job graph to submit
     * @param var2 user code class loader
     * @param var3 whether to submit in detached mode
     * @return the result of the submission
     * @throws ProgramInvocationException if the submission fails
     */
    public abstract JobSubmissionResult submitJob(JobGraph var1, ClassLoader var2, boolean var3) throws ProgramInvocationException;

    /**
     * Submits a job graph in non-detached mode.
     *
     * @param jobGraph job graph to submit
     * @param classLoader user code class loader
     * @return the result of the submission
     * @throws ProgramInvocationException if the submission fails
     */
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        final boolean detached = false;
        return submitJob(jobGraph, classLoader, detached);
    }

    /**
     * Rescaling is not supported by the base client; this implementation always throws.
     *
     * @param jobId id of the job to rescale
     * @param newParallelism requested parallelism
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism) {
        final String clientName = getClass().getSimpleName();
        throw new UnsupportedOperationException("The " + clientName + " does not support rescaling.");
    }

    /**
     * Shutting down the cluster is not supported by the base client; this implementation always
     * throws.
     *
     * @throws UnsupportedOperationException always
     */
    public void shutDownCluster() {
        final String clientName = getClass().getSimpleName();
        throw new UnsupportedOperationException("The " + clientName + " does not support shutDownCluster.");
    }

    /**
     * Starts the client actor system on first use and shuts it down on request.
     *
     * <p>This class performs no synchronization of its own.
     */
    protected static class LazyActorSystemLoader {
        private final Logger log;
        private final HighAvailabilityServices highAvailabilityServices;
        private final Time timeout;
        private final Configuration configuration;
        /** The started actor system, or null while none is running. */
        private ActorSystem actorSystem;

        /**
         * @param highAvailabilityServices services used to find the leading JobManager; not null
         * @param timeout timeout for determining the connecting address; not null
         * @param configuration configuration for the actor system; not null
         * @param log logger used by the loader and handed to the actor system bootstrap; not null
         */
        private LazyActorSystemLoader(HighAvailabilityServices highAvailabilityServices, Time timeout, Configuration configuration, Logger log) {
            this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
            this.timeout = Preconditions.checkNotNull(timeout);
            this.configuration = Preconditions.checkNotNull(configuration);
            this.log = Preconditions.checkNotNull(log);
        }

        /**
         * @return whether an actor system is currently held by this loader
         */
        public boolean isLoaded() {
            return actorSystem != null;
        }

        /**
         * Shuts the actor system down and waits for its termination; does nothing if none has
         * been started. Afterwards {@link #get()} starts a new actor system.
         */
        public void shutdown() {
            if (!isLoaded()) {
                return;
            }
            actorSystem.shutdown();
            actorSystem.awaitTermination();
            actorSystem = null;
        }

        /**
         * Returns the actor system, starting it first if necessary.
         *
         * <p>To start it, the local address that can reach the leading JobManager is determined
         * and the actor system is bound to that address's canonical host name with port 0.
         *
         * @return the running actor system
         * @throws FlinkException if the connecting address cannot be determined or the actor
         *     system cannot be started
         */
        public ActorSystem get() throws FlinkException {
            if (isLoaded()) {
                return actorSystem;
            }

            log.info("Starting client actor system.");

            final InetAddress ownAddress;
            try {
                final LeaderRetrievalService leaderRetriever =
                    highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
                ownAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetriever, timeout);
            } catch (LeaderRetrievalException lre) {
                throw new FlinkException("Could not find out our own hostname by connecting to the leading JobManager. Please make sure that the Flink cluster has been started.", lre);
            }

            try {
                actorSystem = BootstrapTools.startActorSystem(configuration, ownAddress.getCanonicalHostName(), 0, log);
            } catch (Exception e) {
                throw new FlinkException("Could not start the ActorSystem lazily.", e);
            }
            return actorSystem;
        }
    }
}

