package org.apache.flink.streaming.api.driver;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.DriverContextEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/driver/DriverSourceFunction.class */
public class DriverSourceFunction extends RichSourceFunction<Byte> {
    private final Logger logger = LoggerFactory.getLogger(DriverSourceFunction.class);
    private final String clusterIP;
    private final int clusterPort;
    private final Class<?> userMainClass;
    private DriverStreamEnvironment driverStreamEnvironment;
    private final List<URL> userJar;
    private final List<URL> libJarURLs;
    private final List<URL> externalFileURLs;
    private final List<URL> classPath;
    private final Configuration configuration;
    private final String driverName;
    private final String[] args;

    public DriverSourceFunction(String str, Class<?> cls, String[] strArr, List<URL> list, List<URL> list2, List<URL> list3, List<URL> list4, Configuration configuration) {
        this.clusterIP = configuration.getString(JobManagerOptions.ADDRESS);
        this.clusterPort = configuration.getInteger(JobManagerOptions.PORT);
        this.userJar = list;
        this.classPath = list2;
        this.driverName = str;
        this.userMainClass = cls;
        this.args = strArr;
        this.libJarURLs = list3;
        this.externalFileURLs = list4;
        this.configuration = configuration;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        List<URI> blods = getBlods(this.userJar, DriverEntry.BLOB_TYPE_JARFILE);
        List list = (List) blods.stream().flatMap(uri -> {
            return Stream.of(uri.getPath());
        }).collect(Collectors.toList());
        List<URI> blods2 = getBlods(this.classPath, DriverEntry.BLOB_TYPE_CLASSPATH);
        List<URI> blods3 = getBlods(this.libJarURLs, DriverEntry.BLOB_TYPE_LIB_JARS);
        List<URI> blods4 = getBlods(this.externalFileURLs, DriverEntry.BLOB_TYPE_EXTERNAL_FILE);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(blods2);
        arrayList.addAll(blods3);
        arrayList.addAll(blods4);
        this.driverStreamEnvironment = new DriverStreamEnvironment(this.clusterIP, this.clusterPort, this.driverName, (String[]) list.toArray(new String[0]), (URL[]) transURItoURL(arrayList).toArray(new URL[0]), this.configuration);
        this.driverStreamEnvironment.setParallelism(this.configuration.getInteger("flink.driver.parallelism", this.configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)));
        this.driverStreamEnvironment.setAsContext();
        DriverContextEnvironment driverContextEnvironment = new DriverContextEnvironment(prepareClusterClient(false), this.driverName, transURItoURL(blods), transURItoURL(blods2), blods3, blods4, getRuntimeContext().getUserCodeClassLoader(), getJobSavePointSettingsFromConfiguration());
        driverContextEnvironment.setParallelism(this.configuration.getInteger("flink.driver.parallelism", this.configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)));
        driverContextEnvironment.setAsContext();
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<Byte> sourceContext) throws Exception {
        PackagedProgram.callMainMethod(this.userMainClass, this.args);
    }

    public void close() throws Exception {
        this.driverStreamEnvironment.resetContextEnvironments();
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static List<URL> transURItoURL(List<URI> list) {
        return (List) list.stream().flatMap(uri -> {
            try {
                return Stream.of(uri.toURL());
            } catch (MalformedURLException e) {
                e.printStackTrace();
                return Stream.empty();
            }
        }).collect(Collectors.toList());
    }

    private List<URI> getBlods(List<URL> list, String str) {
        ArrayList arrayList = new ArrayList();
        DistributedCache distributedCache = getRuntimeContext().getDistributedCache();
        Iterator<URL> it = list.iterator();
        while (it.hasNext()) {
            File file = distributedCache.getFile(str + DriverEntry.BLOB_TYPE_SEPARATOR + FilenameUtils.getName(it.next().getPath()));
            if (file != null) {
                arrayList.add(file.toURI());
            }
        }
        return arrayList;
    }

    private ClusterClient<?> prepareClusterClient(boolean z) throws Exception {
        Configuration configuration = new Configuration();
        configuration.addAll(this.configuration);
        try {
            StandaloneClusterClient standaloneClusterClient = "legacy".equals(configuration.getString(CoreOptions.MODE)) ? new StandaloneClusterClient(configuration) : new RestClusterClient(configuration, "DriverContextEnvironment");
            this.logger.info(String.format("connection info: host: %s, port: %d", this.clusterIP, Integer.valueOf(this.clusterPort)));
            standaloneClusterClient.setDetached(z);
            standaloneClusterClient.setPrintStatusDuringExecution(getRuntimeContext().getExecutionConfig().isSysoutLoggingEnabled());
            return standaloneClusterClient;
        } catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
        }
    }

    private SavepointRestoreSettings getJobSavePointSettingsFromConfiguration() {
        String string = this.configuration.getString("flink.driver.savepointrestoresettings.path", (String) null);
        boolean z = this.configuration.getBoolean("flink.driver.savepointrestoresettings.allowNonRestoredState", false);
        return string == null ? SavepointRestoreSettings.none() : this.configuration.getBoolean("flink.driver.savepointrestoresettings.resumeFromLatestCheckpoint", false) ? SavepointRestoreSettings.forResumePath(string, z) : SavepointRestoreSettings.forPath(string, z);
    }
}
