/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceConstraints;
import org.apache.flink.api.common.operators.ResourceConstraintsConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.ControlType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.FormatUtil;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.MultiInputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.OperatorDescriptor;
import org.apache.flink.runtime.jobgraph.OperatorEdgeDescriptor;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SchedulingMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.DamBehavior;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.resourcemanager.placementconstraint.InterSlotPlacementConstraint;
import org.apache.flink.runtime.resourcemanager.placementconstraint.PlacementConstraint;
import org.apache.flink.runtime.resourcemanager.placementconstraint.SlotTag;
import org.apache.flink.runtime.resourcemanager.placementconstraint.SlotTagScope;
import org.apache.flink.runtime.resourcemanager.placementconstraint.TaggedSlot;
import org.apache.flink.runtime.resourcemanager.placementconstraint.TaggedSlotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.FunctionMasterCheckpointHookFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamControlEdge;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphHasher;
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
import org.apache.flink.streaming.api.graph.StreamGraphUserHashHasher;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.util.CursorableLinkedList;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.ArbitraryInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.StreamTaskConfig;
import org.apache.flink.streaming.runtime.tasks.StreamTaskConfigCache;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamingJobGraphGenerator {
    /** Logger of this generator. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    /** Configuration key under which the serialized job-vertex-to-stream-node-ids map is written. */
    public static final String JOB_VERTEX_TO_STREAM_NODE_MAP = "jobVertexToStreamNodeMap";
    /** The stream graph that is translated. */
    private final StreamGraph streamGraph;
    /** The job graph being built; created in the constructor from the stream graph's job name. */
    private final JobGraph jobGraph;
    /** Maps every chained stream node id to the job vertex created for its chain. */
    private final Map<Integer, JobVertex> nodeToJobVertexMap;
    /** Out edges collected from all created chains (sorted at the end of {@code setChaining}). */
    private final List<StreamEdge> transitiveOutEdges;
    /** Maps the start node id of every created chain to the ordered ids of the nodes in that chain. */
    private final Map<Integer, List<Integer>> chainedNodeIdsMap;
    /** Hasher producing the primary per-node hashes. */
    private final StreamGraphHasher defaultStreamGraphHasher;
    /** Additional hashers whose results are passed on as legacy hashes. */
    private final List<StreamGraphHasher> legacyStreamGraphHashers;
    /** Forwarded to the job graph when it is finalized; where it is set is not visible in this part of the file. */
    private boolean hasNonDeterministicShuffles = false;

    /**
     * Translates the given {@link StreamGraph} into a {@link JobGraph} without a Flink
     * configuration; {@code null} is passed on as the configuration.
     *
     * @param streamGraph the stream graph to translate
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        final Configuration noConfiguration = null;
        return createJobGraph(streamGraph, noConfiguration);
    }

    /**
     * Translates the given {@link StreamGraph} into a {@link JobGraph} using a fresh
     * generator instance.
     *
     * @param streamGraph the stream graph to translate
     * @param flinkConf   configuration handed to the generator; may be {@code null}
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph, Configuration flinkConf) {
        StreamingJobGraphGenerator generator = new StreamingJobGraphGenerator(streamGraph);
        return generator.createJobGraph(flinkConf);
    }

    private StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Collections.singletonList(new StreamGraphUserHashHasher());
        this.nodeToJobVertexMap = new HashMap<Integer, JobVertex>();
        this.transitiveOutEdges = new ArrayList<StreamEdge>();
        this.chainedNodeIdsMap = new HashMap<Integer, List<Integer>>();
        this.jobGraph = new JobGraph(streamGraph.getJobName());
    }

    /**
     * Applies the default resource constraints read from the given configuration to every
     * stream node that has no resource constraints of its own. Does nothing when the
     * configuration is {@code null} or defines no default constraints.
     *
     * @param flinkConf the configuration to read the defaults from; may be {@code null}
     */
    private void setDefaultResourceConstraints(Configuration flinkConf) {
        if (flinkConf == null) {
            return;
        }
        ResourceConstraintsConfig constraintsConfig = new ResourceConstraintsConfig(flinkConf);
        ResourceConstraints defaults = constraintsConfig.getDefaultResourceConstraints();
        if (defaults == null) {
            return;
        }
        LOG.info("Set default resource constraints: " + defaults.toString());
        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            if (node.getResourceConstraints() == null) {
                // every node receives its own copy of the defaults
                node.setResourceConstraints(defaults.clone());
            }
        }
    }

    /**
     * Builds the job graph from the stream graph held by this generator.
     *
     * <p>The steps run in a fixed order: the custom configuration is copied, primary and
     * legacy node hashes are generated, default resource constraints are applied, chains
     * and job vertices are created, data and control edges are connected, and finally
     * tags, placement constraints, slot sharing, checkpointing and the scheduler
     * configuration are set up before user artifacts and the execution config are attached.
     *
     * @param flinkConf configuration used for the default resource constraints; may be
     *                  {@code null}
     * @return the job graph owned by this generator
     * @throws IllegalConfigurationException if the execution config cannot be serialized
     */
    private JobGraph createJobGraph(Configuration flinkConf) {
        this.jobGraph.addCustomConfiguration(this.streamGraph.getCustomConfiguration());

        // hashes are generated before any other step touches the stream graph
        Map<Integer, byte[]> hashes = this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph);
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(this.legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : this.legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(this.streamGraph));
        }

        this.setDefaultResourceConstraints(flinkConf);
        this.setChaining(hashes, legacyHashes);
        this.connectEdges();
        this.connectControlEdges();
        this.setTags();
        this.setPlacementConstraints();
        this.setSlotSharing();
        this.configureCheckpointing();
        this.setSchedulerConfiguration();
        ClientUtils.addUserArtifactEntries(this.streamGraph.getCachedFiles(), this.jobGraph);

        try {
            this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            // keep the original IOException as cause so the offending type can be diagnosed
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig. This indicates that non-serializable types (like custom serializers) were registered", e);
        }
        this.jobGraph.setHasNonDeterministicShuffles(this.hasNonDeterministicShuffles);
        return this.jobGraph;
    }

    /**
     * Fills the job graph's scheduling configuration: first the job-vertex-to-stream-node
     * mapping is written, then all entries of the stream graph's custom configuration are
     * added.
     */
    private void setSchedulerConfiguration() {
        Configuration schedulingConf = this.jobGraph.getSchedulingConfiguration();
        // the vertex mapping is written before the custom entries are merged in
        setVertexToStreamNodesMap(schedulingConf);
        schedulingConf.addAll(this.streamGraph.getCustomConfiguration());
    }

    /**
     * Adds to every job vertex a {@link SlotTag} built from the string form of the vertex
     * id and the id of the job.
     */
    private void setTags() {
        for (JobVertex jobVertex : this.jobGraph.getVertices()) {
            String tagName = jobVertex.getID().toString();
            SlotTag vertexTag = new SlotTag(tagName, this.jobGraph.getJobID());
            jobVertex.addTag(vertexTag);
        }
    }

    /**
     * Registers placement constraints on the job graph when the execution config forces
     * task-exclusive placement; otherwise does nothing.
     *
     * <p>For every job vertex (in topological order from the sources) two inter-slot
     * constraints are added, both built around the vertex's own slot tag: one pairing
     * "slot with the tag" against a context "without slot lacking the tag", and one pairing
     * "slot lacking the tag" against a context "without slot with the tag".
     */
    private void setPlacementConstraints() {
        if (!this.streamGraph.getExecutionConfig().isForceTaskExclusivePlacement()) {
            return;
        }
        for (JobVertex jobVertex : this.jobGraph.getVerticesSortedTopologicallyFromSources()) {
            SlotTag vertexTag = new SlotTag(jobVertex.getID().toString(), this.jobGraph.getJobID());
            TaggedSlot tagged = new TaggedSlot(true, Collections.singletonList(vertexTag), SlotTagScope.JOB);
            TaggedSlot untagged = new TaggedSlot(false, Collections.singletonList(vertexTag), SlotTagScope.JOB);

            // constraint for slots carrying the tag
            TaggedSlotContext noUntaggedSlotAround = new TaggedSlotContext(false, untagged);
            PlacementConstraint forTagged = new InterSlotPlacementConstraint(tagged, noUntaggedSlotAround);
            this.jobGraph.addPlacementConstraint(forTagged);

            // constraint for slots not carrying the tag
            TaggedSlotContext noTaggedSlotAround = new TaggedSlotContext(false, tagged);
            PlacementConstraint forUntagged = new InterSlotPlacementConstraint(untagged, noTaggedSlotAround);
            this.jobGraph.addPlacementConstraint(forUntagged);
        }
    }

    /**
     * Serializes a map from job vertex id to the ordered ids of the stream nodes chained
     * into that vertex and stores it in the given configuration under
     * {@link #JOB_VERTEX_TO_STREAM_NODE_MAP}. A {@code null} node-id list is stored as an
     * empty list.
     *
     * @param configuration the configuration to write the serialized map into
     * @throws FlinkRuntimeException if the map cannot be serialized
     */
    private void setVertexToStreamNodesMap(Configuration configuration) {
        // typed instead of a raw HashMap so key/value mix-ups are caught at compile time
        Map<JobVertexID, List<Integer>> vertexToStreamNodeIds = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> entry : this.chainedNodeIdsMap.entrySet()) {
            JobVertex jobVertex = this.nodeToJobVertexMap.get(entry.getKey());
            List<Integer> chainedNodeIds = entry.getValue();
            vertexToStreamNodeIds.put(
                    jobVertex.getID(),
                    chainedNodeIds == null ? Collections.<Integer>emptyList() : chainedNodeIds);
        }
        try {
            InstantiationUtil.writeObjectToConfig(vertexToStreamNodeIds, configuration, JOB_VERTEX_TO_STREAM_NODE_MAP);
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Could not serialize job vertex to stream node map", e);
        }
    }

    /**
     * Determines the operator chains of the stream graph and creates one job vertex per
     * chain.
     *
     * <p>Source ids are ordered (nodes without operator or with a {@link StreamSource}
     * first, then by id), the nodes are sorted topologically and numbered depth-first, the
     * chains are split when chaining is enabled, and finally a chain is attempted from every
     * node in topological order. Each created chain is recorded in the node-to-vertex map,
     * the chained-node-ids map and the job graph; the collected chain out edges are sorted
     * by the depth-first number of their target and then by their position among the
     * target's in edges.
     *
     * @param hashes       primary hash per stream node id
     * @param legacyHashes legacy hashes per stream node id, one map per legacy hasher
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        // rank 0: no operator or a StreamSource, rank 1: everything else; ties broken by id
        Comparator<Integer> sourceOrdering = Comparator.comparing((Integer id) -> {
            StreamOperator<?> operator = this.streamGraph.getStreamNode(id).getOperator();
            return operator == null || operator instanceof StreamSource ? 0 : 1;
        }).thenComparingInt(id -> id);
        List<Integer> orderedSourceIds = new ArrayList<>(this.streamGraph.getSourceIDs());
        orderedSourceIds.sort(sourceOrdering);

        List<ChainingStreamNode> topologicalNodes = sortTopologicalNodes(this.streamGraph, orderedSourceIds);
        Map<Integer, ChainingStreamNode> chainingNodeMap = topologicalNodes.stream()
                .collect(Collectors.toMap(ChainingStreamNode::getNodeId, node -> node));

        // assign depth-first numbers starting from every source
        SequenceGenerator dfsSequence = new SequenceGenerator();
        Set<Integer> visited = new HashSet<>();
        for (Integer sourceId : orderedSourceIds) {
            setDepthFirstNumber(sourceId, chainingNodeMap, this.streamGraph, dfsSequence, visited);
        }

        // without chaining no chaining information is handed to createChain
        final boolean chainingEnabled = this.streamGraph.isChainingEnabled();
        if (chainingEnabled) {
            splitChain(topologicalNodes, chainingNodeMap);
        }
        Map<Integer, ChainingStreamNode> chainedNodeMap = chainingEnabled ? chainingNodeMap : null;

        CreatingChainIntermediateStorager storager = new CreatingChainIntermediateStorager();
        for (ChainingStreamNode candidate : topologicalNodes) {
            Integer headId = candidate.getNodeId();
            boolean chainCreated = createChain(headId, headId, new SequenceGenerator(), chainedNodeMap, hashes, legacyHashes, storager);
            if (chainCreated) {
                for (Integer memberId : storager.chainedNodeIdsInOrder) {
                    this.nodeToJobVertexMap.put(memberId, storager.createdVertex);
                }
                this.transitiveOutEdges.addAll(storager.chainOutEdgesInOrder);
                this.chainedNodeIdsMap.put(headId, new ArrayList<Integer>(storager.chainedNodeIdsInOrder));
                this.jobGraph.addVertex(storager.createdVertex);
            }
            storager.resetForNewChain();
        }

        Comparator<StreamEdge> byTargetDepthFirstNumber = Comparator.comparingInt(
                (StreamEdge edge) -> chainingNodeMap.get(edge.getTargetId()).getDepthFirstNumber());
        Comparator<StreamEdge> byPositionInTargetInputs = Comparator.comparingInt(
                (StreamEdge edge) -> this.streamGraph.getStreamNode(edge.getTargetId()).getInEdges().indexOf(edge));
        this.transitiveOutEdges.sort(byTargetDepthFirstNumber.thenComparing(byPositionInTargetInputs));
    }

    /**
     * Creates a {@code ChainingStreamNode} for every stream node and returns them in
     * topological order, starting from the given source ids.
     *
     * <p>The sort proceeds in rounds over a work queue. Every queued node owns a two-slot
     * counter of inputs that have not been satisfied yet; the slots are used alternately:
     * slot {@code remainingNumIndex1} is read in the current round, slot
     * {@code remainingNumIndex2} is written for the next round. A node whose counter is
     * zero is emitted and the counters of its targets are decreased; a node whose counter
     * is still positive is re-queued with its counter carried over.
     *
     * @param streamGraph     the graph to sort
     * @param sortedSourceIDs the ids the traversal starts from, in the desired order
     * @return the chaining nodes in topological order
     * @throws RuntimeException if fewer nodes were emitted than the graph contains (reported
     *                          as a cyclic graph), or if a negative counter is encountered
     */
    static List<ChainingStreamNode> sortTopologicalNodes(StreamGraph streamGraph, List<Integer> sortedSourceIDs) {
        int size;
        ArrayList<ChainingStreamNode> sortedChainingNodes = new ArrayList<ChainingStreamNode>();
        // work queue of node ids that were reached but possibly still wait for inputs
        ArrayDeque<Integer> visitedNonZeroInputNodes = new ArrayDeque<Integer>();
        // node id -> {counter slot 0, counter slot 1}; -1 marks a slot that has not been written
        HashMap<Integer, Integer[]> remainingInputNumMap = new HashMap<Integer, Integer[]>();
        for (Integer sourceNodeId : sortedSourceIDs) {
            visitedNonZeroInputNodes.add(sourceNodeId);
            // sources are ready in the first round (slot 0 is read when iterationNumber is 0)
            remainingInputNumMap.put(sourceNodeId, new Integer[]{0, -1});
        }
        int iterationNumber = -1;
        SequenceGenerator topologicalOrderSeq = new SequenceGenerator();
        // ids that were already (re-)queued during the current round
        HashSet<Integer> dedupSet = new HashSet<Integer>();
        while ((size = visitedNonZeroInputNodes.size()) > 0) {
            // slot read in this round / slot written for the next round
            int remainingNumIndex1 = ++iterationNumber % 2;
            int remainingNumIndex2 = (iterationNumber + 1) % 2;
            dedupSet.clear();
            boolean hasZeroInputNode = false;
            // only the entries present at the start of the round are processed in this round
            for (int i = 0; i < size; ++i) {
                Integer currentNodeId = (Integer)visitedNonZeroInputNodes.pollFirst();
                Integer remainingInputNum = ((Integer[])remainingInputNumMap.get(currentNodeId))[remainingNumIndex1];
                if (remainingInputNum == 0) {
                    // all inputs satisfied: emit the node with its in-edge count, the next
                    // sequence value and the current round number
                    StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
                    ChainingStreamNode chainingNode = new ChainingStreamNode(currentNodeId, currentNode.getInEdges().size(), topologicalOrderSeq.get(), iterationNumber);
                    sortedChainingNodes.add(chainingNode);
                    for (StreamEdge outEdge : currentNode.getOutEdges()) {
                        Integer targetId = outEdge.getTargetId();
                        if (!dedupSet.contains(targetId)) {
                            // first time the target is queued in this round
                            visitedNonZeroInputNodes.addLast(targetId);
                            Integer[] remainingInputNums = (Integer[])remainingInputNumMap.get(targetId);
                            if (remainingInputNums == null) {
                                // target seen for the very first time: start from its in-edge count minus this edge
                                remainingInputNums = new Integer[]{-1, -1};
                                remainingInputNums[remainingNumIndex2] = streamGraph.getStreamNode(targetId).getInEdges().size() - 1;
                                remainingInputNumMap.put(targetId, remainingInputNums);
                            } else {
                                // target carried over from an earlier round: continue from the current slot
                                remainingInputNums[remainingNumIndex2] = remainingInputNums[remainingNumIndex1] - 1;
                            }
                            dedupSet.add(targetId);
                            continue;
                        }
                        // target already queued in this round: just decrement its next-round counter
                        // (n2 and n3 are unused decompiler temporaries)
                        Integer[] integerArray = (Integer[])remainingInputNumMap.get(targetId);
                        int n = remainingNumIndex2;
                        Integer n2 = integerArray[n];
                        Integer n3 = integerArray[n] = Integer.valueOf(integerArray[n] - 1);
                    }
                    remainingInputNumMap.remove(currentNodeId);
                    hasZeroInputNode = true;
                    continue;
                }
                if (remainingInputNum > 0) {
                    // still waiting for inputs; skip if an emitted node re-queued it in this round already
                    if (dedupSet.contains(currentNodeId)) continue;
                    visitedNonZeroInputNodes.addLast(currentNodeId);
                    Integer[] remainingInputNums = (Integer[])remainingInputNumMap.get(currentNodeId);
                    // carry the counter over to the slot read in the next round
                    remainingInputNums[remainingNumIndex2] = remainingInputNums[remainingNumIndex1];
                    dedupSet.add(currentNodeId);
                    continue;
                }
                // reached only for a negative counter
                throw new RuntimeException("remainingInputNum for the node (id: " + currentNodeId + ") should be greater than 0");
            }
            if (hasZeroInputNode) continue;
            // a round that emitted nothing cannot make progress any more
            break;
        }
        if (sortedChainingNodes.size() < streamGraph.getStreamNodes().size()) {
            throw new RuntimeException("The stream graph is cyclic.");
        }
        return sortedChainingNodes;
    }

    /**
     * Recursively walks the graph depth-first from the given node and assigns each node a
     * number from the sequence generator after all of its downstream nodes were visited
     * (post-order). Nodes already contained in the visited set are skipped.
     *
     * @param nodeId                      id of the node to start from
     * @param chainingNodeMap             chaining node per stream node id; receives the numbers
     * @param streamGraph                 the graph that is traversed
     * @param depthFirstSequenceGenerator source of the numbers
     * @param visitedNodeSet              ids visited so far; updated by this call
     */
    static void setDepthFirstNumber(Integer nodeId, Map<Integer, ChainingStreamNode> chainingNodeMap, StreamGraph streamGraph, SequenceGenerator depthFirstSequenceGenerator, Set<Integer> visitedNodeSet) {
        // add() returns false when the id is already present, i.e. the node was visited
        if (!visitedNodeSet.add(nodeId)) {
            return;
        }
        StreamNode node = streamGraph.getStreamNode(nodeId);
        for (StreamEdge outgoing : node.getOutEdges()) {
            setDepthFirstNumber(outgoing.getTargetId(), chainingNodeMap, streamGraph, depthFirstSequenceGenerator, visitedNodeSet);
        }
        ChainingStreamNode chainingNode = chainingNodeMap.get(nodeId);
        chainingNode.setDepthFirstNumber(depthFirstSequenceGenerator.get());
    }

    /**
     * Decides where chains are split. The initial chains are always computed; the two
     * additional break-off passes (deadlock avoidance, acyclic job graph) only run when the
     * stream graph is in multi-head chain mode.
     *
     * @param sortedChainingNodes chaining nodes in topological order
     * @param chainingNodeMap     chaining node per stream node id
     */
    private void splitChain(List<ChainingStreamNode> sortedChainingNodes, Map<Integer, ChainingStreamNode> chainingNodeMap) {
        splitUpInitialChains(chainingNodeMap, sortedChainingNodes, this.streamGraph);
        if (!this.streamGraph.isMultiHeadChainMode()) {
            return;
        }
        breakOffChainForNoDeadlock(chainingNodeMap, sortedChainingNodes, this.streamGraph);
        breakOffChainForAcyclicJobGraph(chainingNodeMap, sortedChainingNodes, this.streamGraph);
    }

    /**
     * Computes the initial chaining decisions. Nodes without in edges get their
     * multi-head-chaining flag set ({@code FALSE} when they have no operator or a
     * {@link StreamSource}, {@code TRUE} otherwise); afterwards every downstream node is
     * asked, per out edge, to chain to the current node.
     *
     * @param chainingNodeMap     chaining node per stream node id
     * @param sortedChainingNodes chaining nodes in topological order
     * @param streamGraph         the graph the nodes belong to
     */
    static void splitUpInitialChains(Map<Integer, ChainingStreamNode> chainingNodeMap, List<ChainingStreamNode> sortedChainingNodes, StreamGraph streamGraph) {
        for (ChainingStreamNode upstream : sortedChainingNodes) {
            StreamNode upstreamNode = streamGraph.getStreamNode(upstream.getNodeId());

            if (upstreamNode.getInEdges().isEmpty()) {
                StreamOperator<?> operator = upstreamNode.getOperator();
                boolean plainSource = operator == null || operator instanceof StreamSource;
                upstream.setAllowMultiHeadChaining(plainSource ? Boolean.FALSE : Boolean.TRUE);
            }

            for (StreamEdge outgoing : upstreamNode.getOutEdges()) {
                ChainingStreamNode downstream = chainingNodeMap.get(outgoing.getTargetId());
                StreamNode edgeSource = streamGraph.getStreamNode(outgoing.getSourceId());
                StreamNode edgeTarget = streamGraph.getStreamNode(outgoing.getTargetId());
                boolean multiHeadChainMode = streamGraph.isMultiHeadChainMode();
                boolean chainEagerly = streamGraph.isChainEagerlyEnabled();
                ExecutionMode executionMode = streamGraph.getExecutionConfig().getExecutionMode();
                downstream.chainTo(upstream, outgoing, edgeSource, edgeTarget, multiHeadChainMode, chainEagerly, executionMode);
            }
        }
    }

    /**
     * Infers the read priorities of all edges and then, for every node whose read
     * priorities conflict, removes the chaining to each upstream node whose read priority
     * is not {@code HIGHER} and for which {@code needToBreakOffChain} reports that the
     * chain has to be broken.
     *
     * @param chainingNodeMap     chaining node per stream node id
     * @param sortedChainingNodes chaining nodes in topological order
     * @param streamGraph         the graph the nodes belong to
     */
    static void breakOffChainForNoDeadlock(Map<Integer, ChainingStreamNode> chainingNodeMap, List<ChainingStreamNode> sortedChainingNodes, StreamGraph streamGraph) {
        inferReadPriority(chainingNodeMap, sortedChainingNodes, streamGraph);

        for (ChainingStreamNode current : sortedChainingNodes) {
            Integer currentId = current.getNodeId();
            StreamNode currentNode = streamGraph.getStreamNode(currentId);
            if (current.isReadPriorityConflicting()) {
                for (StreamEdge incoming : currentNode.getInEdges()) {
                    Integer upstreamId = incoming.getSourceId();
                    StreamNode.ReadPriority priority = current.getReadPriority(upstreamId);
                    boolean readFirst = StreamNode.ReadPriority.HIGHER.equals(priority);
                    if (!readFirst && needToBreakOffChain(incoming, chainingNodeMap, streamGraph)) {
                        current.removeChainableToNode(upstreamId);
                    }
                }
            }
        }
    }

    /**
     * Assigns a read priority to every edge of the graph.
     *
     * <p>Pass one walks the nodes in reverse topological order. For each non-batch in edge
     * the priority is the node's read-priority hint for that edge, or the node's transitive
     * priority when there is no hint; a hint that differs from a non-null transitive
     * priority resolves to {@code DYNAMIC}. Batch edges and edges without any priority are
     * postponed. Pass two resolves the postponed edges from the transitive priority of their
     * source node, falling back to {@code HIGHER}.
     *
     * @param chainingNodeMap     chaining node per stream node id
     * @param sortedChainingNodes chaining nodes in topological order
     * @param streamGraph         the graph the nodes belong to
     */
    static void inferReadPriority(Map<Integer, ChainingStreamNode> chainingNodeMap, List<ChainingStreamNode> sortedChainingNodes, StreamGraph streamGraph) {
        List<StreamEdge> postponedEdges = new ArrayList<>();

        // pass one: downstream nodes first
        for (int position = sortedChainingNodes.size() - 1; position >= 0; position--) {
            ChainingStreamNode current = sortedChainingNodes.get(position);
            Integer currentId = current.getNodeId();
            StreamNode currentNode = streamGraph.getStreamNode(currentId);

            for (StreamEdge incoming : currentNode.getInEdges()) {
                if (inEdgeIsBatch(incoming)) {
                    postponedEdges.add(incoming);
                    continue;
                }

                Integer upstreamId = incoming.getSourceId();
                ChainingStreamNode upstream = chainingNodeMap.get(upstreamId);
                StreamNode.ReadPriority hint = currentNode.getReadPriorityHint(incoming);
                StreamNode.ReadPriority inherited = current.getTransitivePriority();

                StreamNode.ReadPriority resolved;
                if (hint == null) {
                    resolved = inherited;
                } else if (inherited != null && !hint.equals(inherited)) {
                    resolved = StreamNode.ReadPriority.DYNAMIC;
                } else {
                    resolved = hint;
                }

                if (resolved == null) {
                    postponedEdges.add(incoming);
                } else {
                    current.setReadPriority(upstreamId, resolved);
                    upstream.setDownPriority(currentId, resolved);
                }
            }
        }

        // pass two: edges that could not be resolved above, in the order they were postponed
        for (StreamEdge postponed : postponedEdges) {
            Integer sourceId = postponed.getSourceId();
            Integer targetId = postponed.getTargetId();
            ChainingStreamNode source = chainingNodeMap.get(sourceId);
            ChainingStreamNode target = chainingNodeMap.get(targetId);

            StreamNode.ReadPriority resolved = source.getTransitivePriority();
            if (resolved == null) {
                resolved = StreamNode.ReadPriority.HIGHER;
            }
            target.setReadPriority(sourceId, resolved);
            source.setDownPriority(targetId, resolved);
        }
    }

    /** Tells whether the given edge uses the batch data exchange mode. */
    private static boolean inEdgeIsBatch(StreamEdge edge) {
        return edge.getDataExchangeMode() == DataExchangeMode.BATCH;
    }

    /**
     * Decides whether the chaining across the given edge has to be broken, by searching
     * breadth-first upstream from that edge.
     *
     * <p>Every queue entry is a pair of an edge and a flag telling whether the path to that
     * edge is already broken (the edge itself or an edge further downstream is not chained).
     * The search returns {@code true} only when it first sees a {@code FULL_DAM} edge on an
     * unbroken path and then reaches an upstream node whose down priorities conflict.
     *
     * @param origEdge        the in edge whose chaining is examined
     * @param chainingNodeMap chaining node per stream node id
     * @param streamGraph     the graph the edge belongs to
     * @return {@code true} if both the dam condition and the priority-conflict condition
     *         were met
     */
    private static boolean needToBreakOffChain(StreamEdge origEdge, Map<Integer, ChainingStreamNode> chainingNodeMap, StreamGraph streamGraph) {
        // number of entries currently in the queue that are flagged as broken
        int currentBreakNum;
        Integer origNodeId = origEdge.getTargetId();
        Integer origUpstreamId = origEdge.getSourceId();
        boolean isOrigEdgeBroken = !chainingNodeMap.get(origNodeId).isChainTo(origUpstreamId);
        boolean isInputBreakCondMet = false;
        boolean isPriorityConflictCondMet = false;
        boolean isDamCondMet = false;
        ArrayDeque<ImmutablePair> edgeQueue = new ArrayDeque<ImmutablePair>();
        edgeQueue.addLast(new ImmutablePair((Object)origEdge, (Object)isOrigEdgeBroken));
        // n is an unused decompiler temporary
        int n = currentBreakNum = isOrigEdgeBroken ? 1 : 0;
        while (edgeQueue.size() > 0) {
            // every queued path is broken: remember that nothing unbroken is left to explore
            if (!isInputBreakCondMet && currentBreakNum == edgeQueue.size()) {
                isInputBreakCondMet = true;
            }
            Pair pair = (Pair)edgeQueue.pollFirst();
            StreamEdge currentEdge = (StreamEdge)pair.getLeft();
            Integer upstreamNodeId = currentEdge.getSourceId();
            boolean isDownBroken = (Boolean)pair.getRight();
            if (isDownBroken) {
                --currentBreakNum;
            }
            // all paths broken before any dam was found: stop, the result is false
            if (isInputBreakCondMet && !isDamCondMet) break;
            // a full dam only counts on a path that is still chained
            if (!isDamCondMet && !isDownBroken && DamBehavior.FULL_DAM.equals((Object)currentEdge.getDamBehavior())) {
                isDamCondMet = true;
            }
            if (isDamCondMet && chainingNodeMap.get(upstreamNodeId).isDownPriorityConflicting()) {
                isPriorityConflictCondMet = true;
                break;
            }
            StreamNode upstreamNode = streamGraph.getStreamNode(upstreamNodeId);
            for (StreamEdge nextInEdge : upstreamNode.getInEdges()) {
                // NOTE(review): isDownBroken is reassigned here and keeps its value for the
                // remaining in edges of this node, so once one in edge is found unchained all
                // following siblings are queued as broken too - confirm this is intended
                // (bl is an unused decompiler temporary)
                if (!isDownBroken) {
                    boolean bl = isDownBroken = !chainingNodeMap.get(nextInEdge.getTargetId()).isChainTo(nextInEdge.getSourceId());
                }
                if (isDownBroken) {
                    ++currentBreakNum;
                }
                edgeQueue.addLast(new ImmutablePair((Object)nextInEdge, (Object)isDownBroken));
            }
        }
        return isDamCondMet && isPriorityConflictCondMet;
    }

    /**
     * Groups stream nodes into coarsened nodes and removes every chaining link that would
     * cross two different groups; each chaining node is finally told the id of its group.
     *
     * <p>One {@code CoarsenedNode} is created per stream node and placed in a linked list in
     * topological order. While walking the list, each node is merged into those of its
     * predecessors that are flagged {@code true} in its predecessor map and for which
     * {@code tryToMergeCoarsenedNode} finds a valid list position.
     * NOTE(review): the method name suggests the grouping keeps the resulting job graph
     * acyclic, and the predecessor flag presumably means "chainable" - neither is visible
     * here, confirm against {@code CoarsenedNode}.
     *
     * @param chainingNodeMap     chaining node per stream node id
     * @param sortedChainingNodes chaining nodes in topological order
     * @param streamGraph         the graph the nodes belong to
     */
    static void breakOffChainForAcyclicJobGraph(Map<Integer, ChainingStreamNode> chainingNodeMap, List<ChainingStreamNode> sortedChainingNodes, StreamGraph streamGraph) {
        // nothing to do for an empty graph (also avoids cursor.next() on an empty list below)
        if (chainingNodeMap.size() == 0) {
            return;
        }
        // one coarsened node per stream node, keyed by the stream node id
        Map<Integer, CoarsenedNode> coarsenedNodeMap = chainingNodeMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, o -> {
            Integer nodeId = (Integer)o.getKey();
            return new CoarsenedNode(streamGraph.getStreamNode(nodeId), (ChainingStreamNode)chainingNodeMap.get(nodeId));
        }));
        // the coarsened nodes in topological order
        CursorableLinkedList<CoarsenedNode> sortedCoarsenedNodes = new CursorableLinkedList<CoarsenedNode>();
        for (ChainingStreamNode chainingNode : sortedChainingNodes) {
            sortedCoarsenedNodes.add(coarsenedNodeMap.get(chainingNode.getNodeId()));
        }
        // coarsened id -> cursor created from the walking cursor when the node was visited
        HashMap<Integer, CursorableLinkedList.Cursor<CoarsenedNode>> cursorMap = new HashMap<Integer, CursorableLinkedList.Cursor<CoarsenedNode>>();
        CursorableLinkedList.Cursor cursor = sortedCoarsenedNodes.cursor();
        CoarsenedNode coarsenedNode = (CoarsenedNode)cursor.next();
        while (true) {
            Integer coarsenedId = coarsenedNode.getId();
            cursorMap.put(coarsenedId, sortedCoarsenedNodes.cursor(cursor));
            // fetch the successor before merging, because a merge removes the current list entry
            CoarsenedNode nextCoarsenedNode = null;
            if (cursor.hasNext()) {
                nextCoarsenedNode = (CoarsenedNode)cursor.next();
            }
            Set<Integer> predCoarsenedNodes = coarsenedNode.getPredecessorsNodes().keySet();
            Iterator<Integer> iterator = predCoarsenedNodes.iterator();
            while (iterator.hasNext()) {
                CoarsenedNode predCoarsenedNode;
                CursorableLinkedList.Cursor<CoarsenedNode> newCursor;
                Integer predCoarsenedId = iterator.next();
                // skip predecessors flagged false and predecessors for which no merge position exists
                if (!coarsenedNode.getPredecessorsNodes().get(predCoarsenedId).booleanValue() || (newCursor = StreamingJobGraphGenerator.tryToMergeCoarsenedNode(sortedCoarsenedNodes, coarsenedNode, predCoarsenedNode = coarsenedNodeMap.get(predCoarsenedId), cursorMap)) == null) continue;
                // the current node is absorbed by the predecessor, which becomes the current node
                predCoarsenedNode.merge(coarsenedNode, coarsenedNodeMap);
                cursorMap.put(predCoarsenedId, newCursor);
                coarsenedNodeMap.remove(coarsenedId);
                cursorMap.remove(coarsenedId);
                coarsenedNode = predCoarsenedNode;
                coarsenedId = predCoarsenedId;
            }
            if (nextCoarsenedNode == null) break;
            coarsenedNode = nextCoarsenedNode;
        }
        // map every original stream node to the coarsened node that now contains it
        HashMap<Integer, Integer> streamNodeIdToCoarsenedIdMap = new HashMap<Integer, Integer>();
        cursor = sortedCoarsenedNodes.cursor();
        while (cursor.hasNext()) {
            coarsenedNode = (CoarsenedNode)cursor.next();
            Integer coarsenedId = coarsenedNode.getId();
            for (Integer streamNodeId : coarsenedNode.getOriginalNodes()) {
                streamNodeIdToCoarsenedIdMap.put(streamNodeId, coarsenedId);
            }
        }
        for (ChainingStreamNode chainingNode : sortedChainingNodes) {
            Integer nodeId = chainingNode.getNodeId();
            Integer coarsenedId = (Integer)streamNodeIdToCoarsenedIdMap.get(nodeId);
            StreamNode node = streamGraph.getStreamNode(nodeId);
            for (StreamEdge edge : node.getInEdges()) {
                Integer sourceId = edge.getSourceId();
                // keep the chaining only when both ends ended up in the same coarsened node
                if (!chainingNode.isChainTo(sourceId) || ((Integer)streamNodeIdToCoarsenedIdMap.get(sourceId)).equals(coarsenedId)) continue;
                chainingNode.removeChainableToNode(sourceId);
            }
            chainingNode.setCoarsenedId(coarsenedId);
        }
    }

    /**
     * Checks whether {@code coarsenedNode} can be merged into {@code predecessorNode} and,
     * if so, updates the list accordingly.
     *
     * <p>Two forward scans start at the predecessor's list position: the first finds
     * {@code pos1}, the offset of the last entry up to {@code coarsenedNode} that is one of
     * its predecessors; the second finds {@code pos2}, the offset of the first entry that is
     * a successor of {@code predecessorNode} (or the number of entries after the predecessor
     * when none is found). The merge is possible only when {@code pos2 > pos1}.
     * NOTE(review): the exact effects of {@code Cursor.getValue}, {@code moveNodeTo} and
     * {@code remove} are not visible here - confirm against {@code CursorableLinkedList}.
     *
     * @param sortedCoarsenedNodes the list of coarsened nodes
     * @param coarsenedNode        the node that should be merged
     * @param predecessorNode      the predecessor to merge into
     * @param cursorMap            cursor per coarsened node id
     * @return the predecessor's cursor when the merge is possible, otherwise {@code null}
     * @throws IllegalStateException if the list ends before {@code coarsenedNode} is reached,
     *                               or if {@code coarsenedNode} is reached in the second scan
     *                               before any successor of the predecessor
     */
    private static CursorableLinkedList.Cursor<CoarsenedNode> tryToMergeCoarsenedNode(CursorableLinkedList<CoarsenedNode> sortedCoarsenedNodes, CoarsenedNode coarsenedNode, CoarsenedNode predecessorNode, Map<Integer, CursorableLinkedList.Cursor<CoarsenedNode>> cursorMap) {
        Integer coarsenedId = coarsenedNode.getId();
        Integer predecessorId = predecessorNode.getId();
        // scan 1 works on a cursor derived from the predecessor's stored cursor
        CursorableLinkedList.Cursor<CoarsenedNode> cursor1 = sortedCoarsenedNodes.cursor(cursorMap.get(predecessorId));
        CoarsenedNode currentNode = cursor1.getValue();
        Map<Integer, Boolean> predecessorNodes = coarsenedNode.getPredecessorsNodes();
        int pos1 = 0;
        int index = 0;
        while (true) {
            Integer currentNodeId;
            // remember the offset of the latest entry that is a predecessor of coarsenedNode
            if (predecessorNodes.containsKey(currentNodeId = currentNode.getId())) {
                pos1 = index;
            }
            if (currentNodeId.equals(coarsenedId)) break;
            if (!cursor1.hasNext()) {
                throw new IllegalStateException("An internal error is occurred.");
            }
            ++index;
            currentNode = cursor1.next();
        }
        // scan 2: look for the first successor of the predecessor
        CursorableLinkedList.Cursor<CoarsenedNode> cursor2 = sortedCoarsenedNodes.cursor(cursorMap.get(predecessorId));
        int pos2 = 0;
        Set<Integer> successorNodes = predecessorNode.getSucessorNodes();
        while (cursor2.hasNext()) {
            ++pos2;
            currentNode = cursor2.next();
            Integer currentNodeId = currentNode.getId();
            if (successorNodes.contains(currentNodeId)) break;
            if (!currentNodeId.equals(coarsenedId)) continue;
            throw new IllegalStateException("An internal error is occurred");
        }
        if (pos2 > pos1) {
            CursorableLinkedList.Cursor<CoarsenedNode> resultCursor = cursorMap.get(predecessorId);
            // pos2 == 1 means the first successor directly follows the predecessor, so no move is done
            if (pos2 > 1) {
                resultCursor.moveNodeTo(cursor2);
            }
            // the merged node no longer has a list entry of its own
            cursorMap.get(coarsenedId).remove();
            return resultCursor;
        }
        return null;
    }

    /**
     * Recursively visits every stream node that is chained (directly or transitively) with
     * {@code currentNodeId}, builds a {@link StreamConfig} for each of them and records the
     * results in {@code storager}. When the recursion unwinds back to {@code startNodeId} the
     * collected data is sorted, the {@link JobVertex} is created and its configuration written.
     *
     * @param startNodeId id of the node the chain construction started from
     * @param currentNodeId id of the node processed by this invocation
     * @param chainIndexGenerator supplies the chain index of each visited node
     * @param chainedNodeMap chaining information per node id; {@code null} means no edge is
     *        treated as chained and every node is a chain head
     * @param hashes primary hash per node id
     * @param legacyHashes legacy hashes per node id
     * @param storager accumulator for everything built for the current chain
     * @return {@code false} if the node had already been built, {@code true} otherwise
     */
    private boolean createChain(Integer startNodeId, Integer currentNodeId, SequenceGenerator chainIndexGenerator, @Nullable Map<Integer, ChainingStreamNode> chainedNodeMap, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, CreatingChainIntermediateStorager storager) {
        if (storager.allBuiltNodes.contains(currentNodeId)) {
            return false;
        }
        storager.allBuiltNodes.add(currentNodeId);
        StreamNode currentStreamNode = this.streamGraph.getStreamNode(currentNodeId);
        // The index is drawn before recursing, so it reflects the visiting order.
        int chainIndex = chainIndexGenerator.get();
        byte[] primaryHashBytes = hashes.get(currentNodeId);
        boolean isHeadNode = chainedNodeMap == null || chainedNodeMap.get(currentNodeId).isChainHeadNode();
        List<StreamEdge> chainedOutputs = new ArrayList<>();
        List<StreamEdge> nonChainedOutputs = new ArrayList<>();
        // Walk downstream: chained targets are built recursively, the rest become chain outputs.
        for (StreamEdge outEdge : currentStreamNode.getOutEdges()) {
            Integer downstreamNodeId = outEdge.getTargetId();
            ChainingStreamNode downstreamNode = chainedNodeMap == null ? null : chainedNodeMap.get(downstreamNodeId);
            if (chainedNodeMap == null || !downstreamNode.isChainTo(currentNodeId)) {
                nonChainedOutputs.add(outEdge);
                continue;
            }
            StreamNode targetNode = this.streamGraph.getStreamNode(outEdge.getTargetId());
            StreamNode startNode = this.streamGraph.getStreamNode(startNodeId);
            // The start node accumulates the resource constraints of every chained node.
            startNode.setResourceConstraints(this.mergeResourceConstraints(startNode.getResourceConstraints(), targetNode.getResourceConstraints()));
            chainedOutputs.add(outEdge);
            this.createChain(startNodeId, downstreamNodeId, chainIndexGenerator, chainedNodeMap, hashes, legacyHashes, storager);
        }
        // Must run after the downstream recursion: the name is composed from the names of the
        // chained targets already stored in chainedNameMap.
        storager.chainedNameMap.put(currentNodeId, StreamingJobGraphGenerator.makeChainedName(currentStreamNode.getOperatorName(), chainedOutputs, storager.chainedNameMap));
        // Walk upstream as well, so that chains with several heads are fully collected.
        if (chainedNodeMap != null) {
            for (StreamEdge inEdge : currentStreamNode.getInEdges()) {
                Integer upstreamNodeId = inEdge.getSourceId();
                ChainingStreamNode currentNode = chainedNodeMap.get(currentNodeId);
                if (!currentNode.isChainTo(upstreamNodeId)) continue;
                this.createChain(startNodeId, upstreamNodeId, chainIndexGenerator, chainedNodeMap, hashes, legacyHashes, storager);
            }
        }
        StreamConfig currentNodeConfig = new StreamConfig(new Configuration());
        OperatorID currentOperatorID = new OperatorID(primaryHashBytes);
        if (isHeadNode) {
            currentNodeConfig.setChainStart();
        }
        currentNodeConfig.setChainIndex(chainIndex);
        currentNodeConfig.setOperatorName(currentStreamNode.getOperatorName());
        currentNodeConfig.setOperatorID(currentOperatorID);
        if (chainedOutputs.isEmpty()) {
            currentNodeConfig.setChainEnd();
        }
        List<StreamEdge> nonChainedInputs = new ArrayList<>();
        for (StreamEdge inEdge : currentStreamNode.getInEdges()) {
            if (chainedNodeMap == null) {
                nonChainedInputs.add(inEdge);
                continue;
            }
            ChainingStreamNode currentNode = chainedNodeMap.get(currentNodeId);
            if (currentNode.isChainTo(inEdge.getSourceId())) continue;
            nonChainedInputs.add(inEdge);
        }
        StreamingJobGraphGenerator.setupNodeConfig(currentNodeId, nonChainedInputs, chainedOutputs, nonChainedOutputs, this.streamGraph, currentNodeConfig);
        storager.chainedConfigMap.put(currentNodeId, currentNodeConfig);
        storager.chainedNodeIdsInOrder.add(currentNodeId);
        if (isHeadNode) {
            storager.chainedHeadNodeIdsInOrder.add(currentNodeId);
        }
        storager.chainInEdgesInOrder.addAll(nonChainedInputs);
        storager.chainOutEdgesInOrder.addAll(nonChainedOutputs);
        if (currentStreamNode.getOutputFormat() != null) {
            storager.chainOutputFormatMap.put(currentOperatorID, currentStreamNode.getOutputFormat());
        }
        if (currentStreamNode.getInputFormat() != null) {
            storager.chainInputFormatMap.put(currentOperatorID, currentStreamNode.getInputFormat());
        }
        // Resources of all chained nodes are merged into a single spec for the vertex.
        ResourceSpec currentNodeMinResources = currentStreamNode.getMinResources();
        storager.chainedMinResources = storager.chainedMinResources == null ? currentNodeMinResources : storager.chainedMinResources.merge(currentNodeMinResources);
        ResourceSpec currentNodePreferredResources = currentStreamNode.getPreferredResources();
        storager.chainedPreferredResources = storager.chainedPreferredResources == null ? currentNodePreferredResources : storager.chainedPreferredResources.merge(currentNodePreferredResources);
        // Only the outermost invocation (the one for the start node) finalizes the chain.
        if (currentNodeId.equals(startNodeId)) {
            if (chainedNodeMap != null) {
                // The lambda parameter is typed explicitly: in a comparingInt(..).thenComparingInt(..)
                // chain the element type cannot be inferred from the sort() target.
                storager.chainInEdgesInOrder.sort(Comparator.comparingInt((StreamEdge o) -> chainedNodeMap.get(o.getTargetId()).getDepthFirstNumber()).thenComparingInt(o -> this.streamGraph.getStreamNode(o.getTargetId()).getInEdges().indexOf(o)));
                // NOTE(review): out edges are ordered by the same target-based key as in edges -- confirm this is intended.
                storager.chainOutEdgesInOrder.sort(Comparator.comparingInt((StreamEdge o) -> chainedNodeMap.get(o.getTargetId()).getDepthFirstNumber()).thenComparingInt(o -> this.streamGraph.getStreamNode(o.getTargetId()).getInEdges().indexOf(o)));
                storager.chainedNodeIdsInOrder.sort(Comparator.comparingInt(o -> ((ChainingStreamNode)chainedNodeMap.get(o)).getDepthFirstNumber()));
                storager.chainedHeadNodeIdsInOrder.sort(Comparator.comparingInt(o -> ((ChainingStreamNode)chainedNodeMap.get(o)).getTopologicalOrder()));
            }
            storager.createdVertex = this.createJobVertex(startNodeId, hashes, legacyHashes, storager);
            this.setupVertexConfig(currentNodeConfig, storager, storager.createdVertex.getConfiguration());
        }
        return true;
    }

    /**
     * Creates the {@link JobVertex} for the chain collected in {@code storager}.
     *
     * <p>The vertex id is derived from the primary hash of {@code startNodeId}; operator ids,
     * operator descriptors, resources, invokable class, resource constraints and parallelism
     * are filled in from the chained nodes.
     *
     * @param startNodeId id of the node the chain was started from
     * @param hashes primary hash per node id
     * @param legacyHashes legacy hashes per node id
     * @param storager data collected for the chain by {@code createChain}
     * @return the newly created job vertex
     * @throws IllegalStateException if no primary hash exists for {@code startNodeId}
     */
    private JobVertex createJobVertex(Integer startNodeId, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, CreatingChainIntermediateStorager storager) {
        byte[] primaryHashBytes = hashes.get(startNodeId);
        if (primaryHashBytes == null) {
            throw new IllegalStateException("Cannot find node hash (nodeId: " + startNodeId + ") . Did you generate them before calling this method?");
        }
        JobVertexID jobVertexId = new JobVertexID(primaryHashBytes);

        List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            byte[] legacyHashBytes = legacyHash.get(startNodeId);
            if (legacyHashBytes != null) {
                legacyJobVertexIds.add(new JobVertexID(legacyHashBytes));
            }
        }

        // Both lists are index-aligned: one (primary id, legacy id or null) pair is appended
        // per chained node and per legacy hash map.
        List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
        List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
        for (Integer nodeId : storager.chainedNodeIdsInOrder) {
            byte[] nodeHash = hashes.get(nodeId);
            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                chainedOperatorVertexIds.add(new OperatorID(nodeHash));
                byte[] userDefinedHash = legacyHash.get(nodeId);
                userDefinedChainedOperatorVertexIds.add(userDefinedHash != null ? new OperatorID(userDefinedHash) : null);
            }
        }

        int vertexNameLengthLimit = this.streamGraph.getExecutionConfig().getTaskNameLengthLimit();
        String jobVertexName = StreamingJobGraphGenerator.makeJobVertexName(storager.chainedHeadNodeIdsInOrder, storager.chainedNameMap);
        if (jobVertexName != null && jobVertexName.length() > vertexNameLengthLimit) {
            jobVertexName = jobVertexName.substring(0, vertexNameLengthLimit) + "...";
        }

        StreamNode startStreamNode = this.streamGraph.getStreamNode(startNodeId);
        // Declared as the common supertype because either vertex flavour may be created.
        JobVertex jobVertex;
        if (!storager.chainInputFormatMap.isEmpty() || !storager.chainOutputFormatMap.isEmpty()) {
            jobVertex = new MultiInputOutputFormatVertex(jobVertexName, jobVertexId, legacyJobVertexIds, chainedOperatorVertexIds, userDefinedChainedOperatorVertexIds);
            TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
            // An empty format map is passed on as null.
            FormatUtil.MultiFormatStub.setStubFormats(taskConfig, storager.chainInputFormatMap.isEmpty() ? null : storager.chainInputFormatMap, storager.chainOutputFormatMap.isEmpty() ? null : storager.chainOutputFormatMap);
        } else {
            jobVertex = new JobVertex(jobVertexName, jobVertexId, legacyJobVertexIds, chainedOperatorVertexIds, userDefinedChainedOperatorVertexIds);
        }

        boolean hasNodeWithChainedMultiInputs = false;
        for (Integer nodeId : storager.chainedNodeIdsInOrder) {
            StreamNode node = this.streamGraph.getStreamNode(nodeId);
            OperatorID operatorID = new OperatorID(hashes.get(nodeId));
            OperatorDescriptor operatorDescriptor = new OperatorDescriptor(node.getOperatorName(), operatorID, Integer.valueOf(node.getId()));
            // Counts the inputs of this node whose source belongs to the same chain.
            int inEdgesNumInChain = 0;
            for (StreamEdge streamEdge : node.getInEdges()) {
                OperatorEdgeDescriptor edgeDescriptor = new OperatorEdgeDescriptor(new OperatorID(hashes.get(streamEdge.getSourceId())), operatorID, streamEdge.getTypeNumber(), streamEdge.getPartitioner() == null ? "null" : streamEdge.getPartitioner().toString());
                operatorDescriptor.addInput(edgeDescriptor);
                if (storager.chainedConfigMap.containsKey(streamEdge.getSourceId())) {
                    ++inEdgesNumInChain;
                }
            }
            jobVertex.addOperatorDescriptor(operatorDescriptor);
            if (inEdgesNumInChain > 1) {
                hasNodeWithChainedMultiInputs = true;
            }
        }

        jobVertex.setResources(storager.chainedMinResources, storager.chainedPreferredResources);
        // Chains with several heads, or with a node fed by more than one chained input, use
        // ArbitraryInputStreamTask; otherwise the start node's own task class is used.
        if (storager.chainedHeadNodeIdsInOrder.size() > 1 || hasNodeWithChainedMultiInputs) {
            jobVertex.setInvokableClass(ArbitraryInputStreamTask.class);
        } else {
            jobVertex.setInvokableClass(startStreamNode.getJobVertexClass());
        }
        jobVertex.setResourceConstraints(startStreamNode.getResourceConstraints());

        // A non-positive parallelism on the stream node leaves the vertex's own value untouched.
        int parallelism = startStreamNode.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, startNodeId);
        }
        jobVertex.setMaxParallelism(startStreamNode.getMaxParallelism());
        return jobVertex;
    }

    /**
     * Fills the chain's {@link StreamTaskConfigCache} and serializes it into {@code config}.
     *
     * @param anyheadNodeConfig config of a node of the chain; supplies the time characteristic,
     *        the checkpointing settings and the state backend
     * @param storager holds the cache and the per-chain collections written into it
     * @param config target configuration the cache is serialized to
     */
    private void setupVertexConfig(StreamConfig anyheadNodeConfig, CreatingChainIntermediateStorager storager, Configuration config) {
        final StreamTaskConfigCache cache = storager.vertexConfigCache;

        // Settings taken from the supplied node config.
        cache.setTimeCharacteristic(anyheadNodeConfig.getTimeCharacteristic());
        cache.setCheckpointingEnabled(anyheadNodeConfig.isCheckpointingEnabled());
        cache.setCheckpointMode(anyheadNodeConfig.getCheckpointMode());
        cache.setStateBackend(anyheadNodeConfig.getStateBackend(storager.classLoader));

        // Chain topology collected while the chain was built.
        cache.setChainedNodeConfigs(storager.chainedConfigMap);
        cache.setChainedHeadNodeIds(storager.chainedHeadNodeIdsInOrder);
        cache.setInStreamEdgesOfChain(storager.chainInEdgesInOrder);
        cache.setOutStreamEdgesOfChain(storager.chainOutEdgesInOrder);

        final StreamTaskConfig taskConfig = new StreamTaskConfig(config);
        cache.serializeTo(taskConfig);
    }

    /**
     * Copies the settings of one stream node into its {@link StreamConfig}.
     *
     * @param nodeId id of the stream node being configured
     * @param nonChainableInputs in edges that are not chained; only their count is stored
     * @param chainableOutputs out edges to operators of the same chain
     * @param nonChainableOutputs out edges leaving the chain
     * @param streamGraph graph that supplies the node and the job-wide settings
     * @param config the config to fill
     */
    private static void setupNodeConfig(Integer nodeId, List<StreamEdge> nonChainableInputs, List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs, StreamGraph streamGraph, StreamConfig config) {
        final StreamNode node = streamGraph.getStreamNode(nodeId);

        config.setVertexID(nodeId);
        config.setBufferTimeout(node.getBufferTimeout());
        config.setTypeSerializerIn1(node.getTypeSerializerIn1());
        config.setTypeSerializerIn2(node.getTypeSerializerIn2());
        config.setTypeSerializerOut(node.getTypeSerializerOut());

        // Side-output serializers: chained outputs first, then the non-chained ones.
        for (StreamEdge chainedEdge : chainableOutputs) {
            if (chainedEdge.getOutputTag() != null) {
                config.setTypeSerializerSideOut(chainedEdge.getOutputTag(), chainedEdge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig()));
            }
        }
        for (StreamEdge outgoingEdge : nonChainableOutputs) {
            if (outgoingEdge.getOutputTag() != null) {
                config.setTypeSerializerSideOut(outgoingEdge.getOutputTag(), outgoingEdge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig()));
            }
        }

        config.setStreamOperator(node.getOperator());
        config.setOutputSelectors(node.getOutputSelectors());
        config.setNumberOfInputs(nonChainableInputs.size());
        config.setNumberOfOutputs(nonChainableOutputs.size());
        config.setNonChainedOutputs(nonChainableOutputs);
        config.setChainedOutputs(chainableOutputs);
        config.setTimeCharacteristic(streamGraph.getTimeCharacteristic());

        final CheckpointConfig checkpointConfig = streamGraph.getCheckpointConfig();
        config.setStateBackend(streamGraph.getStateBackend());
        final boolean checkpointingEnabled = checkpointConfig.isCheckpointingEnabled();
        config.setCheckpointingEnabled(checkpointingEnabled);
        // With checkpointing disabled the mode falls back to at-least-once.
        config.setCheckpointMode(checkpointingEnabled ? checkpointConfig.getCheckpointingMode() : CheckpointingMode.AT_LEAST_ONCE);

        config.setStatePartitioner(0, node.getStatePartitioner1());
        config.setStatePartitioner(1, node.getStatePartitioner2());
        config.setStateKeySerializer(node.getStateKeySerializer());

        // Iteration heads and tails additionally need the broker id and the loop timeout.
        final Class<? extends AbstractInvokable> taskClass = node.getJobVertexClass();
        final boolean isIterationEndpoint = taskClass.equals(StreamIterationHead.class) || taskClass.equals(StreamIterationTail.class);
        if (isIterationEndpoint) {
            config.setIterationId(streamGraph.getBrokerID(nodeId));
            config.setIterationWaitTime(streamGraph.getLoopTimeout(nodeId));
        }

        final Configuration custom = node.getCustomConfiguration();
        if (!custom.keySet().isEmpty()) {
            config.setCustomConfiguration(custom);
        }
    }

    /**
     * Builds the display name of an operator together with the operators chained behind it.
     *
     * <ul>
     *   <li>no chained output: the operator name itself</li>
     *   <li>one chained output: {@code name -> targetName}</li>
     *   <li>several chained outputs: {@code name -> (target1, target2, ...)}</li>
     * </ul>
     *
     * @param currentOperatorName name of the current operator
     * @param chainedOutputs out edges to operators chained behind the current one
     * @param chainedNames already computed chained names, keyed by node id
     * @return the composed name
     */
    private static String makeChainedName(String currentOperatorName, List<StreamEdge> chainedOutputs, Map<Integer, String> chainedNames) {
        final int chainedCount = chainedOutputs.size();
        if (chainedCount == 0) {
            return currentOperatorName;
        }
        if (chainedCount == 1) {
            return currentOperatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
        }
        final List<String> targetNames = chainedOutputs.stream()
                .map(edge -> chainedNames.get(edge.getTargetId()))
                .collect(Collectors.toList());
        return currentOperatorName + " -> (" + StringUtils.join(targetNames, ", ") + ")";
    }

    /**
     * Builds the job vertex name from the chained names of the chain's head nodes.
     *
     * <p>The names are joined with {@code ", "}; when there is more than one head node the
     * result is additionally wrapped in square brackets.
     *
     * @param sortedHeadNodeIds ids of the head nodes, in the order they should appear
     * @param chainedNames chained name per node id
     * @return the vertex name; empty when there is no head node
     */
    private static String makeJobVertexName(List<Integer> sortedHeadNodeIds, Map<Integer, String> chainedNames) {
        final String joinedNames = sortedHeadNodeIds.stream()
                .map(chainedNames::get)
                .collect(Collectors.joining(", "));
        return sortedHeadNodeIds.size() > 1 ? "[" + joinedNames + "]" : joinedNames;
    }

    /**
     * Maps an edge's data exchange mode to the result partition type of the job edge.
     *
     * <p>{@code PIPELINED} and {@code BATCH} map directly to pipelined and blocking partitions.
     * {@code AUTO} defers to the job's execution mode, for which only {@code PIPELINED} and
     * {@code BATCH} are accepted.
     *
     * @param dataExchangeMode exchange mode configured on the edge
     * @param executionMode execution mode of the job, consulted only for {@code AUTO}
     * @return the partition type to use
     * @throws UnsupportedOperationException for {@code PIPELINE_WITH_BATCH_FALLBACK}, for any
     *         other unknown exchange mode, and for {@code AUTO} combined with an execution
     *         mode other than {@code PIPELINED} or {@code BATCH}
     */
    private static ResultPartitionType getEdgeResultPartitionType(DataExchangeMode dataExchangeMode, ExecutionMode executionMode) {
        switch (dataExchangeMode) {
            case PIPELINED:
                return ResultPartitionType.PIPELINED;
            case BATCH:
                return ResultPartitionType.BLOCKING;
            case PIPELINE_WITH_BATCH_FALLBACK:
                throw new UnsupportedOperationException("Data exchange mode " + dataExchangeMode + " is not supported.");
            case AUTO:
                // Resolved from the execution mode below.
                break;
            default:
                throw new UnsupportedOperationException("Unknown data exchange mode " + dataExchangeMode + ".");
        }

        switch (executionMode) {
            case PIPELINED:
                return ResultPartitionType.PIPELINED;
            case BATCH:
                return ResultPartitionType.BLOCKING;
            default:
                throw new UnsupportedOperationException("Unknown execution mode " + executionMode + ".");
        }
    }

    /**
     * Creates a {@link JobEdge} for every stream edge in {@code transitiveOutEdges}.
     *
     * <p>Forward and rescale partitioners are connected point-wise, all others all-to-all.
     * Each job edge also receives its ship strategy name and scheduling mode, and the
     * generator remembers whether any non-deterministic partitioner was seen.
     *
     * @throws RuntimeException if an edge connects a job vertex with itself
     * @throws UnsupportedOperationException if the scheduling mode cannot be resolved
     */
    private void connectEdges() {
        for (StreamEdge edge : this.transitiveOutEdges) {
            final JobVertex producer = this.nodeToJobVertexMap.get(edge.getSourceId());
            final JobVertex consumer = this.nodeToJobVertexMap.get(edge.getTargetId());
            if (producer.getID().equals(consumer.getID())) {
                throw new RuntimeException("The job graph is cyclic.");
            }

            final StreamPartitioner<?> partitioner = edge.getPartitioner();
            final IntermediateDataSetID dataSetID = new IntermediateDataSetID((AbstractID)edge.getEdgeID());
            final ExecutionMode executionMode = this.streamGraph.getExecutionConfig().getExecutionMode();

            final boolean pointwise = partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner;
            final DistributionPattern pattern = pointwise ? DistributionPattern.POINTWISE : DistributionPattern.ALL_TO_ALL;
            final ResultPartitionType partitionType = StreamingJobGraphGenerator.getEdgeResultPartitionType(edge.getDataExchangeMode(), executionMode);
            final JobEdge jobEdge = consumer.connectDataSetAsInput(producer, dataSetID, pattern, partitionType);
            jobEdge.setShipStrategyName(partitioner.toString());

            if (!this.isPartitionerDeterministic(partitioner)) {
                LOG.info("Detected non-deterministic partitioner {} in edge {}.", partitioner, (Object)jobEdge);
                this.hasNonDeterministicShuffles = true;
            }

            final SchedulingMode schedulingMode;
            switch (edge.getSchedulingMode()) {
                case CONCURRENT:
                    schedulingMode = SchedulingMode.CONCURRENT;
                    break;
                case SEQUENTIAL:
                    schedulingMode = SchedulingMode.SEQUENTIAL;
                    break;
                case AUTO:
                    // AUTO follows the job's execution mode.
                    if (ExecutionMode.PIPELINED.equals(executionMode)) {
                        schedulingMode = SchedulingMode.CONCURRENT;
                    } else if (ExecutionMode.BATCH.equals(executionMode)) {
                        schedulingMode = SchedulingMode.SEQUENTIAL;
                    } else {
                        throw new UnsupportedOperationException("Not Support " + edge.getDataExchangeMode() + " exchanged mode.");
                    }
                    break;
                default:
                    throw new UnsupportedOperationException("Not Support " + edge.getSchedulingMode() + " scheduling type.");
            }
            jobEdge.setSchedulingMode(schedulingMode);

            if (LOG.isDebugEnabled()) {
                LOG.debug("CONNECTED: {} - {} -> {}", new Object[]{partitioner.getClass().getSimpleName(), edge.getSourceId(), edge.getTargetId()});
            }
        }
    }

    /**
     * Tells whether a partitioner is considered deterministic by this generator.
     *
     * @param partitioner the partitioner of an edge
     * @return {@code false} for custom, rebalance, rescale and shuffle partitioners,
     *         {@code true} for everything else
     */
    private boolean isPartitionerDeterministic(StreamPartitioner partitioner) {
        final boolean nonDeterministic = partitioner instanceof CustomPartitionerWrapper
                || partitioner instanceof RebalancePartitioner
                || partitioner instanceof RescalePartitioner
                || partitioner instanceof ShufflePartitioner;
        return !nonDeterministic;
    }

    /**
     * Translates the control edges of the stream graph into control edges between job vertices.
     *
     * <p>A control edge is dropped when both ends belong to the same job vertex or when the
     * two vertices are already connected by a data edge in either direction. Several control
     * edges between the same pair of vertices are collapsed into one; it is
     * {@code CONCURRENT} if any of them is, otherwise {@code START_ON_FINISH}.
     */
    private void connectControlEdges() {
        // producer -> consumers reachable through a data edge
        Map<JobVertex, Set<JobVertex>> dataEdges = new HashMap<>();
        for (JobVertex producer : this.jobGraph.getVertices()) {
            for (IntermediateDataSet dataSet : producer.getProducedDataSets()) {
                for (JobEdge jobEdge : dataSet.getConsumers()) {
                    dataEdges.computeIfAbsent(producer, k -> new HashSet<>()).add(jobEdge.getTarget());
                }
            }
        }

        // source -> target -> control types requested between the two vertices
        Map<JobVertex, Map<JobVertex, Set<ControlType>>> controlEdges = new HashMap<>();
        for (StreamNode streamNode : this.streamGraph.getStreamNodes()) {
            for (StreamControlEdge streamControlEdge : streamNode.getInControlEdges()) {
                JobVertex sourceVertex = this.nodeToJobVertexMap.get(streamControlEdge.getSourceId());
                JobVertex targetVertex = this.nodeToJobVertexMap.get(streamControlEdge.getTargetId());
                if (sourceVertex.getID().equals(targetVertex.getID())) {
                    continue;
                }
                boolean connectedByData = dataEdges.getOrDefault(sourceVertex, Collections.emptySet()).contains(targetVertex)
                        || dataEdges.getOrDefault(targetVertex, Collections.emptySet()).contains(sourceVertex);
                if (connectedByData) {
                    continue;
                }
                controlEdges.computeIfAbsent(sourceVertex, k -> new HashMap<>())
                        .computeIfAbsent(targetVertex, k -> new HashSet<>())
                        .add(streamControlEdge.getControlType());
            }
        }

        for (Map.Entry<JobVertex, Map<JobVertex, Set<ControlType>>> sourceEntry : controlEdges.entrySet()) {
            JobVertex sourceVertex = sourceEntry.getKey();
            for (Map.Entry<JobVertex, Set<ControlType>> targetEntry : sourceEntry.getValue().entrySet()) {
                JobVertex targetVertex = targetEntry.getKey();
                ControlType controlType = targetEntry.getValue().contains(ControlType.CONCURRENT) ? ControlType.CONCURRENT : ControlType.START_ON_FINISH;
                targetVertex.connectControlEdge(sourceVertex, controlType);
            }
        }
    }

    /**
     * Assigns slot sharing groups, co-location groups and tags to the job vertices.
     *
     * <ol>
     *   <li>Every chain whose start node names a slot sharing group gets the shared
     *       {@link SlotSharingGroup} instance for that name.</li>
     *   <li>The source and sink vertex of every iteration are put into a common
     *       {@link CoLocationGroup}.</li>
     *   <li>The tags of each vertex that has a slot sharing group are replaced by the
     *       group's tags.</li>
     * </ol>
     */
    private void setSlotSharing() {
        final Map<String, SlotSharingGroup> groupsByName = new HashMap<>();
        for (Integer startNodeId : this.chainedNodeIdsMap.keySet()) {
            final JobVertex vertex = this.nodeToJobVertexMap.get(startNodeId);
            final String groupName = this.streamGraph.getStreamNode(startNodeId).getSlotSharingGroup();
            if (groupName != null) {
                vertex.setSlotSharingGroup(groupsByName.computeIfAbsent(groupName, name -> new SlotSharingGroup()));
            }
        }

        for (Tuple2<StreamNode, StreamNode> iterationPair : this.streamGraph.getIterationSourceSinkPairs()) {
            final CoLocationGroup coLocation = new CoLocationGroup();
            final JobVertex iterationSource = this.nodeToJobVertexMap.get(iterationPair.f0.getId());
            final JobVertex iterationSink = this.nodeToJobVertexMap.get(iterationPair.f1.getId());
            coLocation.addVertex(iterationSource);
            coLocation.addVertex(iterationSink);
            iterationSource.updateCoLocationGroup(coLocation);
            iterationSink.updateCoLocationGroup(coLocation);
        }

        for (JobVertex jobVertex : this.jobGraph.getVerticesSortedTopologicallyFromSources()) {
            final SlotSharingGroup sharingGroup = jobVertex.getSlotSharingGroup();
            if (sharingGroup != null) {
                jobVertex.getTags().clear();
                jobVertex.getTags().addAll(sharingGroup.getTags());
            }
        }
    }

    /**
     * Derives the {@link JobCheckpointingSettings} from the stream graph's checkpoint config
     * and installs them on the job graph.
     *
     * <p>A non-positive checkpoint interval is replaced by {@code Long.MAX_VALUE}. Every job
     * vertex acknowledges and commits checkpoints; input vertices additionally trigger them.
     *
     * @throws IllegalStateException if externalized checkpoints are enabled without a cleanup
     *         mode, or if the checkpointing mode is neither exactly-once nor at-least-once
     * @throws FlinkRuntimeException if the master hooks or the state backend cannot be serialized
     */
    private void configureCheckpointing() {
        final CheckpointConfig cfg = this.streamGraph.getCheckpointConfig();

        long interval = cfg.getCheckpointInterval();
        if (interval <= 0L) {
            interval = Long.MAX_VALUE;
        } else {
            // Only with a real interval is the fail-on-error flag propagated.
            this.streamGraph.getExecutionConfig().setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
        }

        final int chainCount = this.chainedNodeIdsMap.size();
        final List<JobVertexID> triggerVertices = new ArrayList<>();
        final List<JobVertexID> ackVertices = new ArrayList<>(chainCount);
        final List<JobVertexID> commitVertices = new ArrayList<>(chainCount);
        for (Integer headNodeId : this.chainedNodeIdsMap.keySet()) {
            final JobVertex vertex = this.nodeToJobVertexMap.get(headNodeId);
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.getID());
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        CheckpointRetentionPolicy retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        if (cfg.isExternalizedCheckpointsEnabled()) {
            final CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
            if (cleanup == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            retentionAfterTermination = cleanup.deleteOnCancellation() ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        }

        final CheckpointingMode mode = cfg.getCheckpointingMode();
        if (mode != CheckpointingMode.EXACTLY_ONCE && mode != CheckpointingMode.AT_LEAST_ONCE) {
            throw new IllegalStateException("Unexpected checkpointing mode. Did not expect there to be another checkpointing mode besides exactly-once or at-least-once.");
        }
        final boolean isExactlyOnce = mode == CheckpointingMode.EXACTLY_ONCE;

        // Collect a hook factory for every user function that declares a master hook.
        final List<FunctionMasterCheckpointHookFactory> hooks = new ArrayList<>();
        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            final StreamOperator<?> operator = node.getOperator();
            if (operator instanceof AbstractUdfStreamOperator) {
                final Object userFunction = ((AbstractUdfStreamOperator)operator).getUserFunction();
                if (userFunction instanceof WithMasterCheckpointHook) {
                    hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook)userFunction));
                }
            }
        }

        SerializedValue serializedHooks = null;
        if (!hooks.isEmpty()) {
            try {
                final MasterTriggerRestoreHook.Factory[] hookArray = hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
                serializedHooks = new SerializedValue((Object)hookArray);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }

        SerializedValue serializedStateBackend = null;
        if (this.streamGraph.getStateBackend() != null) {
            try {
                serializedStateBackend = new SerializedValue((Object)this.streamGraph.getStateBackend());
            } catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", e);
            }
        }

        final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(interval, cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints(), retentionAfterTermination, isExactlyOnce);
        this.jobGraph.setSnapshotSettings(new JobCheckpointingSettings(triggerVertices, ackVertices, commitVertices, coordinatorConfig, serializedStateBackend, serializedHooks));
    }

    /**
     * Merges two possibly absent resource constraints.
     *
     * @param first the first constraints, may be {@code null}
     * @param second the second constraints, may be {@code null}
     * @return {@code null} if both are {@code null}; otherwise the result of calling
     *         {@code merge} on the non-null one (on {@code first} when both are present)
     */
    private ResourceConstraints mergeResourceConstraints(ResourceConstraints first, ResourceConstraints second) {
        if (first != null) {
            return first.merge(second);
        }
        if (second != null) {
            // first is null here; it is still handed to merge as the argument.
            return second.merge(first);
        }
        return null;
    }

    /**
     * Checks whether the resource constraints of two nodes are compatible.
     *
     * @param startNode the first node
     * @param targetNode the second node
     * @return {@code true} if neither node has constraints, or if either node's constraints
     *         report being compatible with the other's
     */
    private static boolean isResourceConstraintsCompatible(StreamNode startNode, StreamNode targetNode) {
        final ResourceConstraints startConstraints = startNode.getResourceConstraints();
        final ResourceConstraints targetConstraints = targetNode.getResourceConstraints();
        if (startConstraints == null && targetConstraints == null) {
            return true;
        }
        if (startConstraints != null && startConstraints.isCompatibleWith(targetConstraints)) {
            return true;
        }
        return targetConstraints != null && targetConstraints.isCompatibleWith(startConstraints);
    }

    /**
     * Hands out consecutive integers starting at zero.
     */
    static class SequenceGenerator {
        /** The value the next call to {@link #get()} will return. */
        private int sequence;

        SequenceGenerator() {
            this.sequence = 0;
        }

        /**
         * Returns the current sequence value and advances the generator by one.
         */
        public int get() {
            final int current = this.sequence;
            this.sequence = current + 1;
            return current;
        }

        /**
         * Returns the current sequence value without advancing, i.e. the number of values
         * handed out so far.
         */
        public int last() {
            return this.sequence;
        }
    }

    /**
     * A node of the coarsened graph: one or more stream nodes treated as a single unit,
     * together with the ids of the coarsened nodes directly before and after it.
     */
    static class CoarsenedNode {
        /** Id of this coarsened node; taken from the stream node it was created from. */
        private final Integer id;
        /** Ids of all stream nodes folded into this coarsened node. */
        private final Set<Integer> originalNodes = new HashSet<Integer>();
        /** Predecessor id mapped to whether the connection to it is chainable. */
        private final Map<Integer, Boolean> predecessorsNodes = new HashMap<Integer, Boolean>();
        /** Ids of the successors. */
        private final Set<Integer> sucessorNodes = new HashSet<Integer>();

        /**
         * Creates a coarsened node that wraps exactly one stream node.
         *
         * @param streamNode the wrapped node; supplies the id and the in/out edges
         * @param chainingNode supplies, per in-edge source, whether the node is chained to it
         */
        public CoarsenedNode(StreamNode streamNode, ChainingStreamNode chainingNode) {
            this.id = streamNode.getId();
            this.originalNodes.add(streamNode.getId());
            for (StreamEdge inEdge : streamNode.getInEdges()) {
                Integer sourceId = inEdge.getSourceId();
                this.predecessorsNodes.put(sourceId, chainingNode.isChainTo(sourceId));
            }
            for (StreamEdge outEdge : streamNode.getOutEdges()) {
                this.sucessorNodes.add(outEdge.getTargetId());
            }
        }

        public Integer getId() {
            return this.id;
        }

        public Set<Integer> getOriginalNodes() {
            return this.originalNodes;
        }

        public Map<Integer, Boolean> getPredecessorsNodes() {
            return this.predecessorsNodes;
        }

        public Set<Integer> getSucessorNodes() {
            return this.sucessorNodes;
        }

        /**
         * Absorbs {@code other} into this node and rewires the neighbours of {@code other}
         * so that they reference this node instead.
         *
         * <p>A predecessor relation stays chainable only if it is chainable on every
         * contributing side; a relation that did not exist before counts as chainable.
         *
         * @param other the node being absorbed
         * @param coarsenedNodeMap all coarsened nodes by id; must contain every predecessor
         *        and successor of {@code other}
         */
        public void merge(CoarsenedNode other, Map<Integer, CoarsenedNode> coarsenedNodeMap) {
            this.originalNodes.addAll(other.originalNodes);

            for (Map.Entry<Integer, Boolean> entry : other.predecessorsNodes.entrySet()) {
                // The key is bound before it is used; the decompiled form read it before assignment.
                Integer otherPredNodeId = entry.getKey();
                boolean chainable = entry.getValue() && this.predecessorsNodes.getOrDefault(otherPredNodeId, Boolean.TRUE);
                this.predecessorsNodes.put(otherPredNodeId, chainable);
                // The predecessor now points at this node instead of the absorbed one.
                Set<Integer> updateNodes = coarsenedNodeMap.get(otherPredNodeId).sucessorNodes;
                updateNodes.remove(other.id);
                updateNodes.add(this.id);
            }

            for (Integer otherSuccNodeId : other.sucessorNodes) {
                this.sucessorNodes.add(otherSuccNodeId);
                // The successor replaces its entry for the absorbed node by one for this node.
                Map<Integer, Boolean> updateNodes = coarsenedNodeMap.get(otherSuccNodeId).predecessorsNodes;
                boolean chainable = updateNodes.get(other.id) && updateNodes.getOrDefault(this.id, Boolean.TRUE);
                updateNodes.put(this.id, chainable);
                updateNodes.remove(other.id);
            }

            // Drop the self references and the references to the absorbed node that the
            // rewiring above may have introduced.
            this.predecessorsNodes.remove(this.id);
            this.sucessorNodes.remove(this.id);
            this.predecessorsNodes.remove(other.id);
            this.sucessorNodes.remove(other.id);
        }
    }

    /**
     * Per-node bookkeeping used while deciding which stream nodes are chained together.
     *
     * <p>An instance stores the node id and its ordering numbers, the read priorities recorded
     * for upstream and downstream nodes, the id of the coarsened node it was assigned to, and the
     * set of upstream node ids this node has been found chainable to.
     */
    static class ChainingStreamNode {
        private final Integer nodeId;
        private final int inEdgeCnt;
        private final int topologicalOrder;
        private final int layerNumber;
        private int depthFirstNumber;
        /** Read priority mapped to the downstream node ids registered with that priority. */
        private Map<StreamNode.ReadPriority, Set<Integer>> downPriorityMap;
        /** Upstream node id mapped to the read priority registered for it. */
        private Map<Integer, StreamNode.ReadPriority> readPriorityMap;
        /** Read priority mapped to the number of {@link #setReadPriority} calls made with it. */
        private Map<StreamNode.ReadPriority, Integer> priorityInEdgeNumMap;
        private Integer coarsenedId;
        /** Upstream node ids this node is chainable to; created lazily, so it may be null. */
        private Set<Integer> chainableToSet;
        /** Tri-state flag: null until first set, after which it may not change value. */
        private Boolean allowMultiHeadChaining;

        ChainingStreamNode(Integer nodeId, int inEdgeCnt, int topologicalOrder, int layerNumber) {
            this.nodeId = nodeId;
            this.inEdgeCnt = inEdgeCnt;
            this.topologicalOrder = topologicalOrder;
            this.layerNumber = layerNumber;
            this.downPriorityMap = new HashMap<>();
            this.readPriorityMap = new HashMap<>();
            this.priorityInEdgeNumMap = new HashMap<>();
        }

        int getNodeId() {
            return this.nodeId;
        }

        int getTopologicalOrder() {
            return this.topologicalOrder;
        }

        int getLayerNumber() {
            return this.layerNumber;
        }

        int getDepthFirstNumber() {
            return this.depthFirstNumber;
        }

        void setDepthFirstNumber(int depthFirstNumber) {
            this.depthFirstNumber = depthFirstNumber;
        }

        /**
         * Returns the single priority derived from the registered downstream priorities.
         *
         * @return {@code DYNAMIC} when the downstream priorities conflict, the only registered
         *     priority when there is exactly one, or {@code null} when none is registered
         * @throws IllegalStateException if none of the cases above applies
         */
        StreamNode.ReadPriority getTransitivePriority() {
            if (this.isDownPriorityConflicting()) {
                return StreamNode.ReadPriority.DYNAMIC;
            }
            if (this.downPriorityMap.size() == 1) {
                return this.downPriorityMap.keySet().iterator().next();
            }
            if (this.downPriorityMap.isEmpty()) {
                return null;
            }
            throw new IllegalStateException("This is an internal error.");
        }

        /**
         * Returns whether the downstream priorities conflict: more than one distinct priority is
         * registered, or more than one downstream node is registered under {@code DYNAMIC}.
         */
        boolean isDownPriorityConflicting() {
            if (this.downPriorityMap.size() > 1) {
                return true;
            }
            Set<Integer> dynamicNodes =
                    this.downPriorityMap.getOrDefault(StreamNode.ReadPriority.DYNAMIC, Collections.<Integer>emptySet());
            return dynamicNodes.size() > 1;
        }

        /**
         * Returns the downstream node ids registered with the given priority, or {@code null} if
         * none was registered.
         */
        Set<Integer> getDownPriorityNodes(StreamNode.ReadPriority priority) {
            return this.downPriorityMap.get(priority);
        }

        /** Registers {@code downstreamNodeId} under {@code priority}, which must not be null. */
        void setDownPriority(Integer downstreamNodeId, StreamNode.ReadPriority priority) {
            Preconditions.checkState(priority != null);
            this.downPriorityMap.computeIfAbsent(priority, k -> new HashSet<>()).add(downstreamNodeId);
        }

        /** Returns the read priority registered for the upstream node, or {@code null} if none. */
        StreamNode.ReadPriority getReadPriority(Integer upstreamNodeId) {
            return this.readPriorityMap.get(upstreamNodeId);
        }

        /**
         * Returns whether the read priorities conflict: more than one distinct priority is
         * registered, or {@code DYNAMIC} was registered more than once.
         */
        boolean isReadPriorityConflicting() {
            return this.priorityInEdgeNumMap.size() > 1
                    || this.priorityInEdgeNumMap.getOrDefault(StreamNode.ReadPriority.DYNAMIC, 0) > 1;
        }

        /**
         * Records the read priority for an upstream node and increments the counter kept for that
         * priority. {@code priority} must not be null.
         */
        void setReadPriority(Integer upstreamNodeId, StreamNode.ReadPriority priority) {
            Preconditions.checkState(priority != null);
            this.readPriorityMap.put(upstreamNodeId, priority);
            this.priorityInEdgeNumMap.put(priority, this.priorityInEdgeNumMap.getOrDefault(priority, 0) + 1);
        }

        Integer getCoarsenedId() {
            return this.coarsenedId;
        }

        void setCoarsenedId(Integer coarsenedId) {
            this.coarsenedId = coarsenedId;
        }

        /**
         * Returns whether this node is a chain head: it has no in-edges, or it has more in-edges
         * than upstream nodes it is chainable to.
         */
        boolean isChainHeadNode() {
            int chainableCount = this.chainableToSet == null ? 0 : this.chainableToSet.size();
            return this.inEdgeCnt == 0 || this.inEdgeCnt > chainableCount;
        }

        /** Returns whether this node has been marked chainable to the given upstream node. */
        boolean isChainTo(Integer upstreamNodeId) {
            return this.chainableToSet != null && this.chainableToSet.contains(upstreamNodeId);
        }

        /**
         * Sets the multi-head chaining flag. Once set, the flag may only be set again to an equal
         * value.
         *
         * @throws IllegalStateException if the flag is already set to a different value
         */
        void setAllowMultiHeadChaining(Boolean allowMultiHeadChaining) {
            // Compare boxed values with equals(): '==' on Boolean compares object identity, which
            // only matches for the canonical cached instances.
            boolean unchanged = this.allowMultiHeadChaining == null
                    || this.allowMultiHeadChaining.equals(allowMultiHeadChaining);
            Preconditions.checkState(unchanged, "The flag allowMultiHeadChaining can not be changed (nodeId: %s).", this.nodeId);
            this.allowMultiHeadChaining = allowMultiHeadChaining;
        }

        /**
         * Decides whether this node can be chained to the given upstream node and records the
         * result.
         *
         * <p>The multi-head rule is used when both the upstream node's flag and
         * {@code isMultiHeadChainMode} are true; otherwise the stricter single-input rule is
         * used. When chainable, the upstream id is recorded and the upstream flag is inherited;
         * otherwise this node's flag is set to {@code TRUE}.
         *
         * <p>The upstream node's flag is unboxed unconditionally, so it must already be set
         * (a {@code NullPointerException} is thrown otherwise).
         */
        void chainTo(ChainingStreamNode upstreamChainingNode, StreamEdge edge, StreamNode sourceNode, StreamNode targetNode, boolean isMultiHeadChainMode, boolean isEagerChainingEnabled, ExecutionMode executionMode) {
            final boolean isChainable;
            if (upstreamChainingNode.allowMultiHeadChaining && isMultiHeadChainMode) {
                isChainable = this.isChainableOnMultiHeadMode(edge, sourceNode, targetNode, isEagerChainingEnabled, executionMode);
            } else {
                isChainable = this.isChainable(edge, sourceNode, targetNode, isEagerChainingEnabled, executionMode);
            }

            if (isChainable) {
                this.addChainableToNode(upstreamChainingNode.nodeId);
                this.setAllowMultiHeadChaining(upstreamChainingNode.allowMultiHeadChaining);
            } else {
                this.setAllowMultiHeadChaining(Boolean.TRUE);
            }
        }

        /** Adds the upstream id to the chainable set, creating the set on first use. */
        private void addChainableToNode(Integer upstreamNodeId) {
            if (this.chainableToSet == null) {
                this.chainableToSet = new HashSet<>();
            }
            this.chainableToSet.add(upstreamNodeId);
        }

        /** Removes the upstream id from the chainable set; a no-op if the set was never created. */
        void removeChainableToNode(Integer upstreamNodeId) {
            if (this.chainableToSet != null) {
                this.chainableToSet.remove(upstreamNodeId);
            }
        }

        /**
         * Single-input chaining rule: the downstream node must have exactly one in-edge and also
         * satisfy {@link #isChainableOnMultiHeadMode}.
         */
        private boolean isChainable(StreamEdge edge, StreamNode upstreamNode, StreamNode downStreamNode, boolean chainEagerlyEnabled, ExecutionMode executionMode) {
            return downStreamNode.getInEdges().size() == 1
                    && this.isChainableOnMultiHeadMode(edge, upstreamNode, downStreamNode, chainEagerlyEnabled, executionMode);
        }

        /**
         * Chaining rule without the single-input restriction. All of the following must hold:
         * both operators are non-null; both nodes are in the same slot sharing group; the
         * downstream strategy is {@code ALWAYS}; the upstream strategy is {@code HEAD} or
         * {@code ALWAYS}; the edge uses a {@code ForwardPartitioner}, or the downstream
         * parallelism is 1 and eager chaining is enabled; both parallelisms are equal; the edge's
         * data exchange mode is not {@code BATCH}; and the resource constraints are compatible.
         *
         * <p>{@code executionMode} is not consulted by this check.
         */
        private boolean isChainableOnMultiHeadMode(StreamEdge edge, StreamNode upstreamNode, StreamNode downStreamNode, boolean chainEagerlyEnabled, ExecutionMode executionMode) {
            StreamOperator<?> downstreamOperator = downStreamNode.getOperator();
            StreamOperator<?> upstreamOperator = upstreamNode.getOperator();
            return downstreamOperator != null
                    && upstreamOperator != null
                    && downStreamNode.isSameSlotSharingGroup(upstreamNode)
                    && downstreamOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                    && (upstreamOperator.getChainingStrategy() == ChainingStrategy.HEAD
                            || upstreamOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                    && (edge.getPartitioner() instanceof ForwardPartitioner
                            || downStreamNode.getParallelism() == 1 && chainEagerlyEnabled)
                    && downStreamNode.getParallelism() == upstreamNode.getParallelism()
                    && edge.getDataExchangeMode() != DataExchangeMode.BATCH
                    && StreamingJobGraphGenerator.isResourceConstraintsCompatible(upstreamNode, downStreamNode);
        }
    }

    /**
     * Mutable holder for the intermediate state kept while chains are created.
     *
     * <p>{@link #resetForNewChain()} clears every per-chain field. {@code allBuiltNodes} and
     * {@code classLoader} are not touched by the reset and therefore keep their contents across
     * chains.
     */
    private static class CreatingChainIntermediateStorager {
        // Not cleared by resetForNewChain().
        final Set<Integer> allBuiltNodes = new HashSet<>();
        // Captured once from the creating thread's context class loader.
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        // Per-chain state, cleared by resetForNewChain().
        final Map<Integer, StreamConfig> chainedConfigMap = new HashMap<>();
        final List<Integer> chainedHeadNodeIdsInOrder = new ArrayList<>();
        final List<StreamEdge> chainInEdgesInOrder = new ArrayList<>();
        final List<StreamEdge> chainOutEdgesInOrder = new ArrayList<>();
        final Map<OperatorID, InputFormat> chainInputFormatMap = new HashMap<>();
        final Map<OperatorID, OutputFormat> chainOutputFormatMap = new HashMap<>();
        final Map<Integer, String> chainedNameMap = new HashMap<>();
        ResourceSpec chainedMinResources;
        ResourceSpec chainedPreferredResources;
        JobVertex createdVertex;
        // Must be declared after classLoader: the initializer reads it.
        final StreamTaskConfigCache vertexConfigCache = new StreamTaskConfigCache(this.classLoader);
        final List<Integer> chainedNodeIdsInOrder = new ArrayList<>();

        CreatingChainIntermediateStorager() {
        }

        /** Clears all per-chain collections and nulls the per-chain references. */
        void resetForNewChain() {
            // Ordered id / edge lists.
            chainedHeadNodeIdsInOrder.clear();
            chainedNodeIdsInOrder.clear();
            chainInEdgesInOrder.clear();
            chainOutEdgesInOrder.clear();

            // Lookup maps.
            chainedConfigMap.clear();
            chainedNameMap.clear();
            chainInputFormatMap.clear();
            chainOutputFormatMap.clear();

            // Single-valued references.
            chainedMinResources = null;
            chainedPreferredResources = null;
            createdVertex = null;

            vertexConfigCache.clear();
        }
    }
}

