package org.apache.flink.api.java;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ShutdownHookUtil;

@Public
/* loaded from: input_file:org/apache/flink/api/java/RemoteEnvironment.class */
public class RemoteEnvironment extends ExecutionEnvironment {
    protected final String host;
    protected final int port;
    protected final List<URL> jarFiles;
    protected Configuration clientConfiguration;
    protected PlanExecutor executor;
    private Thread shutdownHook;
    protected final List<URL> globalClasspaths;

    public RemoteEnvironment(String str, int i, String... strArr) {
        this(str, i, null, strArr, null);
    }

    public RemoteEnvironment(String str, int i, Configuration configuration, String[] strArr) {
        this(str, i, configuration, strArr, null);
    }

    public RemoteEnvironment(String str, int i, Configuration configuration, String[] strArr, URL[] urlArr) {
        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
            throw new InvalidProgramException("The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)");
        }
        if (str == null) {
            throw new NullPointerException("Host must not be null.");
        }
        if (i < 1 || i >= 65535) {
            throw new IllegalArgumentException("Port out of range");
        }
        this.host = str;
        this.port = i;
        this.clientConfiguration = configuration == null ? new Configuration() : configuration;
        if (strArr != null) {
            this.jarFiles = new ArrayList(strArr.length);
            for (String str2 : strArr) {
                try {
                    this.jarFiles.add(new File(str2).getAbsoluteFile().toURI().toURL());
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("JAR file path invalid", e);
                }
            }
        } else {
            this.jarFiles = Collections.emptyList();
        }
        if (urlArr == null) {
            this.globalClasspaths = Collections.emptyList();
        } else {
            this.globalClasspaths = Arrays.asList(urlArr);
        }
    }

    @Override // org.apache.flink.api.java.ExecutionEnvironment
    public JobSubmissionResult executeInternal(String str, boolean z) throws Exception {
        PlanExecutor executor = getExecutor();
        Plan createProgramPlan = createProgramPlan(str);
        createProgramPlan.setJobId(this.jobID);
        createProgramPlan.setSessionTimeout(this.sessionTimeout);
        JobExecutionResult executePlan = executor.executePlan(createProgramPlan, z);
        if (executePlan != null && executePlan.isJobExecutionResult()) {
            this.lastJobExecutionResult = executePlan;
        }
        return executePlan;
    }

    @Override // org.apache.flink.api.java.ExecutionEnvironment
    public void cancel(JobID jobID) throws Exception {
        PlanExecutor executor = getExecutor();
        if (executor != null) {
            executor.cancelPlan(jobID);
        }
    }

    @Override // org.apache.flink.api.java.ExecutionEnvironment
    public void stop() throws Exception {
        PlanExecutor executor = getExecutor();
        if (executor != null) {
            executor.stop();
        }
    }

    @Override // org.apache.flink.api.java.ExecutionEnvironment
    public String getExecutionPlan() throws Exception {
        Plan createProgramPlan = createProgramPlan("plan", false);
        if (this.executor != null) {
            return this.executor.getOptimizerPlanAsJSON(createProgramPlan);
        }
        PlanExecutor createLocalExecutor = PlanExecutor.createLocalExecutor((Configuration) null);
        String optimizerPlanAsJSON = createLocalExecutor.getOptimizerPlanAsJSON(createProgramPlan);
        createLocalExecutor.stop();
        return optimizerPlanAsJSON;
    }

    @Override // org.apache.flink.api.java.ExecutionEnvironment
    @PublicEvolving
    public void startNewSession() throws Exception {
        dispose();
        this.jobID = JobID.generate();
        installShutdownHook();
    }

    protected PlanExecutor getExecutor() throws Exception {
        if (this.executor == null) {
            this.executor = PlanExecutor.createRemoteExecutor(this.host, this.port, this.clientConfiguration, this.jarFiles, this.globalClasspaths);
            this.executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
            this.executor.setJobListeners(getJobListeners());
        }
        if (getSessionTimeout() > 0 && !this.executor.isRunning()) {
            this.executor.start();
            installShutdownHook();
        }
        return this.executor;
    }

    protected void dispose() {
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);
        try {
            PlanExecutor planExecutor = this.executor;
            if (planExecutor != null) {
                planExecutor.endSession(this.jobID);
                planExecutor.stop();
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to dispose the session shutdown hook.");
        }
    }

    public String toString() {
        return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + (getParallelism() == -1 ? "default" : Integer.valueOf(getParallelism())) + ") : " + getIdString();
    }

    protected void installShutdownHook() {
        if (this.shutdownHook == null) {
            this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::dispose, getClass().getSimpleName(), LOG);
        }
    }
}
