/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.driver;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.DriverContextEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.driver.DriverStreamEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DriverSourceFunction
extends RichSourceFunction<Byte> {
    private final Logger logger = LoggerFactory.getLogger(DriverSourceFunction.class);
    private final String clusterIP;
    private final int clusterPort;
    private final Class<?> userMainClass;
    private DriverStreamEnvironment driverStreamEnvironment;
    private final List<URL> userJar;
    private final List<URL> libJarURLs;
    private final List<URL> externalFileURLs;
    private final List<URL> classPath;
    private final Configuration configuration;
    private final String driverName;
    private final String[] args;

    public DriverSourceFunction(String driverName, Class<?> userMainClass, String[] args, List<URL> userJar, List<URL> classPath, List<URL> libjars, List<URL> externalFiles, Configuration configuration) {
        this.clusterIP = configuration.getString(JobManagerOptions.ADDRESS);
        this.clusterPort = configuration.getInteger(JobManagerOptions.PORT);
        this.userJar = userJar;
        this.classPath = classPath;
        this.driverName = driverName;
        this.userMainClass = userMainClass;
        this.args = args;
        this.libJarURLs = libjars;
        this.externalFileURLs = externalFiles;
        this.configuration = configuration;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        List<URI> jarFiles = this.getBlods(this.userJar, "jarFile");
        List<String> jarFileStrings = jarFiles.stream().flatMap(uri -> Stream.of(uri.getPath())).collect(Collectors.toList());
        List<URI> globalClassPath = this.getBlods(this.classPath, "classPath");
        List<URI> libJars = this.getBlods(this.libJarURLs, "libJars");
        List<URI> externalFiles = this.getBlods(this.externalFileURLs, "externalFile");
        ArrayList<URI> addUpGlobalClassPath = new ArrayList<URI>();
        addUpGlobalClassPath.addAll(globalClassPath);
        addUpGlobalClassPath.addAll(libJars);
        addUpGlobalClassPath.addAll(externalFiles);
        URL[] addUpGlobalClassPathURL = DriverSourceFunction.transURItoURL(addUpGlobalClassPath).toArray(new URL[0]);
        this.driverStreamEnvironment = new DriverStreamEnvironment(this.clusterIP, this.clusterPort, this.driverName, jarFileStrings.toArray(new String[0]), addUpGlobalClassPathURL, this.configuration);
        this.driverStreamEnvironment.setParallelism(this.configuration.getInteger("flink.driver.parallelism", this.configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)));
        this.driverStreamEnvironment.setAsContext();
        ClusterClient<?> client = this.prepareClusterClient(false);
        DriverContextEnvironment driverContextEnvironment = new DriverContextEnvironment(client, this.driverName, DriverSourceFunction.transURItoURL(jarFiles), DriverSourceFunction.transURItoURL(globalClassPath), libJars, externalFiles, this.getRuntimeContext().getUserCodeClassLoader(), this.getJobSavePointSettingsFromConfiguration());
        driverContextEnvironment.setParallelism(this.configuration.getInteger("flink.driver.parallelism", this.configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)));
        driverContextEnvironment.setAsContext();
    }

    @Override
    public void run(SourceFunction.SourceContext<Byte> ctx) throws Exception {
        PackagedProgram.callMainMethod(this.userMainClass, (String[])this.args);
    }

    public void close() throws Exception {
        this.driverStreamEnvironment.resetContextEnvironments();
    }

    @Override
    public void cancel() {
    }

    protected static List<URL> transURItoURL(List<URI> uris) {
        return uris.stream().flatMap(uri -> {
            try {
                return Stream.of(uri.toURL());
            }
            catch (MalformedURLException e) {
                e.printStackTrace();
                return Stream.empty();
            }
        }).collect(Collectors.toList());
    }

    private List<URI> getBlods(List<URL> urls, String blobType) {
        ArrayList<URI> result = new ArrayList<URI>();
        DistributedCache distributedCache = this.getRuntimeContext().getDistributedCache();
        for (URL url : urls) {
            String fileName = FilenameUtils.getName((String)url.getPath());
            File file = distributedCache.getFile(blobType + "#" + fileName);
            if (file == null) continue;
            result.add(file.toURI());
        }
        return result;
    }

    private ClusterClient<?> prepareClusterClient(boolean detached) throws Exception {
        Object client;
        Configuration configuration = new Configuration();
        configuration.addAll(this.configuration);
        try {
            client = "legacy".equals(configuration.getString(CoreOptions.MODE)) ? new StandaloneClusterClient(configuration) : new RestClusterClient(configuration, (Object)"DriverContextEnvironment");
            this.logger.info(String.format("connection info: host: %s, port: %d", this.clusterIP, this.clusterPort));
            client.setDetached(detached);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), (Throwable)e);
        }
        client.setPrintStatusDuringExecution(this.getRuntimeContext().getExecutionConfig().isSysoutLoggingEnabled());
        return client;
    }

    private SavepointRestoreSettings getJobSavePointSettingsFromConfiguration() {
        String savepointRestorePath = this.configuration.getString("flink.driver.savepointrestoresettings.path", null);
        boolean allowNonRestoredState = this.configuration.getBoolean("flink.driver.savepointrestoresettings.allowNonRestoredState", false);
        boolean resumeFromLatestCheckpoint = this.configuration.getBoolean("flink.driver.savepointrestoresettings.resumeFromLatestCheckpoint", false);
        if (savepointRestorePath == null) {
            return SavepointRestoreSettings.none();
        }
        if (resumeFromLatestCheckpoint) {
            return SavepointRestoreSettings.forResumePath((String)savepointRestorePath, (boolean)allowNonRestoredState);
        }
        return SavepointRestoreSettings.forPath((String)savepointRestorePath, (boolean)allowNonRestoredState);
    }
}

