/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.environment;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.JobListener;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Public
public class RemoteStreamEnvironment
extends StreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
    private final String host;
    private final int port;
    private final Configuration clientConfiguration;
    protected final List<URL> jarFiles;
    private final List<URL> globalClasspaths;

    public RemoteStreamEnvironment(String host, int port, String ... jarFiles) {
        this(host, port, (Configuration)null, jarFiles);
    }

    public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String ... jarFiles) {
        this(host, port, clientConfiguration, jarFiles, (URL[])null);
    }

    public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
        super(clientConfiguration);
        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
            throw new InvalidProgramException("The RemoteEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context.");
        }
        if (host == null) {
            throw new NullPointerException("Host must not be null.");
        }
        if (port < 1 || port >= 65535) {
            throw new IllegalArgumentException("Port out of range");
        }
        this.host = host;
        this.port = port;
        this.clientConfiguration = clientConfiguration == null ? new Configuration() : clientConfiguration;
        this.jarFiles = new ArrayList<URL>(jarFiles.length);
        for (String jarFile : jarFiles) {
            try {
                URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
                this.jarFiles.add(jarFileUrl);
                JobWithJars.checkJarFile((URL)jarFileUrl);
            }
            catch (MalformedURLException e) {
                throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
            }
            catch (IOException e) {
                throw new RuntimeException("Problem with jar file " + jarFile, e);
            }
        }
        this.globalClasspaths = globalClasspaths == null ? Collections.emptyList() : Arrays.asList(globalClasspaths);
    }

    @Override
    protected JobSubmissionResult executeInternal(String jobName, boolean detached, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {
        StreamGraph streamGraph = this.getStreamGraph();
        streamGraph.setJobName(jobName);
        this.transformations.clear();
        return this.executeRemotely(streamGraph, this.jarFiles, detached, savepointRestoreSettings);
    }

    @Override
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        this.transformations.clear();
        return (JobExecutionResult)this.executeRemotely(streamGraph, this.jarFiles, false, SavepointRestoreSettings.none());
    }

    protected JobSubmissionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles, boolean detached, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {
        if (LOG.isInfoEnabled()) {
            LOG.info("Running remotely at {}:{}", (Object)this.host, (Object)this.port);
        }
        ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, this.globalClasspaths, (ClassLoader)this.getClass().getClassLoader());
        ClusterClient<?> client = null;
        try {
            client = this.prepareClusterClient(detached);
            JobSubmissionResult submissionResult = client.run((FlinkPlan)streamGraph, jarFiles, this.globalClasspaths, usercodeClassLoader, savepointRestoreSettings, detached);
            if (detached) {
                JobSubmissionResult jobSubmissionResult = submissionResult;
                return jobSubmissionResult;
            }
            JobExecutionResult jobExecutionResult = submissionResult.getJobExecutionResult();
            return jobExecutionResult;
        }
        catch (ProgramInvocationException e) {
            throw e;
        }
        catch (Exception e) {
            String term = e.getMessage() == null ? "." : ": " + e.getMessage();
            throw new ProgramInvocationException("The program execution failed" + term, (Throwable)e);
        }
        finally {
            try {
                client.shutdown();
            }
            catch (Exception e) {
                LOG.warn("Could not properly shut down the cluster client.", (Throwable)e);
            }
        }
    }

    private ClusterClient<?> prepareClusterClient(boolean detached) throws Exception {
        Object client;
        Configuration configuration = new Configuration();
        configuration.addAll(this.clientConfiguration);
        configuration.setString(JobManagerOptions.ADDRESS, this.host);
        configuration.setInteger(JobManagerOptions.PORT, this.port);
        try {
            client = "legacy".equals(configuration.getString(CoreOptions.MODE)) ? new StandaloneClusterClient(configuration) : new RestClusterClient(configuration, (Object)"RemoteStreamEnvironment");
            client.setJobListeners(this.getJobListeners());
            client.setDetached(detached);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), (Throwable)e);
        }
        client.setPrintStatusDuringExecution(this.getConfig().isSysoutLoggingEnabled());
        return client;
    }

    @Override
    public void stopJob(JobID jobID) throws Exception {
        ClusterClient<?> clusterClient = null;
        try {
            clusterClient = this.prepareClusterClient(true);
            clusterClient.stop(jobID);
        }
        catch (Exception e) {
            LOG.error("Stop Job Fails with JobID = " + jobID, (Throwable)e);
            throw e;
        }
        finally {
            if (clusterClient != null) {
                clusterClient.shutdown();
            }
        }
    }

    @Override
    public void cancel(String jobId) throws Exception {
        RestClusterClient client;
        if (LOG.isInfoEnabled()) {
            LOG.info("Running remotely at {}:{}", (Object)this.host, (Object)this.port);
        }
        ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(this.jarFiles, this.globalClasspaths, (ClassLoader)this.getClass().getClassLoader());
        Configuration configuration = new Configuration();
        configuration.addAll(this.clientConfiguration);
        configuration.setString(JobManagerOptions.ADDRESS, this.host);
        configuration.setInteger(JobManagerOptions.PORT, this.port);
        configuration.setInteger(RestOptions.PORT, this.port);
        try {
            client = new RestClusterClient(configuration, (Object)"RemoteStreamEnvironment");
            client.setDetached(true);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), (Throwable)e);
        }
        LOG.info("Cancel Job: " + jobId);
        client.cancel(JobID.fromHexString((String)jobId));
        for (JobListener jobListener : this.getJobListeners()) {
            jobListener.onJobCanceled(JobID.fromHexString((String)jobId), null);
        }
    }

    @Override
    public String cancelWithSavepoint(String jobId, String path) throws Exception {
        RestClusterClient client;
        if (LOG.isInfoEnabled()) {
            LOG.info("Running remotely at {}:{}", (Object)this.host, (Object)this.port);
        }
        ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(this.jarFiles, this.globalClasspaths, (ClassLoader)this.getClass().getClassLoader());
        Configuration configuration = new Configuration();
        configuration.addAll(this.clientConfiguration);
        configuration.setString(JobManagerOptions.ADDRESS, this.host);
        configuration.setInteger(JobManagerOptions.PORT, this.port);
        configuration.setInteger(RestOptions.PORT, this.port);
        try {
            client = new RestClusterClient(configuration, (Object)"RemoteStreamEnvironment");
            client.setDetached(true);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), (Throwable)e);
        }
        LOG.info("CancelWithSavePoint Job: " + jobId);
        String savepointPath = client.cancelWithSavepoint(JobID.fromHexString((String)jobId), path);
        for (JobListener jobListener : this.getJobListeners()) {
            jobListener.onJobCanceled(JobID.fromHexString((String)jobId), savepointPath);
        }
        return savepointPath;
    }

    @Override
    public String triggerSavepoint(String jobId, String path) throws Exception {
        RestClusterClient client;
        if (LOG.isInfoEnabled()) {
            LOG.info("Running remotely at {}:{}", (Object)this.host, (Object)this.port);
        }
        ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(this.jarFiles, this.globalClasspaths, (ClassLoader)this.getClass().getClassLoader());
        Configuration configuration = new Configuration();
        configuration.addAll(this.clientConfiguration);
        configuration.setString(JobManagerOptions.ADDRESS, this.host);
        configuration.setInteger(JobManagerOptions.PORT, this.port);
        configuration.setInteger(RestOptions.PORT, this.port);
        try {
            client = new RestClusterClient(configuration, (Object)"RemoteStreamEnvironment");
            client.setDetached(true);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), (Throwable)e);
        }
        LOG.info("Trigger Savepoint for Job: " + jobId);
        return (String)client.triggerSavepoint(JobID.fromHexString((String)jobId), path).get();
    }

    public String toString() {
        return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + (this.getParallelism() == -1 ? "default" : Integer.valueOf(this.getParallelism())) + ")";
    }

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public Configuration getClientConfiguration() {
        return this.clientConfiguration;
    }
}

