package org.apache.flink.streaming.api.driver;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/streaming/api/driver/DriverSourceFunction.class */
/**
 * Source function that invokes a user-supplied main class from inside a running task.
 *
 * <p>On {@link #open(Configuration)} it looks up the configured library jars and external
 * files in the task's {@link DistributedCache}, builds a {@link DriverStreamEnvironment}
 * pointing at the configured cluster address and installs it via {@code setAsContext()}.
 * {@link #run(SourceFunction.SourceContext)} then hands the main class and its arguments to
 * {@link PackagedProgram}. This function never emits an element through the source context.
 */
public class DriverSourceFunction extends RichSourceFunction<Byte> {
    /** Configuration key that overrides the parallelism applied to the driver environment. */
    private static final String DRIVER_PARALLELISM_KEY = "flink.driver.parallelism";

    private final String clusterIP;
    private final int clusterPort;
    private final Class<?> userMainClass;
    /** Created in {@link #open(Configuration)}; stays {@code null} until then. */
    private DriverStreamEnvironment driverStreamEnvironment;
    private final List<URL> libJarURLs;
    private final List<URL> externalFileURLs;
    private final Configuration configuration;
    private final String driverName;
    private final String[] args;

    /**
     * Creates the function; no work is done until {@link #open(Configuration)} is called.
     *
     * @param clusterIP address passed to the {@link DriverStreamEnvironment}
     * @param clusterPort port passed to the {@link DriverStreamEnvironment}
     * @param driverName name passed to the {@link DriverStreamEnvironment}
     * @param userMainClass class handed to {@link PackagedProgram} in {@code run}
     * @param args arguments handed to the main class
     * @param libJarURLs URLs whose file names are looked up in the distributed cache as
     *     library jars; {@code null} is treated as empty
     * @param externalFileURLs URLs whose file names are looked up in the distributed cache as
     *     external files; {@code null} is treated as empty
     * @param configuration configuration passed to the environment and used to read the
     *     parallelism
     */
    public DriverSourceFunction(
            String clusterIP,
            int clusterPort,
            String driverName,
            Class<?> userMainClass,
            String[] args,
            List<URL> libJarURLs,
            List<URL> externalFileURLs,
            Configuration configuration) {
        this.clusterIP = clusterIP;
        this.clusterPort = clusterPort;
        this.driverName = driverName;
        this.userMainClass = userMainClass;
        this.args = args;
        this.libJarURLs = libJarURLs;
        this.externalFileURLs = externalFileURLs;
        this.configuration = configuration;
    }

    /**
     * Resolves the cached jars and files, then creates and installs the driver environment.
     *
     * <p>The parallelism is read from the configuration held by this function (not from the
     * {@code configuration} argument): the {@code flink.driver.parallelism} key wins, with
     * {@link CoreOptions#DEFAULT_PARALLELISM} as the fallback.
     *
     * @param configuration forwarded unchanged to {@code super.open}
     * @throws Exception if the superclass, the cache lookup, the URL conversion or the
     *     environment setup fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        DistributedCache distributedCache = getRuntimeContext().getDistributedCache();

        List<File> libJars = resolveCachedFiles(distributedCache, this.libJarURLs);
        String[] libJarPaths = new String[libJars.size()];
        for (int idx = 0; idx < libJarPaths.length; idx++) {
            libJarPaths[idx] = libJars.get(idx).getPath();
        }

        List<File> externalFiles = resolveCachedFiles(distributedCache, this.externalFileURLs);
        URL[] externalFileLocations = new URL[externalFiles.size()];
        for (int idx = 0; idx < externalFileLocations.length; idx++) {
            externalFileLocations[idx] = externalFiles.get(idx).toURI().toURL();
        }

        this.driverStreamEnvironment = new DriverStreamEnvironment(
                this.clusterIP,
                this.clusterPort,
                this.driverName,
                libJarPaths,
                externalFileLocations,
                this.configuration);

        int fallbackParallelism = this.configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
        this.driverStreamEnvironment.setParallelism(
                this.configuration.getInteger(DRIVER_PARALLELISM_KEY, fallbackParallelism));
        this.driverStreamEnvironment.setAsContext();
    }

    /**
     * Invokes the user main class with the configured arguments.
     *
     * @param sourceContext unused; nothing is collected through it
     * @throws Exception whatever {@link PackagedProgram} propagates from the invocation
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<Byte> sourceContext) throws Exception {
        PackagedProgram.callMainMethod(this.userMainClass, this.args);
    }

    /**
     * Resets the context environments installed by {@link #open(Configuration)}.
     *
     * <p>Safe to call when {@code open} never ran or failed before the environment was created,
     * and safe to call more than once: the environment reference is cleared before the reset.
     *
     * @throws Exception if the reset or the superclass {@code close} fails
     */
    @Override
    public void close() throws Exception {
        DriverStreamEnvironment environment = this.driverStreamEnvironment;
        this.driverStreamEnvironment = null;
        try {
            // Guard against open() having failed early; an NPE here would hide the real error.
            if (environment != null) {
                environment.resetContextEnvironments();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Does nothing: this function keeps no cancellation flag, so a main method that is already
     * executing inside {@link #run(SourceFunction.SourceContext)} is not signalled from here.
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
    }

    /**
     * Looks up each URL's file name (the last segment of its path) in the distributed cache.
     *
     * @param distributedCache cache to query
     * @param urls URLs to resolve; {@code null} yields an empty result
     * @return the files the cache returned, in input order, skipping {@code null} results
     */
    private static List<File> resolveCachedFiles(DistributedCache distributedCache, List<URL> urls) {
        if (urls == null) {
            return new ArrayList<>();
        }
        List<File> resolved = new ArrayList<>(urls.size());
        for (URL url : urls) {
            File cached = distributedCache.getFile(FilenameUtils.getName(url.getPath()));
            if (cached != null) {
                resolved.add(cached);
            }
        }
        return resolved;
    }
}
