/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.driver;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.driver.DriverStreamEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Source function that invokes a user program's {@code main} method from inside a running task.
 *
 * <p>On {@link #open(Configuration)} the library jars and external files are looked up in the
 * task's {@link DistributedCache} by file name, and a {@link DriverStreamEnvironment} built from
 * them is installed as the context environment. {@link #run(SourceFunction.SourceContext)} then
 * calls the user's main class; this function itself never emits an element through the source
 * context. {@link #close()} resets the context environment again.
 */
public class DriverSourceFunction extends RichSourceFunction<Byte> {
    /** Configuration key that overrides the default parallelism of the driver environment. */
    private static final String DRIVER_PARALLELISM_KEY = "flink.driver.parallelism";

    private final String clusterIP;
    private final int clusterPort;
    private final Class<?> userMainClass;
    /** Created in {@link #open(Configuration)}; stays {@code null} until then or if open fails early. */
    private DriverStreamEnvironment driverStreamEnvironment;
    private final List<URL> libJarURLs;
    private final List<URL> externalFileURLs;
    private final Configuration configuration;
    private final String driverName;
    private final String[] args;

    /**
     * Creates the function; all arguments are stored as-is and only used once the task is opened.
     *
     * @param clusterIP address handed to the {@link DriverStreamEnvironment}
     * @param clusterPort port handed to the {@link DriverStreamEnvironment}
     * @param driverName name handed to the {@link DriverStreamEnvironment}
     * @param userMainClass class whose main method is invoked by {@link #run}
     * @param args arguments passed to the user's main method
     * @param libjars URLs of library jars; only the file name of each URL path is used for the
     *     distributed-cache lookup ({@code null} is treated as empty)
     * @param externalFiles URLs of additional files, resolved the same way as {@code libjars}
     *     ({@code null} is treated as empty)
     * @param configuration configuration handed to the environment and read for the parallelism
     */
    public DriverSourceFunction(String clusterIP, int clusterPort, String driverName, Class<?> userMainClass, String[] args, List<URL> libjars, List<URL> externalFiles, Configuration configuration) {
        this.clusterIP = clusterIP;
        this.clusterPort = clusterPort;
        this.driverName = driverName;
        this.userMainClass = userMainClass;
        this.args = args;
        this.libJarURLs = libjars;
        this.externalFileURLs = externalFiles;
        this.configuration = configuration;
    }

    /**
     * Resolves the cached jars and external files, builds the driver environment and installs it
     * as the context environment.
     *
     * @param parameters forwarded to the superclass
     * @throws Exception if the superclass, the cache lookup or the environment setup fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        DistributedCache distributedCache = getRuntimeContext().getDistributedCache();

        List<String> jarPaths = lookupCachedFiles(distributedCache, libJarURLs).stream()
                .map(File::getPath)
                .collect(Collectors.toList());

        List<File> cachedExternalFiles = lookupCachedFiles(distributedCache, externalFileURLs);
        List<URL> cachedExternalFileURLs = new ArrayList<>(cachedExternalFiles.size());
        for (File cachedFile : cachedExternalFiles) {
            cachedExternalFileURLs.add(cachedFile.toURI().toURL());
        }

        // Assign the field before configuring it so that close() can still reset the context
        // if one of the following calls throws.
        driverStreamEnvironment = new DriverStreamEnvironment(
                clusterIP,
                clusterPort,
                driverName,
                jarPaths.toArray(new String[0]),
                cachedExternalFileURLs.toArray(new URL[0]),
                configuration);
        int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
        driverStreamEnvironment.setParallelism(configuration.getInteger(DRIVER_PARALLELISM_KEY, defaultParallelism));
        driverStreamEnvironment.setAsContext();
    }

    /**
     * Invokes the user's main method with the configured arguments. The source context is not
     * used; no elements are emitted by this function.
     *
     * @param ctx unused
     * @throws Exception whatever the main-method invocation throws
     */
    @Override
    public void run(SourceFunction.SourceContext<Byte> ctx) throws Exception {
        PackagedProgram.callMainMethod(this.userMainClass, this.args);
    }

    /**
     * Resets the context environment installed by {@link #open(Configuration)}.
     *
     * <p>Safe to call when {@code open} never got as far as creating the environment: in that
     * case there is nothing to reset, and a {@link NullPointerException} here would only hide
     * the original failure.
     *
     * @throws Exception if resetting the context or the superclass close fails
     */
    @Override
    public void close() throws Exception {
        DriverStreamEnvironment environment = driverStreamEnvironment;
        driverStreamEnvironment = null;
        try {
            if (environment != null) {
                environment.resetContextEnvironments();
            }
        } finally {
            super.close();
        }
    }

    /** Intentionally empty: this function holds no loop or flag of its own to stop. */
    @Override
    public void cancel() {
    }

    /**
     * Looks up each URL in the distributed cache by the file name of its path, skipping entries
     * for which the cache returns {@code null}. A {@code null} URL list yields an empty result.
     */
    private static List<File> lookupCachedFiles(DistributedCache cache, List<URL> urls) {
        if (urls == null) {
            return new ArrayList<>();
        }
        List<File> cachedFiles = new ArrayList<>(urls.size());
        for (URL url : urls) {
            File cachedFile = cache.getFile(FilenameUtils.getName(url.getPath()));
            if (cachedFile != null) {
                cachedFiles.add(cachedFile);
            }
        }
        return cachedFiles;
    }
}

