/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.jobgraph.ControlType;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.operators.DamBehavior;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunctionV2;
import org.apache.flink.streaming.api.graph.DataPartitionerType;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamSchedulingMode;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SelectTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.SourceV2Transformation;
import org.apache.flink.streaming.api.transformations.SplitTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamGraphGenerator {
    /** Logger used for the informational and debug messages emitted during graph generation. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
    // NOTE(review): presumably the bounds applied when a max parallelism is derived automatically;
    // neither constant is read in the part of the class reviewed here — confirm against callers.
    public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;
    public static final int UPPER_BOUND_MAX_PARALLELISM = 32768;
    /** The graph under construction; created in the constructor and returned by generateInternal. */
    private final StreamGraph streamGraph;
    /** Settings the generator reads while building the graph. */
    private final Context context;
    /**
     * Shared counter behind {@link #getNewIterationNodeId()}. It starts at 0 and is decremented
     * before each ID is handed out, so the IDs produced are -1, -2, and so on.
     */
    protected static Integer iterationIdCounter = 0;
    /**
     * Node IDs already produced for each transformation. Consulted by transform(...) so that a
     * transformation reachable through several paths is only translated once.
     */
    private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;

    /**
     * Returns a fresh ID for an iteration source or sink node.
     *
     * <p>The shared {@code iterationIdCounter} is decremented first and the decremented value is
     * returned. No synchronization is performed here.
     *
     * @return the new value of the iteration ID counter
     */
    public static int getNewIterationNodeId() {
        iterationIdCounter = iterationIdCounter - 1;
        return iterationIdCounter;
    }

    /**
     * Creates a generator together with the empty {@link StreamGraph} it will populate, copying
     * the job-level settings from the given context onto the new graph.
     *
     * @param context settings used to create and configure the graph
     */
    private StreamGraphGenerator(Context context) {
        this.context = context;
        // Fully parameterized instead of a raw HashMap, so the assignment is type-checked.
        this.alreadyTransformed = new HashMap<StreamTransformation<?>, Collection<Integer>>();

        this.streamGraph = new StreamGraph(
                context.getExecutionConfig(),
                context.getCheckpointConfig(),
                context.getDefaultParallelism(),
                context.getBufferTimeout(),
                DataPartitionerType.valueOf(context.getDefaultPartitioner()));

        this.streamGraph.setJobName(context.getJobName());
        // The schedule mode is stored in the custom configuration under the ScheduleMode class name.
        this.streamGraph.getCustomConfiguration().setString(ScheduleMode.class.getName(), context.getScheduleMode().toString());
        this.streamGraph.setTimeCharacteristic(context.getTimeCharacteristic());
        this.streamGraph.setCachedFiles(context.getCacheFiles());
        this.streamGraph.setChaining(context.isChainingEnabled());
        this.streamGraph.setMultiHeadChainMode(context.isMultiHeadChainMode());
        this.streamGraph.setChainEagerlyEnabled(context.isChainEagerlyEnabled());
        this.streamGraph.setStateBackend(context.getStateBackend());

        // The order of the configuration writes is kept as-is: the schedule mode first, then the
        // context configuration, then the context's custom configuration.
        // NOTE(review): presumably later writes override earlier ones for equal keys — confirm.
        this.streamGraph.getCustomConfiguration().addAll(context.getConfiguration());
        this.streamGraph.addCustomConfiguration(context.getCustomConfiguration());
    }

    /**
     * Builds a {@link StreamGraph} from the given transformations.
     *
     * <p>Logs whether slot sharing is enabled, then delegates to a new generator instance.
     *
     * @param context settings for the graph
     * @param transformations the transformations to translate
     * @return the generated graph
     */
    public static StreamGraph generate(Context context, List<StreamTransformation<?>> transformations) {
        String slotSharingState = context.isSlotSharingEnabled() ? "enabled." : "disabled.";
        LOG.info("Slot sharing is " + slotSharingState);
        StreamGraphGenerator generator = new StreamGraphGenerator(context);
        return generator.generateInternal(transformations);
    }

    /*
     * WARNING - void declaration
     */
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> streamTransformation : transformations) {
            this.transform(streamTransformation);
        }
        boolean needToSetDefaultResources = false;
        if (this.context.getDefaultResources() == null || ResourceSpec.DEFAULT.equals((Object)this.context.getDefaultResources())) {
            for (StreamNode node : this.streamGraph.getStreamNodes()) {
                ResourceSpec resources = node.getMinResources();
                if (resources == null || ResourceSpec.DEFAULT.equals((Object)resources)) continue;
                needToSetDefaultResources = true;
                break;
            }
        } else {
            needToSetDefaultResources = true;
        }
        if (needToSetDefaultResources) {
            ResourceSpec resourceSpec = this.context.getDefaultResources();
            if (resourceSpec == null || ResourceSpec.DEFAULT.equals((Object)resourceSpec)) {
                ResourceSpec resourceSpec2 = this.context.getGlobalDefaultResources();
            }
            for (StreamNode node : this.streamGraph.getStreamNodes()) {
                void var3_9;
                ResourceSpec resources = node.getMinResources();
                if (resources != null && !ResourceSpec.DEFAULT.equals((Object)resources)) continue;
                node.setResources((ResourceSpec)var3_9, (ResourceSpec)var3_9);
            }
        }
        this.setSchedulingDependencies();
        return this.streamGraph;
    }

    /**
     * Assigns a scheduling mode to every input edge and adds control edges between the inputs
     * of nodes that carry read-priority hints.
     *
     * <p>Step 1: an edge with a materializing dam behavior is scheduled sequentially; otherwise
     * the mode follows the edge's data exchange mode (PIPELINED to CONCURRENT, BATCH to
     * SEQUENTIAL, AUTO to AUTO).
     *
     * <p>Step 2: for each node, the inputs hinted as {@code LOWER} priority whose edge is
     * pipelined (or AUTO while the execution mode is PIPELINED) are collected. Every input that
     * is not hinted {@code LOWER} then gets a {@code START_ON_FINISH} control edge to each of
     * the collected inputs.
     *
     * @throws UnsupportedOperationException if an edge has a data exchange mode other than
     *     PIPELINED, BATCH or AUTO
     */
    private void setSchedulingDependencies() {
        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            for (StreamEdge inEdge : node.getInEdges()) {
                StreamSchedulingMode schedulingMode;
                DamBehavior edgeDamBehavior = inEdge.getDamBehavior();
                if (edgeDamBehavior != null && edgeDamBehavior.isMaterializing()) {
                    schedulingMode = StreamSchedulingMode.SEQUENTIAL;
                } else {
                    switch (inEdge.getDataExchangeMode()) {
                        case PIPELINED: {
                            schedulingMode = StreamSchedulingMode.CONCURRENT;
                            break;
                        }
                        case BATCH: {
                            schedulingMode = StreamSchedulingMode.SEQUENTIAL;
                            break;
                        }
                        case AUTO: {
                            schedulingMode = StreamSchedulingMode.AUTO;
                            break;
                        }
                        default: {
                            throw new UnsupportedOperationException("Not Support " + inEdge.getDataExchangeMode() + " exchanged mode.");
                        }
                    }
                }
                inEdge.setSchedulingMode(schedulingMode);
            }
        }

        ExecutionMode executionMode = this.streamGraph.getExecutionConfig().getExecutionMode();
        for (StreamNode currentNode : this.streamGraph.getStreamNodes()) {
            Set<StreamNode> laterScheduledInputNodes = new HashSet<StreamNode>();
            for (Map.Entry<StreamEdge, StreamNode.ReadPriority> entry : currentNode.getReadPriorityHints().entrySet()) {
                if (!StreamNode.ReadPriority.LOWER.equals(entry.getValue())) {
                    continue;
                }
                StreamEdge inEdge = entry.getKey();
                DataExchangeMode dataExchangeMode = inEdge.getDataExchangeMode();
                boolean effectivelyPipelined = DataExchangeMode.PIPELINED.equals(dataExchangeMode)
                        || (DataExchangeMode.AUTO.equals(dataExchangeMode) && ExecutionMode.PIPELINED.equals(executionMode));
                if (effectivelyPipelined) {
                    laterScheduledInputNodes.add(this.streamGraph.getStreamNode(inEdge.getSourceId()));
                }
            }

            // The set is always non-null; skip nodes that have nothing to defer.
            if (laterScheduledInputNodes.isEmpty()) {
                continue;
            }

            for (Map.Entry<StreamEdge, StreamNode.ReadPriority> entry : currentNode.getReadPriorityHints().entrySet()) {
                if (StreamNode.ReadPriority.LOWER.equals(entry.getValue())) {
                    continue;
                }
                StreamNode firstScheduledInputNode = this.streamGraph.getStreamNode(entry.getKey().getSourceId());
                for (StreamNode laterScheduledInputNode : laterScheduledInputNodes) {
                    this.streamGraph.addControlEdge(firstScheduledInputNode.getId(), laterScheduledInputNode.getId(), ControlType.START_ON_FINISH);
                }
            }
        }
    }

    /**
     * Translates a single transformation into the stream graph, dispatching on its concrete
     * type, and returns the node IDs that downstream transformations should connect to.
     *
     * <p>Results are cached in {@code alreadyTransformed}; a transformation that was translated
     * before returns its cached IDs immediately. After the type-specific translation, the
     * optional per-transformation settings (buffer timeout, UID, user hash, resources, resource
     * constraints, custom configuration) are copied onto the graph.
     *
     * @param transform the transformation to translate
     * @return the IDs produced for this transformation
     * @throws IllegalStateException if the transformation type is not handled here
     */
    private Collection<Integer> transform(StreamTransformation<?> transform) {
        if (this.alreadyTransformed.containsKey(transform)) {
            return this.alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        // Inherit the max parallelism from the execution config when the transformation has none.
        if (transform.getMaxParallelism() <= 0) {
            int configuredMaxParallelism = this.context.getExecutionConfig().getMaxParallelism();
            if (configuredMaxParallelism > 0) {
                transform.setMaxParallelism(configuredMaxParallelism);
            }
        }

        // The result is intentionally discarded.
        // NOTE(review): presumably called so that missing type information fails early — confirm.
        transform.getOutputType();

        final Collection<Integer> resultIds;
        if (transform instanceof OneInputTransformation) {
            resultIds = this.transformOneInputTransform((OneInputTransformation)transform);
        } else if (transform instanceof TwoInputTransformation) {
            resultIds = this.transformTwoInputTransform((TwoInputTransformation)transform);
        } else if (transform instanceof SourceTransformation) {
            resultIds = this.transformSource((SourceTransformation)transform);
        } else if (transform instanceof SourceV2Transformation) {
            resultIds = this.transformSourceV2((SourceV2Transformation)transform);
        } else if (transform instanceof SinkTransformation) {
            resultIds = this.transformSink((SinkTransformation)transform);
        } else if (transform instanceof UnionTransformation) {
            resultIds = this.transformUnion((UnionTransformation)transform);
        } else if (transform instanceof SplitTransformation) {
            resultIds = this.transformSplit((SplitTransformation)transform);
        } else if (transform instanceof SelectTransformation) {
            resultIds = this.transformSelect((SelectTransformation)transform);
        } else if (transform instanceof FeedbackTransformation) {
            resultIds = this.transformFeedback((FeedbackTransformation)transform);
        } else if (transform instanceof CoFeedbackTransformation) {
            resultIds = this.transformCoFeedback((CoFeedbackTransformation)transform);
        } else if (transform instanceof PartitionTransformation) {
            resultIds = this.transformPartition((PartitionTransformation)transform);
        } else if (transform instanceof SideOutputTransformation) {
            resultIds = this.transformSideOutput((SideOutputTransformation)transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // The feedback translations register themselves before recursing; do not overwrite that.
        if (!this.alreadyTransformed.containsKey(transform)) {
            this.alreadyTransformed.put(transform, resultIds);
        }

        if (transform.getBufferTimeout() >= 0L) {
            this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }
        boolean hasBothResourceSpecs = transform.getMinResources() != null && transform.getPreferredResources() != null;
        if (hasBothResourceSpecs) {
            this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }
        if (transform.getResourceConstraints() != null) {
            this.streamGraph.setResourceConstraints(transform.getId(), transform.getResourceConstraints());
        }
        if (!transform.getCustomConfiguration().keySet().isEmpty()) {
            this.streamGraph.addCustomConfiguration(transform.getId(), transform.getCustomConfiguration());
        }

        return resultIds;
    }

    /**
     * Translates a union by translating each of its inputs and concatenating the resulting IDs.
     * No node is added to the graph for the union itself.
     *
     * @param union the union transformation
     * @return the IDs of all inputs, in input order
     */
    private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
        ArrayList<Integer> collectedIds = new ArrayList<Integer>();
        for (StreamTransformation<T> unionInput : union.getInputs()) {
            Collection<Integer> idsOfInput = this.transform(unionInput);
            collectedIds.addAll(idsOfInput);
        }
        return collectedIds;
    }

    /**
     * Translates a partitioning step. For every ID produced by the input, a virtual partition
     * node with a freshly allocated ID is registered on the graph, carrying the partitioner and
     * data exchange mode of the transformation.
     *
     * @param partition the partition transformation
     * @return the IDs of the virtual partition nodes
     */
    private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
        Collection<Integer> upstreamIds = this.transform(partition.getInput());
        ArrayList<Integer> virtualIds = new ArrayList<Integer>();
        for (Integer upstreamId : upstreamIds) {
            int newVirtualId = StreamTransformation.getNewNodeId();
            this.streamGraph.addVirtualPartitionNode(upstreamId, newVirtualId, partition.getPartitioner(), partition.getDataExchangeMode());
            virtualIds.add(newVirtualId);
        }
        return virtualIds;
    }

    /**
     * Translates a split by attaching its output selector to every node produced by the input.
     *
     * <p>The input is translated first and then validated. If translating the input already
     * registered this split (which the check below guards against), the cached IDs are returned.
     *
     * @param split the split transformation
     * @return the IDs of the input nodes
     * @throws IllegalStateException if the input chain is rejected by the split validation
     */
    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
        StreamTransformation<T> upstream = split.getInput();
        Collection<Integer> upstreamIds = this.transform(upstream);
        this.validateSplitTransformation(upstream);

        if (this.alreadyTransformed.containsKey(split)) {
            return this.alreadyTransformed.get(split);
        }

        for (int upstreamId : upstreamIds) {
            this.streamGraph.addOutputSelector(upstreamId, split.getOutputSelector());
        }
        return upstreamIds;
    }

    /**
     * Translates a select. For every ID produced by the input, a virtual select node with a
     * freshly allocated ID is registered, carrying the selected names.
     *
     * @param select the select transformation
     * @return the IDs of the virtual select nodes, or the cached IDs if translating the input
     *     already registered this select
     */
    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        Collection<Integer> upstreamIds = this.transform(select.getInput());
        if (this.alreadyTransformed.containsKey(select)) {
            return this.alreadyTransformed.get(select);
        }

        ArrayList<Integer> virtualIds = new ArrayList<Integer>();
        for (int upstreamId : upstreamIds) {
            int newVirtualId = StreamTransformation.getNewNodeId();
            this.streamGraph.addVirtualSelectNode(upstreamId, newVirtualId, select.getSelectedNames());
            virtualIds.add(newVirtualId);
        }
        return virtualIds;
    }

    /**
     * Translates a side output. For every ID produced by the input, a virtual side-output node
     * with a freshly allocated ID is registered, carrying the output tag and dam behavior.
     *
     * @param sideOutput the side-output transformation
     * @return the IDs of the virtual side-output nodes, or the cached IDs if translating the
     *     input already registered this side output
     */
    private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
        Collection<Integer> upstreamIds = this.transform(sideOutput.getInput());
        if (this.alreadyTransformed.containsKey(sideOutput)) {
            return this.alreadyTransformed.get(sideOutput);
        }

        ArrayList<Integer> virtualIds = new ArrayList<Integer>();
        for (int upstreamId : upstreamIds) {
            int newVirtualId = StreamTransformation.getNewNodeId();
            this.streamGraph.addVirtualSideOutputNode(upstreamId, newVirtualId, sideOutput.getOutputTag(), sideOutput.getDamBehavior());
            virtualIds.add(newVirtualId);
        }
        return virtualIds;
    }

    /**
     * Translates an iteration. An iteration source/sink pair is created, the result (input IDs
     * plus the iteration source ID) is registered before the feedback edges are translated, and
     * every feedback node is then connected to the iteration sink.
     *
     * <p>Both iteration nodes get the slot sharing group derived from the feedback nodes, with
     * slot sharing forced.
     *
     * @param iterate the feedback transformation
     * @return the input IDs followed by the iteration source ID
     * @throws IllegalStateException if the iteration has no feedback edges
     */
    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
        if (iterate.getFeedbackEdges().isEmpty()) {
            throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
        }

        Collection<Integer> upstreamIds = this.transform(iterate.getInput());
        if (this.alreadyTransformed.containsKey(iterate)) {
            return this.alreadyTransformed.get(iterate);
        }
        ArrayList<Integer> resultIds = new ArrayList<Integer>(upstreamIds);

        // The source ID is allocated before the sink ID.
        int iterationSourceId = StreamGraphGenerator.getNewIterationNodeId();
        int iterationSinkId = StreamGraphGenerator.getNewIterationNodeId();
        Tuple2<StreamNode, StreamNode> sourceAndSink = this.streamGraph.createIterationSourceAndSink(
                iterate.getId(),
                iterationSourceId,
                iterationSinkId,
                iterate.getWaitTime(),
                iterate.getParallelism(),
                iterate.getMaxParallelism(),
                iterate.getMinResources(),
                iterate.getPreferredResources());
        StreamNode iterationSource = sourceAndSink.f0;
        StreamNode iterationSink = sourceAndSink.f1;

        // The source only gets the last serializer argument, the sink only the first; each call
        // creates its own serializer instance.
        this.streamGraph.setSerializers(iterationSource.getId(), null, null, iterate.getOutputType().createSerializer(this.context.getExecutionConfig()));
        this.streamGraph.setSerializers(iterationSink.getId(), iterate.getOutputType().createSerializer(this.context.getExecutionConfig()), null, null);

        resultIds.add(iterationSource.getId());
        // Register before translating the feedback edges, which may lead back to this iteration.
        this.alreadyTransformed.put(iterate, resultIds);

        ArrayList<Integer> feedbackNodeIds = new ArrayList<Integer>();
        for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
            Collection<Integer> idsOfEdge = this.transform(feedbackEdge);
            feedbackNodeIds.addAll(idsOfEdge);
            for (Integer feedbackNodeId : idsOfEdge) {
                this.streamGraph.addEdge(feedbackNodeId, iterationSink.getId(), 0);
            }
        }

        String sharedGroup = this.determineSlotSharingGroup(null, feedbackNodeIds, true);
        iterationSink.setSlotSharingGroup(sharedGroup);
        iterationSource.setSlotSharingGroup(sharedGroup);
        return resultIds;
    }

    /**
     * Translates a co-iteration. An iteration source/sink pair is created, the iteration source
     * ID is registered as this transformation's result before the feedback edges are translated,
     * and every feedback node is then connected to the iteration sink.
     *
     * <p>Both iteration nodes get the slot sharing group derived from the feedback nodes, with
     * slot sharing forced.
     *
     * @param coIterate the co-feedback transformation
     * @return a singleton collection holding the iteration source ID
     */
    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
        // The source ID is allocated before the sink ID.
        int iterationSourceId = StreamGraphGenerator.getNewIterationNodeId();
        int iterationSinkId = StreamGraphGenerator.getNewIterationNodeId();
        Tuple2<StreamNode, StreamNode> sourceAndSink = this.streamGraph.createIterationSourceAndSink(
                coIterate.getId(),
                iterationSourceId,
                iterationSinkId,
                coIterate.getWaitTime(),
                coIterate.getParallelism(),
                coIterate.getMaxParallelism(),
                coIterate.getMinResources(),
                coIterate.getPreferredResources());
        StreamNode iterationSource = sourceAndSink.f0;
        StreamNode iterationSink = sourceAndSink.f1;

        // The source only gets the last serializer argument, the sink only the first; each call
        // creates its own serializer instance.
        this.streamGraph.setSerializers(iterationSource.getId(), null, null, coIterate.getOutputType().createSerializer(this.context.getExecutionConfig()));
        this.streamGraph.setSerializers(iterationSink.getId(), coIterate.getOutputType().createSerializer(this.context.getExecutionConfig()), null, null);

        // Register before translating the feedback edges, which may lead back to this iteration.
        Set<Integer> registeredIds = Collections.singleton(iterationSource.getId());
        this.alreadyTransformed.put(coIterate, registeredIds);

        ArrayList<Integer> feedbackNodeIds = new ArrayList<Integer>();
        for (StreamTransformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
            Collection<Integer> idsOfEdge = this.transform(feedbackEdge);
            feedbackNodeIds.addAll(idsOfEdge);
            for (Integer feedbackNodeId : idsOfEdge) {
                this.streamGraph.addEdge(feedbackNodeId, iterationSink.getId(), 0);
            }
        }

        String sharedGroup = this.determineSlotSharingGroup(null, feedbackNodeIds, true);
        iterationSink.setSlotSharingGroup(sharedGroup);
        iterationSource.setSlotSharingGroup(sharedGroup);
        return Collections.singleton(iterationSource.getId());
    }

    /**
     * Adds a source node to the graph, named {@code "Source: " + name}, and applies its
     * parallelism settings. When the operator's user function is an
     * {@link InputFormatSourceFunction}, its format is registered as the node's input format.
     *
     * @param source the source transformation
     * @return a singleton collection holding the source's ID
     */
    private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
        // A source has no inputs, so the group is derived from an empty ID collection.
        String sharingGroup = this.determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.<Integer>emptyList(), false);
        this.streamGraph.addSource(source.getId(), sharingGroup, source.getOperator(), null, source.getOutputType(), "Source: " + source.getName());

        Object userFunction = source.getOperator().getUserFunction();
        if (userFunction instanceof InputFormatSourceFunction) {
            InputFormatSourceFunction formatFunction = (InputFormatSourceFunction)userFunction;
            this.streamGraph.setInputFormat(source.getId(), formatFunction.getFormat());
        }

        this.streamGraph.setParallelism(source.getId(), source.getParallelism());
        this.streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
        return Collections.singleton(source.getId());
    }

    /**
     * Adds a V2 source node to the graph, named {@code "Source: " + name}, and applies its
     * parallelism settings. When the operator's user function is an
     * {@link InputFormatSourceFunctionV2}, its format is registered as the node's input format.
     *
     * @param source the V2 source transformation
     * @return a singleton collection holding the source's ID
     */
    private <T> Collection<Integer> transformSourceV2(SourceV2Transformation<T> source) {
        // A source has no inputs, so the group is derived from an empty ID collection.
        String sharingGroup = this.determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.<Integer>emptyList(), false);
        this.streamGraph.addSource(source.getId(), sharingGroup, source.getOperator(), null, source.getOutputType(), "Source: " + source.getName());

        Object userFunction = source.getOperator().getUserFunction();
        if (userFunction instanceof InputFormatSourceFunctionV2) {
            InputFormatSourceFunctionV2 formatFunction = (InputFormatSourceFunctionV2)userFunction;
            this.streamGraph.setInputFormat(source.getId(), formatFunction.getFormat());
        }

        this.streamGraph.setParallelism(source.getId(), source.getParallelism());
        this.streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
        return Collections.singleton(source.getId());
    }

    /**
     * Adds a sink node to the graph, named {@code "Sink: " + name}, connects it to all nodes
     * produced by its input and, if a state key selector is set, registers the key selector
     * with a matching key serializer.
     *
     * <p>When the operator's user function is an {@link OutputFormatSinkFunction}, its format is
     * registered as the node's output format.
     *
     * @param sink the sink transformation
     * @return an empty collection, since nothing can connect downstream of a sink
     */
    private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
        Collection<Integer> upstreamIds = this.transform(sink.getInput());

        String sharingGroup = this.determineSlotSharingGroup(sink.getSlotSharingGroup(), upstreamIds, false);
        this.streamGraph.addSink(sink.getId(), sharingGroup, sink.getOperator(), sink.getInput().getOutputType(), null, "Sink: " + sink.getName());

        Object userFunction = sink.getOperator().getUserFunction();
        if (userFunction instanceof OutputFormatSinkFunction) {
            OutputFormatSinkFunction formatFunction = (OutputFormatSinkFunction)userFunction;
            this.streamGraph.setOutputFormat(sink.getId(), formatFunction.getFormat());
        }

        // Parallelism is set before the edges are added; the statement order is kept as-is.
        this.streamGraph.setParallelism(sink.getId(), sink.getParallelism());
        this.streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism());
        for (Integer upstreamId : upstreamIds) {
            this.streamGraph.addEdge(upstreamId, sink.getId(), 0);
        }

        if (sink.getStateKeySelector() != null) {
            TypeSerializer stateKeySerializer = sink.getStateKeyType().createSerializer(this.context.getExecutionConfig());
            this.streamGraph.setOneInputStateKey(sink.getId(), sink.getStateKeySelector(), stateKeySerializer);
        }
        return Collections.emptyList();
    }

    /**
     * Adds a one-input operator node to the graph, applies its state key, parallelism and dam
     * behavior settings, and connects it to all nodes produced by its input.
     *
     * @param transform the one-input transformation
     * @return a singleton collection holding the operator's ID, or the cached IDs if translating
     *     the input already registered this transformation
     */
    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
        Collection<Integer> upstreamIds = this.transform(transform.getInput());
        // Translating the input may have reached this transformation already (e.g. in iterations).
        if (this.alreadyTransformed.containsKey(transform)) {
            return this.alreadyTransformed.get(transform);
        }

        String sharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), upstreamIds, false);
        this.streamGraph.addOperator(transform.getId(), sharingGroup, transform.getOperator(), transform.getInputType(), transform.getOutputType(), transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer stateKeySerializer = transform.getStateKeyType().createSerializer(this.context.getExecutionConfig());
            this.streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), stateKeySerializer);
        }

        // Node settings are applied before the edges are added; the statement order is kept as-is.
        this.streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        this.streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
        this.streamGraph.setMainOutputDamBehavior(transform.getId(), transform.getDamBehavior());

        for (Integer upstreamId : upstreamIds) {
            this.streamGraph.addEdge(upstreamId, transform.getId(), 0);
        }
        return Collections.singleton(transform.getId());
    }

    /**
     * Adds a two-input operator node to the graph, applies its state key, parallelism and dam
     * behavior settings, and connects both inputs (edge type 1 for the first input, 2 for the
     * second).
     *
     * <p>The read-order hint is mapped to per-input read priorities: {@code INPUT1_FIRST} gives
     * input 1 HIGHER and input 2 LOWER, {@code INPUT2_FIRST} the reverse, and
     * {@code SPECIAL_ORDER} gives both DYNAMIC. Any other hint (including none) records no
     * priority.
     *
     * @param transform the two-input transformation
     * @return a singleton collection holding the operator's ID, or the cached IDs if translating
     *     an input already registered this transformation
     */
    private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputTransformation<IN1, IN2, OUT> transform) {
        Collection<Integer> firstInputIds = this.transform(transform.getInput1());
        Collection<Integer> secondInputIds = this.transform(transform.getInput2());
        // Translating the inputs may have reached this transformation already (e.g. in iterations).
        if (this.alreadyTransformed.containsKey(transform)) {
            return this.alreadyTransformed.get(transform);
        }

        ArrayList<Integer> combinedInputIds = new ArrayList<Integer>(firstInputIds);
        combinedInputIds.addAll(secondInputIds);
        String sharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), combinedInputIds, false);
        this.streamGraph.addCoOperator(transform.getId(), sharingGroup, transform.getOperator(), transform.getInputType1(), transform.getInputType2(), transform.getOutputType(), transform.getName());

        boolean isKeyed = transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null;
        if (isKeyed) {
            TypeSerializer stateKeySerializer = transform.getStateKeyType().createSerializer(this.context.getExecutionConfig());
            this.streamGraph.setTwoInputStateKey(transform.getId(), transform.getStateKeySelector1(), transform.getStateKeySelector2(), stateKeySerializer);
        }

        // Node settings are applied before the edges are added; the statement order is kept as-is.
        this.streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        this.streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
        this.streamGraph.setMainOutputDamBehavior(transform.getId(), transform.getDamBehavior());

        StreamNode.ReadPriority firstInputPriority = null;
        StreamNode.ReadPriority secondInputPriority = null;
        TwoInputTransformation.ReadOrder readOrderHint = transform.getReadOrderHint();
        if (TwoInputTransformation.ReadOrder.INPUT1_FIRST.equals(readOrderHint)) {
            firstInputPriority = StreamNode.ReadPriority.HIGHER;
            secondInputPriority = StreamNode.ReadPriority.LOWER;
        } else if (TwoInputTransformation.ReadOrder.INPUT2_FIRST.equals(readOrderHint)) {
            firstInputPriority = StreamNode.ReadPriority.LOWER;
            secondInputPriority = StreamNode.ReadPriority.HIGHER;
        } else if (TwoInputTransformation.ReadOrder.SPECIAL_ORDER.equals(readOrderHint)) {
            firstInputPriority = StreamNode.ReadPriority.DYNAMIC;
            secondInputPriority = StreamNode.ReadPriority.DYNAMIC;
        }

        Integer vertexID = transform.getId();
        for (Integer upstreamId : firstInputIds) {
            StreamEdge firstInputEdge = this.streamGraph.addEdge(upstreamId, transform.getId(), 1);
            if (firstInputPriority != null) {
                this.streamGraph.setReadPriorityHint(vertexID, firstInputEdge, firstInputPriority);
            }
        }
        for (Integer upstreamId : secondInputIds) {
            StreamEdge secondInputEdge = this.streamGraph.addEdge(upstreamId, transform.getId(), 2);
            if (secondInputPriority != null) {
                this.streamGraph.setReadPriorityHint(vertexID, secondInputEdge, secondInputPriority);
            }
        }
        return Collections.singleton(transform.getId());
    }

    /**
     * Determines the slot sharing group for an operation.
     *
     * <p>Rules, in order:
     * <ol>
     *   <li>If slot sharing is disabled in the context and not forced, the result is {@code null}.
     *   <li>An explicitly specified group wins.
     *   <li>If all inputs share one group, that group is used.
     *   <li>Otherwise (differing input groups, or no group found) the result is {@code "default"}.
     * </ol>
     *
     * @param specifiedGroup the explicitly requested group, or {@code null}
     * @param inputIds the IDs of the input nodes
     * @param isSlotSharingForced whether to derive a group even when slot sharing is disabled
     * @return the group name, or {@code null} when slot sharing does not apply
     */
    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds, boolean isSlotSharingForced) {
        boolean slotSharingApplies = this.context.isSlotSharingEnabled() || isSlotSharingForced;
        if (!slotSharingApplies) {
            return null;
        }
        if (specifiedGroup != null) {
            return specifiedGroup;
        }

        String commonGroup = null;
        for (int inputId : inputIds) {
            String candidateGroup = this.streamGraph.getSlotSharingGroup(inputId);
            if (commonGroup != null && !commonGroup.equals(candidateGroup)) {
                // Inputs disagree on the group.
                return "default";
            }
            if (commonGroup == null) {
                commonGroup = candidateGroup;
            }
        }
        return commonGroup != null ? commonGroup : "default";
    }

    /**
     * Checks that a split is not applied on top of another split, a select or a side output.
     *
     * <p>Union and partition transformations are looked through: each input of a union and the
     * input of a partition is validated recursively. Any other transformation type is accepted.
     *
     * @param input the transformation feeding the split
     * @throws IllegalStateException if a select, split or side-output transformation is found
     */
    private <T> void validateSplitTransformation(StreamTransformation<T> input) {
        if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
            throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
        }
        if (input instanceof SideOutputTransformation) {
            throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
        }
        if (input instanceof UnionTransformation) {
            // Parameterized cast: with a raw cast getInputs() would be a raw List whose elements
            // are Object and cannot be iterated as StreamTransformation.
            for (StreamTransformation<T> unionInput : ((UnionTransformation<T>)input).getInputs()) {
                this.validateSplitTransformation(unionInput);
            }
        } else if (input instanceof PartitionTransformation) {
            this.validateSplitTransformation(((PartitionTransformation<T>)input).getInput());
        }
    }

    public static class Context {
        // Settings carried by the context. The accessors for these fields lie outside the
        // reviewed part of the file, so the notes below cover only the visible initial values.
        private ExecutionConfig executionConfig;
        private CheckpointConfig checkpointConfig;
        private Configuration customConfiguration;
        private TimeCharacteristic timeCharacteristic;
        private StateBackend stateBackend;
        private boolean chainingEnabled;
        private boolean isMultiHeadChainMode;
        private boolean isSlotSharingEnabled;
        private boolean chainEagerlyEnabled;
        // Initial job name until another one is assigned.
        private String jobName = "Flink Streaming Job";
        // Starts as an empty list.
        private List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCache.DistributedCacheEntry>>();
        private ScheduleMode scheduleMode;
        private long bufferTimeout;
        // Starts as an empty Configuration.
        private Configuration configuration = new Configuration();
        private int defaultParallelism;
        private String defaultPartitioner;
        // NOTE(review): presumably the job-level default resources and the cluster-wide fallback
        // respectively — confirm against the setters and their callers.
        private ResourceSpec defaultResources;
        private ResourceSpec globalDefaultResources;

        /**
         * Builds a context for a streaming job from the given environment.
         *
         * <p>Most settings are copied from the environment. The schedule mode is fixed to
         * {@link ScheduleMode#EAGER}. The default partitioner, the eager-chaining flag and the
         * global default resources are read from the loaded global configuration; eager
         * chaining is off when the option is absent.
         *
         * @param env the environment to read settings from
         * @return the populated context
         */
        public static Context buildStreamProperties(StreamExecutionEnvironment env) {
            Context streamContext = new Context();
            streamContext.setExecutionConfig(env.getConfig());
            streamContext.setCheckpointConfig(env.getCheckpointConfig());
            streamContext.setCustomConfiguration(env.getCustomConfiguration());
            streamContext.setTimeCharacteristic(env.getStreamTimeCharacteristic());
            streamContext.setStateBackend(env.getStateBackend());
            streamContext.setChainingEnabled(env.isChainingEnabled());
            streamContext.setCacheFiles(env.getCachedFiles());
            streamContext.setBufferTimeout(env.getBufferTimeout());
            streamContext.setDefaultParallelism(env.getParallelism());
            streamContext.setMultiHeadChainMode(env.isMultiHeadChainMode());
            streamContext.setSlotSharingEnabled(env.isSlotSharingEnabled());
            streamContext.setScheduleMode(ScheduleMode.EAGER);

            Configuration globalConf = GlobalConfiguration.loadConfiguration();
            streamContext.setDefaultPartitioner(globalConf.getString(CoreOptions.DEFAULT_PARTITIONER));

            // Absent option means eager chaining stays disabled.
            boolean chainEagerly = globalConf.contains(CoreOptions.CHAIN_EAGERLY_ENABLED)
                    && globalConf.getBoolean(CoreOptions.CHAIN_EAGERLY_ENABLED);
            streamContext.setChainEagerlyEnabled(chainEagerly);

            streamContext.setDefaultResources(env.getDefaultResources());

            ResourceSpec.Builder globalDefaultBuilder = new ResourceSpec.Builder();
            globalDefaultBuilder = globalDefaultBuilder.setCpuCores(globalConf.getDouble(CoreOptions.DEFAULT_RESOURCE_CPU_CORES));
            globalDefaultBuilder = globalDefaultBuilder.setHeapMemoryInMB(globalConf.getInteger(CoreOptions.DEFAULT_RESOURCE_HEAP_MEMORY));
            streamContext.setGlobalDefaultResources(globalDefaultBuilder.build());
            return streamContext;
        }

        /**
         * Creates a {@link Context} for a batch job from the given environment.
         *
         * <p>The environment's execution config is cloned via {@code InstantiationUtil.clone};
         * on the clone, object reuse is enabled and the latency tracking interval is set to
         * {@code -1}, so the environment's own config object is not modified here. A new
         * {@link CheckpointConfig} is used instead of the environment's. Time characteristic is
         * fixed to processing time, chaining and multi-head-chain mode are forced on, the buffer
         * timeout is {@code -1} and the schedule mode is {@link ScheduleMode#LAZY_FROM_SOURCES}.
         * The default partitioner, the chain-eagerly flag and the global default resources are
         * read from the configuration returned by {@link GlobalConfiguration#loadConfiguration()}.
         *
         * <p>NOTE(review): unlike {@code buildStreamProperties}, this method never calls
         * {@code setStateBackend} or {@code setDefaultParallelism} - confirm that is intended.
         *
         * @param env environment whose settings seed the context
         * @return a newly created context
         * @throws FlinkRuntimeException if cloning the execution config fails
         */
        public static Context buildBatchProperties(StreamExecutionEnvironment env) {
            Context ctx = new Context();

            // Cloning is the only step that can raise the checked exceptions handled below.
            ExecutionConfig batchExecutionConfig;
            try {
                batchExecutionConfig = (ExecutionConfig)InstantiationUtil.clone((Serializable)env.getConfig());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new FlinkRuntimeException("This exception could not happen.", (Throwable)e);
            }
            batchExecutionConfig.enableObjectReuse();
            batchExecutionConfig.setLatencyTrackingInterval(-1L);
            ctx.setExecutionConfig(batchExecutionConfig);

            // Fixed batch settings plus the few values still taken from the environment.
            ctx.setCheckpointConfig(new CheckpointConfig());
            ctx.setCustomConfiguration(env.getCustomConfiguration());
            ctx.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            ctx.setChainingEnabled(true);
            ctx.setCacheFiles(env.getCachedFiles());
            ctx.setBufferTimeout(-1L);
            ctx.setMultiHeadChainMode(true);
            ctx.setSlotSharingEnabled(env.isSlotSharingEnabled());
            ctx.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);

            // Values that come from the globally loaded configuration.
            Configuration flinkConf = GlobalConfiguration.loadConfiguration();
            ctx.setDefaultPartitioner(flinkConf.getString(CoreOptions.DEFAULT_PARTITIONER));
            // An absent option means "disabled"; the option is only read when it is present.
            boolean chainEagerly = flinkConf.contains(CoreOptions.CHAIN_EAGERLY_ENABLED)
                    && flinkConf.getBoolean(CoreOptions.CHAIN_EAGERLY_ENABLED);
            ctx.setChainEagerlyEnabled(chainEagerly);
            ctx.setDefaultResources(env.getDefaultResources());
            ResourceSpec globalDefaults = new ResourceSpec.Builder()
                    .setCpuCores(flinkConf.getDouble(CoreOptions.DEFAULT_RESOURCE_CPU_CORES))
                    .setHeapMemoryInMB(flinkConf.getInteger(CoreOptions.DEFAULT_RESOURCE_HEAP_MEMORY))
                    .build();
            ctx.setGlobalDefaultResources(globalDefaults);
            return ctx;
        }

        /**
         * Stores the given execution config in this context. The reference is kept as-is; no copy is made.
         *
         * @param executionConfig execution config to hold
         */
        public void setExecutionConfig(ExecutionConfig executionConfig) {
            this.executionConfig = executionConfig;
        }

        /**
         * Stores the given checkpoint config in this context. The reference is kept as-is; no copy is made.
         *
         * @param checkpointConfig checkpoint config to hold
         */
        public void setCheckpointConfig(CheckpointConfig checkpointConfig) {
            this.checkpointConfig = checkpointConfig;
        }

        /**
         * Stores the given custom configuration in this context. The reference is kept as-is; no copy is made.
         *
         * @param customConfiguration custom configuration to hold
         */
        public void setCustomConfiguration(Configuration customConfiguration) {
            this.customConfiguration = customConfiguration;
        }

        /**
         * Stores the given time characteristic in this context.
         *
         * @param timeCharacteristic time characteristic to hold
         */
        public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
        }

        /**
         * Stores the given state backend in this context. The reference is kept as-is; no copy is made.
         *
         * @param stateBackend state backend to hold
         */
        public void setStateBackend(StateBackend stateBackend) {
            this.stateBackend = stateBackend;
        }

        /**
         * Sets the chaining-enabled flag held by this context.
         *
         * @param chainingEnabled new value of the flag
         */
        public void setChainingEnabled(boolean chainingEnabled) {
            this.chainingEnabled = chainingEnabled;
        }

        /**
         * Returns the execution config held by this context (the stored instance, not a copy).
         *
         * @return the stored execution config
         */
        public ExecutionConfig getExecutionConfig() {
            return executionConfig;
        }

        /**
         * Returns the checkpoint config held by this context (the stored instance, not a copy).
         *
         * @return the stored checkpoint config
         */
        public CheckpointConfig getCheckpointConfig() {
            return checkpointConfig;
        }

        /**
         * Returns the custom configuration held by this context (the stored instance, not a copy).
         *
         * @return the stored custom configuration
         */
        public Configuration getCustomConfiguration() {
            return customConfiguration;
        }

        /**
         * Returns the time characteristic held by this context.
         *
         * @return the stored time characteristic
         */
        public TimeCharacteristic getTimeCharacteristic() {
            return timeCharacteristic;
        }

        /**
         * Returns the state backend held by this context (the stored instance, not a copy).
         *
         * @return the stored state backend
         */
        public StateBackend getStateBackend() {
            return stateBackend;
        }

        /**
         * Returns the chaining-enabled flag held by this context.
         *
         * @return current value of the flag
         */
        public boolean isChainingEnabled() {
            return chainingEnabled;
        }

        /**
         * Returns the job name held by this context.
         *
         * @return the stored job name
         */
        public String getJobName() {
            return jobName;
        }

        /**
         * Replaces the job name held by this context.
         *
         * @param jobName new job name
         */
        public void setJobName(String jobName) {
            this.jobName = jobName;
        }

        /**
         * Replaces the cached-file list held by this context. The given list instance itself is
         * stored; no copy is made.
         *
         * @param cacheFile list of (name, cache entry) pairs to hold
         */
        public void setCacheFiles(List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile) {
            this.cacheFile = cacheFile;
        }

        /**
         * Returns the cached-file list held by this context (the stored list instance, not a copy).
         *
         * @return the stored list of (name, cache entry) pairs
         */
        public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCacheFiles() {
            return cacheFile;
        }

        /**
         * Returns the schedule mode held by this context.
         *
         * @return the stored schedule mode
         */
        public ScheduleMode getScheduleMode() {
            return scheduleMode;
        }

        /**
         * Stores the given schedule mode in this context.
         *
         * @param scheduleMode schedule mode to hold
         */
        public void setScheduleMode(ScheduleMode scheduleMode) {
            this.scheduleMode = scheduleMode;
        }

        /**
         * Returns the buffer timeout held by this context.
         *
         * @return the stored buffer timeout value
         */
        public long getBufferTimeout() {
            return bufferTimeout;
        }

        /**
         * Stores the given buffer timeout in this context. The value is not validated here.
         *
         * @param bufferTimeout buffer timeout value to hold
         */
        public void setBufferTimeout(long bufferTimeout) {
            this.bufferTimeout = bufferTimeout;
        }

        /**
         * Returns the configuration held by this context (the stored instance, not a copy).
         *
         * @return the stored configuration
         */
        public Configuration getConfiguration() {
            return configuration;
        }

        /**
         * Replaces the configuration held by this context. The reference is kept as-is; no copy is made.
         *
         * @param configuration configuration to hold
         */
        public void setConfiguration(Configuration configuration) {
            this.configuration = configuration;
        }

        /**
         * Returns the default parallelism held by this context.
         *
         * @return the stored default parallelism
         */
        public int getDefaultParallelism() {
            return defaultParallelism;
        }

        /**
         * Stores the given default parallelism in this context. The value is not validated here.
         *
         * @param defaultParallelism default parallelism to hold
         */
        public void setDefaultParallelism(int defaultParallelism) {
            this.defaultParallelism = defaultParallelism;
        }

        /**
         * Returns the default partitioner string held by this context.
         *
         * @return the stored default partitioner value
         */
        public String getDefaultPartitioner() {
            return defaultPartitioner;
        }

        /**
         * Stores the given default partitioner string in this context. The value is not validated here.
         *
         * @param defaultPartitioner default partitioner value to hold
         */
        public void setDefaultPartitioner(String defaultPartitioner) {
            this.defaultPartitioner = defaultPartitioner;
        }

        /**
         * Returns the multi-head-chain-mode flag held by this context.
         *
         * @return current value of the flag
         */
        public boolean isMultiHeadChainMode() {
            return isMultiHeadChainMode;
        }

        /**
         * Sets the multi-head-chain-mode flag held by this context.
         *
         * @param multiHeadChainMode new value of the flag
         */
        public void setMultiHeadChainMode(boolean multiHeadChainMode) {
            isMultiHeadChainMode = multiHeadChainMode;
        }

        /**
         * Returns the slot-sharing-enabled flag held by this context.
         *
         * @return current value of the flag
         */
        public boolean isSlotSharingEnabled() {
            return isSlotSharingEnabled;
        }

        /**
         * Sets the slot-sharing-enabled flag held by this context.
         *
         * @param slotSharingEnabled new value of the flag
         */
        public void setSlotSharingEnabled(boolean slotSharingEnabled) {
            isSlotSharingEnabled = slotSharingEnabled;
        }

        /**
         * Returns the chain-eagerly-enabled flag held by this context.
         *
         * @return current value of the flag
         */
        public boolean isChainEagerlyEnabled() {
            return chainEagerlyEnabled;
        }

        /**
         * Sets the chain-eagerly-enabled flag held by this context.
         *
         * @param chainEagerlyEnabled new value of the flag
         */
        public void setChainEagerlyEnabled(boolean chainEagerlyEnabled) {
            this.chainEagerlyEnabled = chainEagerlyEnabled;
        }

        /**
         * Returns the default resource spec held by this context (the stored instance, not a copy).
         *
         * @return the stored default resources
         */
        public ResourceSpec getDefaultResources() {
            return defaultResources;
        }

        /**
         * Stores the given resource spec as this context's default resources. The reference is
         * kept as-is; no copy is made.
         *
         * @param resources resource spec to hold
         */
        public void setDefaultResources(ResourceSpec resources) {
            defaultResources = resources;
        }

        /**
         * Returns the global default resource spec held by this context (the stored instance,
         * not a copy).
         *
         * @return the stored global default resources
         */
        public ResourceSpec getGlobalDefaultResources() {
            return globalDefaultResources;
        }

        /**
         * Stores the given resource spec as this context's global default resources. The
         * reference is kept as-is; no copy is made.
         *
         * @param resources resource spec to hold
         */
        public void setGlobalDefaultResources(ResourceSpec resources) {
            globalDefaultResources = resources;
        }
    }
}

