/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Locale;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.operators.CoGroupRawOperator;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UdfOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.api.PythonOperationInfo;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.python.api.functions.PythonCoGroup;
import org.apache.flink.python.api.functions.PythonMapPartition;
import org.apache.flink.python.api.functions.util.IdentityGroupReduce;
import org.apache.flink.python.api.functions.util.KeyDiscarder;
import org.apache.flink.python.api.functions.util.NestedKeyDiscarder;
import org.apache.flink.python.api.functions.util.SerializerMap;
import org.apache.flink.python.api.functions.util.StringDeserializerMap;
import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.python.api.util.SetCache;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PythonPlanBinder {
    /** Logger shared by this class. */
    static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
    /** Name under which the temporary plan directory is registered as a cached file on the execution environment. */
    public static final String FLINK_PYTHON_DC_ID = "flink";
    /** File name (with leading separator) given to the plan script when it is copied into the temporary plan directory. */
    public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py";
    /** Operator parameter key holding the number of broadcast variables attached to an operator. */
    public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
    /** Operator parameter key prefix; the broadcast variable index is appended and the value is the variable name. */
    public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
    /** Key under which the plan arguments string is stored in the operator configuration. */
    public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
    // Configuration handed to the plan streamer and to the python UDF wrappers created by this binder.
    private final Configuration operatorConfig;
    // Directory that receives the python library, the plan script and any additional files.
    private final String tmpPlanFilesDir;
    // Initialised from PythonOptions.DC_TMP_DIR; replaced by that option's default when the plan reports local execution.
    private Path tmpDistributedDir;
    // Cache of the sets/groupings created while receiving a plan, addressed by their IDs; reset after each executed plan.
    private final SetCache sets = new SetCache();
    // Environment ID received with the plan parameters; passed on to every PythonOperationInfo.
    private int currentEnvironmentID = 0;
    // Connection to the python plan process; created in runPlan, so null until then.
    private PythonPlanStreamer streamer;

    /**
     * Entry point: loads the global configuration, creates a binder and runs the plan
     * described by {@code args}.
     *
     * <p>Failures of the plan run are reported on standard out and in the log; they are not
     * rethrown.
     *
     * @param args plan script, additional files and plan arguments, see {@code runPlan}
     * @throws Exception if loading the configuration or creating the binder fails
     */
    public static void main(String[] args) throws Exception {
        PythonPlanBinder planBinder = new PythonPlanBinder(GlobalConfiguration.loadConfiguration());
        try {
            planBinder.runPlan(args);
        }
        catch (Exception failure) {
            System.out.println("Failed to run plan: " + failure.getMessage());
            LOG.error("Failed to run plan.", failure);
        }
    }

    /**
     * Creates a binder whose temporary directories and operator settings are taken from the
     * given configuration.
     *
     * <p>When no plan temp directory is configured, a unique {@code flink_plan_<uuid>}
     * directory below {@code java.io.tmpdir} is used.
     *
     * @param globalConfig configuration to read the python options from
     */
    public PythonPlanBinder(Configuration globalConfig) {
        String planTmpDir = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
        if (planTmpDir == null) {
            // Nothing configured: fall back to a unique directory in the JVM temp dir.
            planTmpDir = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
        }
        this.tmpPlanFilesDir = planTmpDir;
        this.tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));

        this.operatorConfig = new Configuration();
        String pythonBinary = globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH);
        this.operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
        String dataTmpDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
        if (dataTmpDir != null) {
            this.operatorConfig.setString(PythonOptions.DATA_TMP_DIR, dataTmpDir);
        }
        long mmapFileSize = globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE);
        this.operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, mmapFileSize);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stages the plan script and its additional files in the temporary plan directory, starts
     * the python plan process and executes every plan that process sends.
     *
     * <p>Argument layout: {@code <pathToScript> [<pathToPackage>...] [- <parameter>...]}.
     * Everything between the script and the first {@code "-"} is copied next to the plan;
     * everything after the {@code "-"} is passed to the plan as arguments.
     *
     * <p>The temporary plan directory is deleted and the streamer is closed when this method
     * returns, whether or not the run succeeded.
     *
     * @param args command line arguments as described above
     * @throws IllegalArgumentException if no script argument is given
     * @throws FileNotFoundException if the plan file or an additional file does not exist
     * @throws Exception if staging the files, receiving the plan or executing it fails
     */
    void runPlan(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Missing script file argument. Usage: ./bin/pyflink.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
        }
        // Index of the first "-" separator. 0 doubles as "no separator", since args[0] is
        // always interpreted as the plan file.
        int split = 0;
        for (int x = 0; x < args.length; ++x) {
            if (args[x].equals("-")) {
                split = x;
                break;
            }
        }
        try {
            String planFile = args[0];
            String[] filesToCopy = Arrays.copyOfRange(args, 1, split == 0 ? args.length : split);
            String[] planArgumentsArray = Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length);

            // Every argument is prefixed with a blank, so a non-empty result starts with " ".
            StringBuilder planArgumentsBuilder = new StringBuilder();
            for (String arg : planArgumentsArray) {
                planArgumentsBuilder.append(" ").append(arg);
            }
            String planArguments = planArgumentsBuilder.toString();
            this.operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments);

            // Validate all inputs before touching the temporary directory.
            Path planPath = new Path(planFile);
            if (!FileSystem.getUnguardedFileSystem(planPath.toUri()).exists(planPath)) {
                throw new FileNotFoundException("Plan file " + planFile + " does not exist.");
            }
            for (String file : filesToCopy) {
                Path filePath = new Path(file);
                if (!FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath)) {
                    throw new FileNotFoundException("Additional file " + file + " does not exist.");
                }
            }

            // Start from an empty directory, then add the python library, the plan and the extra files.
            Path tmpPlanFilesPath = new Path(this.tmpPlanFilesDir);
            PythonPlanBinder.deleteIfExists(tmpPlanFilesPath);
            tmpPlanFilesPath.getFileSystem().mkdirs(tmpPlanFilesPath);
            PythonPlanBinder.unzipPythonLibrary(tmpPlanFilesPath);
            PythonPlanBinder.copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
            for (String file : filesToCopy) {
                Path source = new Path(file);
                PythonPlanBinder.copyFile(source, tmpPlanFilesPath, source.getName());
            }

            this.streamer = new PythonPlanStreamer(this.operatorConfig);
            this.streamer.open(this.tmpPlanFilesDir, planArguments);
            // One iteration per plan announced by the python process.
            while (this.streamer.preparePlanMode()) {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                this.receivePlan(env);
                env.registerCachedFile(tmpPlanFilesPath.toUri().toString(), FLINK_PYTHON_DC_ID, true);
                JobExecutionResult jer = env.execute();
                // Report the net runtime back to the python process.
                long runtime = jer.getNetRuntime();
                this.streamer.sendRecord(runtime);
                this.streamer.finishPlanMode();
                this.sets.reset();
            }
        }
        finally {
            try {
                FileSystem local = FileSystem.getLocalFileSystem();
                local.delete(new Path(this.tmpPlanFilesDir), true);
            }
            catch (IOException ioe) {
                // Cleanup is best effort; pass the exception as last argument so the stack trace is logged too.
                LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage(), ioe);
            }
            finally {
                if (this.streamer != null) {
                    this.streamer.close();
                }
            }
        }
    }

    /**
     * Extracts the {@code python-source.zip} classpath resource into the given directory.
     *
     * <p>Directory entries are created via {@code mkdirs}; file entries are written with
     * {@code NO_OVERWRITE}, so extraction fails if a file already exists.
     *
     * @param targetDir directory to extract into
     * @throws FileNotFoundException if the archive is not available on the classpath
     * @throws IOException if reading the archive or writing an entry fails
     */
    private static void unzipPythonLibrary(Path targetDir) throws IOException {
        FileSystem targetFs = targetDir.getFileSystem();
        ClassLoader classLoader = PythonPlanBinder.class.getClassLoader();
        InputStream archive = classLoader.getResourceAsStream("python-source.zip");
        if (archive == null) {
            // getResourceAsStream signals a missing resource with null; fail with a clear message
            // instead of a NullPointerException from the ZipInputStream constructor.
            throw new FileNotFoundException("Could not find python-source.zip on the classpath.");
        }
        try (ZipInputStream zis = new ZipInputStream(archive)) {
            for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = zis.getNextEntry()) {
                Path newFile = new Path(targetDir, entry.getName());
                if (entry.isDirectory()) {
                    targetFs.mkdirs(newFile);
                } else {
                    LOG.debug("Unzipping to {}.", newFile);
                    // The output stream is closed per entry; the zip stream must stay open for
                    // the following entries, hence close=false for copyBytes.
                    try (FSDataOutputStream out = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
                        IOUtils.copyBytes(zis, out, false);
                    }
                    catch (Exception e2) {
                        throw new IOException("Failed to unzip flink python library.", e2);
                    }
                }
                zis.closeEntry();
            }
        }
    }

    /**
     * Recursively deletes {@code path} on its own file system if it exists.
     *
     * @param path file or directory to remove
     * @throws IOException if the file system cannot be accessed or the deletion fails
     */
    private static void deleteIfExists(Path path) throws IOException {
        FileSystem fileSystem = path.getFileSystem();
        if (!fileSystem.exists(path)) {
            return;
        }
        fileSystem.delete(path, true);
    }

    /**
     * Copies {@code source} to {@code targetDirectory/name}, replacing anything already there.
     *
     * @param source file or directory to copy
     * @param targetDirectory directory to copy into
     * @param name name of the copy inside {@code targetDirectory}
     * @throws IOException if deleting an existing target or copying fails
     */
    private static void copyFile(Path source, Path targetDirectory, String name) throws IOException {
        final Path destination = new Path(targetDirectory, name);
        // Clear out a previous copy first.
        deleteIfExists(destination);
        FileUtils.copy(source, destination, true);
    }

    /**
     * Receives one complete plan from the streamer: first the environment parameters, then
     * the operations.
     *
     * @param env environment the plan is applied to
     * @throws IOException if reading from the streamer fails
     */
    private void receivePlan(ExecutionEnvironment env) throws IOException {
        this.receiveParameters(env);
        this.receiveOperations(env);
    }

    /**
     * Reads one record per {@link Parameters} constant from the streamer and applies it.
     *
     * <p>Each record is a tuple whose field 0 is the parameter name (matched
     * case-insensitively against {@link Parameters}) and whose field 1 is its value.
     * If the environment's parallelism is negative afterwards it is set to 1.
     *
     * @param env environment to configure
     * @throws IOException if reading from the streamer fails
     * @throws IllegalArgumentException if a record names an unknown parameter
     */
    private void receiveParameters(ExecutionEnvironment env) throws IOException {
        int parameterCount = Parameters.values().length;
        for (int x = 0; x < parameterCount; ++x) {
            Tuple value = (Tuple)this.streamer.getRecord(true);
            // Locale.ROOT keeps the mapping stable regardless of the JVM's default locale
            // (e.g. "id" must become "ID", not a dotted capital I under a Turkish locale).
            String parameterName = ((String)value.getField(0)).toUpperCase(Locale.ROOT);
            switch (Parameters.valueOf(parameterName)) {
                case DOP: {
                    Integer dop = (Integer)value.getField(1);
                    env.setParallelism(dop.intValue());
                    break;
                }
                case MODE: {
                    // A true value signals local execution.
                    if (((Boolean)value.getField(1)).booleanValue()) {
                        LOG.info("Local execution specified, using default for {}.", PythonOptions.DC_TMP_DIR);
                        this.tmpDistributedDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue());
                    }
                    break;
                }
                case RETRY: {
                    int retry = (Integer)value.getField(1);
                    // Fixed delay of 10000 between restart attempts.
                    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L));
                    break;
                }
                case ID: {
                    this.currentEnvironmentID = (Integer)value.getField(1);
                    break;
                }
            }
        }
        if (env.getParallelism() < 0) {
            env.setParallelism(1);
        }
    }

    /**
     * Reads the operation count followed by that many operation descriptions from the
     * streamer and translates each one into the corresponding Flink operation.
     *
     * <p>The operation identifier is matched case-insensitively against {@link Operation}.
     *
     * @param env environment used to create sources
     * @throws IOException if reading from the streamer fails
     * @throws IllegalArgumentException if an identifier does not name a known operation
     */
    private void receiveOperations(ExecutionEnvironment env) throws IOException {
        // Unbox once instead of on every loop comparison.
        int operationCount = (Integer)this.streamer.getRecord(true);
        for (int x = 0; x < operationCount; ++x) {
            PythonOperationInfo info = new PythonOperationInfo(this.streamer, this.currentEnvironmentID);
            // Locale.ROOT: identifiers such as "distinct" or "join" contain an 'i', which a
            // Turkish default locale would upper-case to a character no enum constant uses.
            Operation op = Operation.valueOf(info.identifier.toUpperCase(Locale.ROOT));
            switch (op) {
                case SOURCE_CSV:
                    this.createCsvSource(env, info);
                    break;
                case SOURCE_TEXT:
                    this.createTextSource(env, info);
                    break;
                case SOURCE_VALUE:
                    this.createValueSource(env, info);
                    break;
                case SOURCE_SEQ:
                    this.createSequenceSource(env, info);
                    break;
                case SINK_CSV:
                    this.createCsvSink(info);
                    break;
                case SINK_TEXT:
                    this.createTextSink(info);
                    break;
                case SINK_PRINT:
                    this.createPrintSink(info);
                    break;
                case BROADCAST:
                    this.createBroadcastVariable(info);
                    break;
                case DISTINCT:
                    this.createDistinctOperation(info);
                    break;
                case FIRST:
                    this.createFirstOperation(info);
                    break;
                case PARTITION_HASH:
                    this.createHashPartitionOperation(info);
                    break;
                case REBALANCE:
                    this.createRebalanceOperation(info);
                    break;
                case GROUPBY:
                    this.createGroupOperation(info);
                    break;
                case SORT:
                    this.createSortOperation(info);
                    break;
                case UNION:
                    this.createUnionOperation(info);
                    break;
                case COGROUP:
                    this.createCoGroupOperation(info, info.types);
                    break;
                case CROSS:
                    this.createCrossOperation(PythonOperationInfo.DatasizeHint.NONE, info, info.types);
                    break;
                case CROSS_H:
                    this.createCrossOperation(PythonOperationInfo.DatasizeHint.HUGE, info, info.types);
                    break;
                case CROSS_T:
                    this.createCrossOperation(PythonOperationInfo.DatasizeHint.TINY, info, info.types);
                    break;
                case FILTER:
                    this.createFilterOperation(info, info.types);
                    break;
                case FLATMAP:
                    this.createFlatMapOperation(info, info.types);
                    break;
                case GROUPREDUCE:
                    this.createGroupReduceOperation(info);
                    break;
                case JOIN:
                    this.createJoinOperation(PythonOperationInfo.DatasizeHint.NONE, info, info.types);
                    break;
                case JOIN_H:
                    this.createJoinOperation(PythonOperationInfo.DatasizeHint.HUGE, info, info.types);
                    break;
                case JOIN_T:
                    this.createJoinOperation(PythonOperationInfo.DatasizeHint.TINY, info, info.types);
                    break;
                case MAP:
                    this.createMapOperation(info, info.types);
                    break;
                case MAPPARTITION:
                    this.createMapPartitionOperation(info, info.types);
                    break;
                case REDUCE:
                    this.createReduceOperation(info);
                    break;
            }
        }
    }

    /**
     * Creates a CSV source followed by a serializing map step and caches the result under
     * {@code info.setID}.
     *
     * @param env environment to create the source on
     * @param info operation description; {@code info.types} must be a {@link TupleTypeInfo}
     * @throws RuntimeException if {@code info.types} is not a tuple type
     */
    private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) {
        if (!(info.types instanceof TupleTypeInfo)) {
            // Report the offending type itself rather than the whole operation description.
            throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info.types);
        }
        Path path = new Path(info.path);
        String lineD = info.lineDelimiter;
        String fieldD = info.fieldDelimiter;
        TupleTypeInfo types = (TupleTypeInfo)info.types;
        InputFormat csvFormat = (InputFormat)new TupleCsvInputFormat(path, lineD, fieldD, (TupleTypeInfoBase)types);
        DataSource source = (DataSource)env.createInput(csvFormat, (TypeInformation)types).setParallelism(info.parallelism);
        source = (DataSource)source.name("CsvSource");
        MapOperator serialized = (MapOperator)source.map(new SerializerMap()).setParallelism(info.parallelism);
        this.sets.add(info.setID, serialized.name("CsvSourcePostStep"));
    }

    /**
     * Creates a text file source followed by a serializing map step and caches the result
     * under {@code info.setID}.
     */
    private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) {
        DataSource source = (DataSource)env.readTextFile(info.path).setParallelism(info.parallelism);
        source = (DataSource)source.name("TextSource");
        MapOperator serialized = (MapOperator)source.map(new SerializerMap()).setParallelism(info.parallelism);
        this.sets.add(info.setID, serialized.name("TextSourcePostStep"));
    }

    /**
     * Creates a source from {@code info.values} followed by a serializing map step and
     * caches the result under {@code info.setID}.
     */
    private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
        DataSource source = (DataSource)env.fromElements(info.values).setParallelism(info.parallelism);
        source = (DataSource)source.name("ValueSource");
        MapOperator serialized = (MapOperator)source.map(new SerializerMap()).setParallelism(info.parallelism);
        this.sets.add(info.setID, serialized.name("ValueSourcePostStep"));
    }

    /**
     * Creates a sequence source from {@code info.frm} to {@code info.to} followed by a
     * serializing map step and caches the result under {@code info.setID}.
     */
    private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) {
        DataSource source = (DataSource)env.generateSequence(info.frm, info.to).setParallelism(info.parallelism);
        source = (DataSource)source.name("SequenceSource");
        MapOperator serialized = (MapOperator)source.map(new SerializerMap()).setParallelism(info.parallelism);
        this.sets.add(info.setID, serialized.name("SequenceSourcePostStep"));
    }

    /**
     * Attaches a CSV sink to the set {@code info.parentID}; a deserializing map step is
     * inserted in front of the sink.
     */
    private void createCsvSink(PythonOperationInfo info) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        MapOperator preStep = (MapOperator)parent.map((MapFunction)new StringTupleDeserializerMap()).setParallelism(info.parallelism);
        preStep = (MapOperator)preStep.name("CsvSinkPreStep");
        preStep.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode)
            .setParallelism(info.parallelism)
            .name("CsvSink");
    }

    /**
     * Attaches a text sink to the set {@code info.parentID}; a deserializing map step is
     * inserted in front of the sink.
     */
    private void createTextSink(PythonOperationInfo info) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        MapOperator preStep = (MapOperator)parent.map((MapFunction)new StringDeserializerMap()).setParallelism(info.parallelism);
        preStep.writeAsText(info.path, info.writeMode)
            .setParallelism(info.parallelism)
            .name("TextSink");
    }

    /**
     * Attaches a printing sink to the set {@code info.parentID}; a deserializing map step is
     * inserted in front of the sink. {@code info.toError} is forwarded to the
     * {@link PrintingOutputFormat}.
     */
    private void createPrintSink(PythonOperationInfo info) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        MapOperator preStep = (MapOperator)parent.map((MapFunction)new StringDeserializerMap()).setParallelism(info.parallelism);
        preStep = (MapOperator)preStep.name("PrintSinkPreStep");
        preStep.output((OutputFormat)new PrintingOutputFormat(info.toError)).setParallelism(info.parallelism);
    }

    /**
     * Registers the set {@code info.otherID} as broadcast set {@code info.name} on the
     * operator {@code info.parentID} and records the variable name in the operator's
     * parameters under {@code PLANBINDER_BCVAR_<index>}, bumping the stored count.
     */
    private void createBroadcastVariable(PythonOperationInfo info) {
        UdfOperator target = (UdfOperator)this.sets.getDataSet(info.parentID);
        DataSet broadcastSet = this.sets.getDataSet(info.otherID);
        target.withBroadcastSet(broadcastSet, info.name);

        Configuration parameters = target.getParameters();
        if (parameters == null) {
            parameters = new Configuration();
        }
        // The current count is the index of the variable being added.
        int index = parameters.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
        parameters.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + index, info.name);
        parameters.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, index + 1);
        target.withParameters(parameters);
    }

    /**
     * Applies {@code distinct} on {@code info.keys} to the set {@code info.parentID},
     * followed by a {@link KeyDiscarder} map step, and caches the result under
     * {@code info.setID}.
     */
    private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        DistinctOperator distinct = (DistinctOperator)parent.distinct(info.keys).setParallelism(info.parallelism);
        distinct = (DistinctOperator)distinct.name("Distinct");
        MapOperator keyless = (MapOperator)distinct.map(new KeyDiscarder()).setParallelism(info.parallelism);
        this.sets.add(info.setID, keyless.name("DistinctPostStep"));
    }

    /**
     * Applies {@code first(info.count)} to the parent, which may be a data set or an
     * unsorted/sorted grouping, and caches the result under {@code info.setID}.
     *
     * <p>For groupings a {@link KeyDiscarder} map step is appended. A parent of none of the
     * three kinds is ignored.
     */
    private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
        if (this.sets.isDataSet(info.parentID)) {
            DataSet parent = this.sets.getDataSet(info.parentID);
            GroupReduceOperator first = (GroupReduceOperator)parent.first(info.count).setParallelism(info.parallelism);
            this.sets.add(info.setID, first.name("First"));
            return;
        }
        if (this.sets.isUnsortedGrouping(info.parentID)) {
            UnsortedGrouping grouping = this.sets.getUnsortedGrouping(info.parentID);
            GroupReduceOperator first = (GroupReduceOperator)grouping.first(info.count).setParallelism(info.parallelism);
            first = (GroupReduceOperator)first.name("First");
            MapOperator keyless = (MapOperator)first.map(new KeyDiscarder()).setParallelism(info.parallelism);
            this.sets.add(info.setID, keyless.name("FirstPostStep"));
            return;
        }
        if (this.sets.isSortedGrouping(info.parentID)) {
            SortedGrouping grouping = this.sets.getSortedGrouping(info.parentID);
            GroupReduceOperator first = (GroupReduceOperator)grouping.first(info.count).setParallelism(info.parallelism);
            first = (GroupReduceOperator)first.name("First");
            MapOperator keyless = (MapOperator)first.map(new KeyDiscarder()).setParallelism(info.parallelism);
            this.sets.add(info.setID, keyless.name("FirstPostStep"));
        }
    }

    /**
     * Groups the set {@code info.parentID} by {@code info.keys} and caches the grouping
     * under {@code info.setID}.
     */
    private void createGroupOperation(PythonOperationInfo info) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        UnsortedGrouping grouping = parent.groupBy(info.keys);
        this.sets.add(info.setID, grouping);
    }

    /**
     * Hash-partitions the set {@code info.parentID} on {@code info.keys}, appends a
     * {@link KeyDiscarder} map step and caches the result under {@code info.setID}.
     */
    private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        PartitionOperator partitioned = (PartitionOperator)parent.partitionByHash(info.keys).setParallelism(info.parallelism);
        MapOperator keyless = (MapOperator)partitioned.map(new KeyDiscarder()).setParallelism(info.parallelism);
        this.sets.add(info.setID, keyless.name("HashPartitionPostStep"));
    }

    /**
     * Rebalances the set {@code info.parentID} and caches the result under
     * {@code info.setID}.
     */
    private void createRebalanceOperation(PythonOperationInfo info) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        PartitionOperator rebalanced = (PartitionOperator)parent.rebalance().setParallelism(info.parallelism);
        this.sets.add(info.setID, rebalanced.name("Rebalance"));
    }

    /**
     * Adds a group sort on {@code info.field} with order {@code info.order} to the grouping
     * {@code info.parentID} and caches the sorted grouping under {@code info.setID}.
     *
     * @throws IllegalArgumentException if the parent is a plain data set
     */
    private void createSortOperation(PythonOperationInfo info) {
        if (this.sets.isDataSet(info.parentID)) {
            throw new IllegalArgumentException("sort() can not be applied on a DataSet.");
        }
        if (this.sets.isUnsortedGrouping(info.parentID)) {
            SortedGrouping sorted = this.sets.getUnsortedGrouping(info.parentID).sortGroup(info.field, info.order);
            this.sets.add(info.setID, sorted);
            return;
        }
        if (this.sets.isSortedGrouping(info.parentID)) {
            SortedGrouping sorted = this.sets.getSortedGrouping(info.parentID).sortGroup(info.field, info.order);
            this.sets.add(info.setID, sorted);
        }
    }

    /**
     * Unions the sets {@code info.parentID} and {@code info.otherID} and caches the result
     * under {@code info.setID}.
     */
    private <IN> void createUnionOperation(PythonOperationInfo info) {
        DataSet first = this.sets.getDataSet(info.parentID);
        DataSet second = this.sets.getDataSet(info.otherID);
        UnionOperator union = (UnionOperator)first.union(second).setParallelism(info.parallelism);
        this.sets.add(info.setID, union.name("Union"));
    }

    /**
     * Co-groups the sets {@code info.parentID} and {@code info.otherID} on
     * {@code info.keys1}/{@code info.keys2} using a {@link PythonCoGroup} function and
     * caches the result under {@code info.setID}.
     *
     * @param info operation description
     * @param type type information of the produced elements
     */
    private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
        DataSet first = this.sets.getDataSet(info.parentID);
        DataSet second = this.sets.getDataSet(info.otherID);
        Keys.ExpressionKeys firstKeys = new Keys.ExpressionKeys(info.keys1, first.getType());
        Keys.ExpressionKeys secondKeys = new Keys.ExpressionKeys(info.keys2, second.getType());
        PythonCoGroup function = new PythonCoGroup(this.operatorConfig, info.envID, info.setID, type);
        CoGroupRawOperator coGroup = new CoGroupRawOperator(first, second, firstKeys, secondKeys, function, type, info.name);
        this.sets.add(info.setID, coGroup.setParallelism(info.parallelism));
    }

    /**
     * Crosses the sets {@code info.parentID} and {@code info.otherID} using the variant
     * selected by {@code mode} and caches the result under {@code info.setID}.
     *
     * <p>When {@code info.usesUDF} is set, a {@link PythonMapPartition} step named
     * {@code info.name} is appended; otherwise the plain cross is cached as "DefaultCross".
     *
     * @param mode size hint selecting cross, crossWithHuge or crossWithTiny
     * @param info operation description
     * @param type type information of the elements produced by the python function
     * @throws IllegalArgumentException if {@code mode} is not one of the handled hints
     */
    private <IN1, IN2, OUT> void createCrossOperation(PythonOperationInfo.DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) {
        DataSet first = this.sets.getDataSet(info.parentID);
        DataSet second = this.sets.getDataSet(info.otherID);
        CrossOperator.DefaultCross cross;
        switch (mode) {
            case NONE:
                cross = first.cross(second);
                break;
            case HUGE:
                cross = first.crossWithHuge(second);
                break;
            case TINY:
                cross = first.crossWithTiny(second);
                break;
            default:
                throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
        }
        cross.setParallelism(info.parallelism);
        if (!info.usesUDF) {
            this.sets.add(info.setID, cross.name("DefaultCross"));
            return;
        }
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator udfStep = (MapPartitionOperator)cross.mapPartition(function).setParallelism(info.parallelism);
        this.sets.add(info.setID, udfStep.name(info.name));
    }

    /**
     * Realises a python filter as a {@link PythonMapPartition} step on the set
     * {@code info.parentID} and caches the result under {@code info.setID}.
     */
    private <IN, OUT> void createFilterOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator step = (MapPartitionOperator)parent.mapPartition(function).setParallelism(info.parallelism);
        this.sets.add(info.setID, step.name(info.name));
    }

    /**
     * Realises a python flat map as a {@link PythonMapPartition} step on the set
     * {@code info.parentID} and caches the result under {@code info.setID}.
     */
    private <IN, OUT> void createFlatMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator step = (MapPartitionOperator)parent.mapPartition(function).setParallelism(info.parallelism);
        this.sets.add(info.setID, step.name(info.name));
    }

    /**
     * Applies a python group reduce to the parent, which may be a data set or an
     * unsorted/sorted grouping, and caches the result under {@code info.setID}.
     * A parent of none of the three kinds is ignored.
     */
    private void createGroupReduceOperation(PythonOperationInfo info) {
        if (this.sets.isDataSet(info.parentID)) {
            this.sets.add(info.setID, this.applyGroupReduceOperation(this.sets.getDataSet(info.parentID), info, info.types));
            return;
        }
        if (this.sets.isUnsortedGrouping(info.parentID)) {
            this.sets.add(info.setID, this.applyGroupReduceOperation(this.sets.getUnsortedGrouping(info.parentID), info, info.types));
            return;
        }
        if (this.sets.isSortedGrouping(info.parentID)) {
            this.sets.add(info.setID, this.applyGroupReduceOperation(this.sets.getSortedGrouping(info.parentID), info, info.types));
        }
    }

    /**
     * Group reduce on a data set: a non-combinable {@link IdentityGroupReduce} pre-step
     * followed by a {@link PythonMapPartition} step named {@code info.name}.
     *
     * @return the resulting set
     */
    private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
        GroupReduceOperator preStep = (GroupReduceOperator)op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).name("PythonGroupReducePreStep");
        preStep = (GroupReduceOperator)preStep.setParallelism(info.parallelism);
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator reduced = (MapPartitionOperator)preStep.mapPartition(function).setParallelism(info.parallelism);
        return reduced.name(info.name);
    }

    /**
     * Group reduce on an unsorted grouping: a non-combinable {@link IdentityGroupReduce}
     * pre-step followed by a {@link PythonMapPartition} step named {@code info.name}.
     *
     * @return the resulting set
     */
    private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
        GroupReduceOperator preStep = (GroupReduceOperator)op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(info.parallelism);
        preStep = (GroupReduceOperator)preStep.name("PythonGroupReducePreStep");
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator reduced = (MapPartitionOperator)preStep.mapPartition(function).setParallelism(info.parallelism);
        return reduced.name(info.name);
    }

    /**
     * Group reduce on a sorted grouping: a non-combinable {@link IdentityGroupReduce}
     * pre-step followed by a {@link PythonMapPartition} step named {@code info.name}.
     *
     * @return the resulting set
     */
    private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
        GroupReduceOperator preStep = (GroupReduceOperator)op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(info.parallelism);
        preStep = (GroupReduceOperator)preStep.name("PythonGroupReducePreStep");
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator reduced = (MapPartitionOperator)preStep.mapPartition(function).setParallelism(info.parallelism);
        return reduced.name(info.name);
    }

    /**
     * Joins the sets {@code info.parentID} and {@code info.otherID} on
     * {@code info.keys1}/{@code info.keys2} and caches the result under {@code info.setID}.
     *
     * <p>When {@code info.usesUDF} is set, a {@link PythonMapPartition} step named
     * {@code info.name} is appended to the default join.
     *
     * @param mode size hint selecting the join variant
     * @param info operation description
     * @param type type information of the elements produced by the python function
     */
    private <IN1, IN2, OUT> void createJoinOperation(PythonOperationInfo.DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) {
        DataSet first = this.sets.getDataSet(info.parentID);
        DataSet second = this.sets.getDataSet(info.otherID);
        DataSet joined = this.createDefaultJoin(first, second, info.keys1, info.keys2, mode, info.parallelism);
        if (!info.usesUDF) {
            this.sets.add(info.setID, joined);
            return;
        }
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator udfStep = (MapPartitionOperator)joined.mapPartition(function).setParallelism(info.parallelism);
        this.sets.add(info.setID, udfStep.name(info.name));
    }

    /**
     * Builds the default join of {@code op1} and {@code op2} on the given keys, using the
     * join variant selected by {@code mode}, and appends a {@link NestedKeyDiscarder} map
     * step.
     *
     * @param op1 first input
     * @param op2 second input
     * @param firstKeys key fields of the first input
     * @param secondKeys key fields of the second input
     * @param mode size hint selecting join, joinWithHuge or joinWithTiny
     * @param parallelism parallelism applied to the join and the map step
     * @return the joined set after the key discarding step
     * @throws IllegalArgumentException if {@code mode} is not one of the handled hints
     */
    private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, String[] firstKeys, String[] secondKeys, PythonOperationInfo.DatasizeHint mode, int parallelism) {
        JoinOperator joined;
        switch (mode) {
            case NONE:
                joined = (JoinOperator)op1.join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism);
                break;
            case HUGE:
                joined = (JoinOperator)op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism);
                break;
            case TINY:
                joined = (JoinOperator)op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism);
                break;
            default:
                throw new IllegalArgumentException("Invalid join mode specified.");
        }
        // The post step is the same for every join variant.
        MapOperator keyless = (MapOperator)joined.map(new NestedKeyDiscarder()).setParallelism(parallelism);
        return keyless.name("DefaultJoinPostStep");
    }

    /**
     * Realises a python map as a {@link PythonMapPartition} step on the set
     * {@code info.parentID} and caches the result under {@code info.setID}.
     */
    private <IN, OUT> void createMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator step = (MapPartitionOperator)parent.mapPartition(function).setParallelism(info.parallelism);
        this.sets.add(info.setID, step.name(info.name));
    }

    /**
     * Adds a {@link PythonMapPartition} step to the set {@code info.parentID} and caches
     * the result under {@code info.setID}.
     */
    private <IN, OUT> void createMapPartitionOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
        DataSet parent = this.sets.getDataSet(info.parentID);
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator step = (MapPartitionOperator)parent.mapPartition(function).setParallelism(info.parallelism);
        this.sets.add(info.setID, step.name(info.name));
    }

    /**
     * Applies a python reduce to the parent, which may be a data set or an unsorted
     * grouping, and caches the result under {@code info.setID}.
     *
     * @throws IllegalArgumentException if the parent is a sorted grouping
     */
    private void createReduceOperation(PythonOperationInfo info) {
        if (this.sets.isDataSet(info.parentID)) {
            this.sets.add(info.setID, this.applyReduceOperation(this.sets.getDataSet(info.parentID), info, info.types));
            return;
        }
        if (this.sets.isUnsortedGrouping(info.parentID)) {
            this.sets.add(info.setID, this.applyReduceOperation(this.sets.getUnsortedGrouping(info.parentID), info, info.types));
            return;
        }
        if (this.sets.isSortedGrouping(info.parentID)) {
            throw new IllegalArgumentException("Reduce cannot be applied on a SortedGrouping.");
        }
    }

    /**
     * Reduce on a data set: a non-combinable {@link IdentityGroupReduce} pre-step followed
     * by a {@link PythonMapPartition} step named {@code info.name}.
     *
     * @return the resulting set
     */
    private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
        GroupReduceOperator preStep = (GroupReduceOperator)op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(info.parallelism);
        preStep = (GroupReduceOperator)preStep.name("PythonReducePreStep");
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator reduced = (MapPartitionOperator)preStep.mapPartition(function).setParallelism(info.parallelism);
        return reduced.name(info.name);
    }

    /**
     * Reduce on an unsorted grouping: a non-combinable {@link IdentityGroupReduce} pre-step
     * followed by a {@link PythonMapPartition} step named {@code info.name}.
     *
     * @return the resulting set
     */
    private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
        GroupReduceOperator preStep = (GroupReduceOperator)op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(info.parallelism);
        preStep = (GroupReduceOperator)preStep.name("PythonReducePreStep");
        PythonMapPartition function = new PythonMapPartition(this.operatorConfig, info.envID, info.setID, type);
        MapPartitionOperator reduced = (MapPartitionOperator)preStep.mapPartition(function).setParallelism(info.parallelism);
        return reduced.name(info.name);
    }

    /**
     * Operations that can appear in a received plan. The identifier sent with each
     * operation is upper-cased and resolved against these constants by name.
     */
    protected static enum Operation {
        // sources
        SOURCE_CSV,
        SOURCE_TEXT,
        SOURCE_VALUE,
        SOURCE_SEQ,
        // sinks
        SINK_CSV,
        SINK_TEXT,
        SINK_PRINT,
        // operations handled without a python function
        SORT,
        UNION,
        FIRST,
        DISTINCT,
        GROUPBY,
        REBALANCE,
        PARTITION_HASH,
        BROADCAST,
        // operations that are given type information for a python function
        // (the _H / _T variants map to the HUGE / TINY size hints)
        COGROUP,
        CROSS,
        CROSS_H,
        CROSS_T,
        FILTER,
        FLATMAP,
        GROUPREDUCE,
        JOIN,
        JOIN_H,
        JOIN_T,
        MAP,
        REDUCE,
        MAPPARTITION;

    }

    /**
     * Environment parameters received ahead of the operations of a plan. The name sent with
     * each parameter is upper-cased and resolved against these constants by name.
     */
    private static enum Parameters {
        // parallelism applied to the execution environment
        DOP,
        // boolean flag; true means local execution
        MODE,
        // number of restart attempts for the fixed-delay restart strategy
        RETRY,
        // ID of the environment the plan belongs to
        ID;

    }
}

