package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduleHint;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNodeGroup;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.ArbitraryInputStreamTask;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorV2.class */
public class StreamingJobGraphGeneratorV2 extends StreamingJobGraphGenerator implements StreamNodeGroup.AnalyzeResolver {
    private Map<Integer, StreamNodeGroup> nodeToStreamNodeGroup;
    private HashSet<StreamNodeGroup> allGroups;
    private List<StreamNodeGroup> groupsInTopologyOrder;
    private final Map<StreamEdge, IntermediateDataSet> edgeToIntermediateDataSet;
    private final Set<StreamEdge> unchainableEdge;

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamingJobGraphGeneratorV2(StreamGraph streamGraph) {
        super(streamGraph);
        this.nodeToStreamNodeGroup = new HashMap();
        this.allGroups = new HashSet<>();
        this.groupsInTopologyOrder = new LinkedList();
        this.edgeToIntermediateDataSet = new IdentityHashMap();
        this.unchainableEdge = new HashSet();
    }

    @Override // org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
    protected void createJobVertices(Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list, Map<Integer, List<Tuple2<byte[], byte[]>>> map2) {
        checkTwoInputEdgeChainable();
        splitStreamNodeGroup();
        sortGroupByTopologyOrder();
        Iterator<StreamNodeGroup> it = this.groupsInTopologyOrder.iterator();
        while (it.hasNext()) {
            createJobVertex(it.next(), map, map2);
        }
    }

    private void checkTwoInputEdgeChainable() {
        for (StreamNode streamNode : this.streamGraph.getStreamNodes()) {
            if (streamNode.getFirstReadAllInput() != null) {
                Set<Integer> allPiplinedPredecessor = getAllPiplinedPredecessor(streamNode, streamNode.getFirstReadAllInput());
                Set<Integer> allPiplinedPredecessor2 = getAllPiplinedPredecessor(streamNode, streamNode.getFirstReadAllInput().equals(StreamEdge.InputOrder.FIRST) ? StreamEdge.InputOrder.SECOND : StreamEdge.InputOrder.FIRST);
                Iterator<Integer> it = allPiplinedPredecessor.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    if (allPiplinedPredecessor2.contains(it.next())) {
                        Iterator<StreamEdge> it2 = streamNode.getInEdges().iterator();
                        while (it2.hasNext()) {
                            this.unchainableEdge.add(it2.next());
                        }
                    }
                }
            }
        }
    }

    private Set<Integer> getAllPiplinedPredecessor(StreamNode streamNode, StreamEdge.InputOrder inputOrder) {
        HashSet hashSet = new HashSet();
        for (StreamEdge streamEdge : streamNode.getInEdges()) {
            if (!streamEdge.getResultPartitionType().isBlocking() && (inputOrder == null || streamEdge.getInputOrder().equals(inputOrder))) {
                if (!hashSet.contains(Integer.valueOf(streamEdge.getSourceId()))) {
                    hashSet.add(Integer.valueOf(streamEdge.getSourceId()));
                    hashSet.addAll(getAllPiplinedPredecessor(this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getSourceId())), null));
                }
            }
        }
        return hashSet;
    }

    private void createJobVertex(StreamNodeGroup streamNodeGroup, Map<Integer, byte[]> map, Map<Integer, List<Tuple2<byte[], byte[]>>> map2) {
        LinkedList linkedList = new LinkedList();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (StreamNode streamNode : streamNodeGroup.nodes) {
            int id = streamNode.getId();
            LinkedList linkedList2 = new LinkedList();
            streamNode.getOutEdges().stream().filter(streamEdge -> {
                return !streamNodeGroup.outEdges.contains(streamEdge);
            }).forEach(streamEdge2 -> {
                linkedList2.add(streamEdge2);
            });
            LinkedList linkedList3 = new LinkedList();
            streamNode.getOutEdges().stream().filter(streamEdge3 -> {
                return streamNodeGroup.outEdges.contains(streamEdge3);
            }).forEach(streamEdge4 -> {
                linkedList3.add(streamEdge4);
            });
            linkedList.add(new OperatorID(map.get(Integer.valueOf(id))));
            OperatorConfig operatorConfig = new OperatorConfig();
            operatorConfig.setPriority(streamNodeGroup.getNodePriority(id));
            initOperatorConfig(operatorConfig, new OperatorID(map.get(Integer.valueOf(id))), Integer.valueOf(id), linkedList2, linkedList3, -1);
            List<Tuple2<byte[], byte[]>> computeIfAbsent = map2.computeIfAbsent(Integer.valueOf(streamNodeGroup.id), num -> {
                return new ArrayList();
            });
            byte[] bArr = map.get(Integer.valueOf(id));
            computeIfAbsent.add(new Tuple2<>(bArr, bArr));
            if (streamNode.getInputFormat() != null) {
                hashMap.put(operatorConfig.getOperatorID(), streamNode.getInputFormat());
            }
            if (streamNode.getOutputFormat() != null) {
                hashMap2.put(operatorConfig.getOperatorID(), streamNode.getOutputFormat());
            }
            this.chainedConfigs.computeIfAbsent(Integer.valueOf(streamNodeGroup.id), num2 -> {
                return new LinkedHashMap();
            });
            this.chainedConfigs.get(Integer.valueOf(streamNodeGroup.id)).put(Integer.valueOf(id), operatorConfig);
        }
        JobVertex inputOutputFormatVertex = new InputOutputFormatVertex(streamNodeGroup.getName(), new JobVertexID(), linkedList);
        this.jobVertices.put(Integer.valueOf(streamNodeGroup.id), inputOutputFormatVertex);
        this.jobGraph.addVertex(inputOutputFormatVertex);
        inputOutputFormatVertex.setInvokableClass(ArbitraryInputStreamTask.class);
        ResourceSpec resourceSpec = streamNodeGroup.minResource;
        ResourceSpec resourceSpec2 = streamNodeGroup.maxResource;
        int i = 0;
        for (StreamEdge streamEdge5 : streamNodeGroup.outEdges) {
            if (streamEdge5.getResultPartitionType().isBlocking()) {
                i += streamEdge5.getShuffleMemory();
            }
        }
        if (i > 0) {
            ResourceSpec build = ResourceSpec.newBuilder().setManagedMemoryInMB(i).build();
            resourceSpec = resourceSpec.merge(build);
            resourceSpec2 = resourceSpec2.merge(build);
        }
        inputOutputFormatVertex.setResources(resourceSpec, resourceSpec2);
        int i2 = streamNodeGroup.parallelism;
        if (i2 > 0) {
            inputOutputFormatVertex.setParallelism(i2);
        }
        inputOutputFormatVertex.setMaxParallelism(streamNodeGroup.maxParallelism);
        new TaskConfig(inputOutputFormatVertex.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper(new ImmutablePair(hashMap, hashMap2)));
        initJobVertexConfig(streamNodeGroup, new StreamTaskConfig(inputOutputFormatVertex.getConfiguration()));
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        for (StreamEdge streamEdge6 : streamNodeGroup.outEdges) {
            StreamNodeGroup streamNodeGroup2 = this.nodeToStreamNodeGroup.get(Integer.valueOf(streamEdge6.getTargetId()));
            hashMap3.putIfAbsent(streamNodeGroup2, 0);
            hashMap3.put(streamNodeGroup2, Integer.valueOf(1 + ((Integer) hashMap3.get(streamNodeGroup2)).intValue()));
            hashMap4.putIfAbsent(streamNodeGroup2, Integer.MAX_VALUE);
            if (streamNodeGroup2.getInputOrder(streamEdge6) < ((Integer) hashMap4.get(streamNodeGroup2)).intValue()) {
                hashMap4.put(streamNodeGroup2, Integer.valueOf(streamNodeGroup2.getInputOrder(streamEdge6)));
            }
        }
        for (StreamEdge streamEdge7 : streamNodeGroup.outEdges) {
            ResultPartitionType resultPartitionType = streamEdge7.getResultPartitionType();
            StreamNodeGroup streamNodeGroup3 = this.nodeToStreamNodeGroup.get(Integer.valueOf(streamEdge7.getTargetId()));
            if (((Integer) hashMap3.get(streamNodeGroup3)).intValue() > 1 && streamNodeGroup3.inputOrders != null && streamNodeGroup3.getInputOrder(streamEdge7) != ((Integer) hashMap4.get(streamNodeGroup3)).intValue()) {
                resultPartitionType = ResultPartitionType.BLOCKING;
            }
            IntermediateDataSet createAndAddResultDataSet = inputOutputFormatVertex.createAndAddResultDataSet(new IntermediateDataSetID(streamEdge7.getEdgeID()), resultPartitionType, streamNodeGroup.isStopAndGoEdge.get(streamEdge7).booleanValue());
            this.edgeToIntermediateDataSet.put(streamEdge7, createAndAddResultDataSet);
            Iterator<Integer> it = streamNodeGroup.inputOutputDependency.getOrDefault(streamEdge7, new HashSet()).iterator();
            while (it.hasNext()) {
                inputOutputFormatVertex.addOutputInputDependency(createAndAddResultDataSet, it.next().intValue());
            }
        }
        if (streamNodeGroup.inputOrders != null) {
            inputOutputFormatVertex.setScheduleHint(ScheduleHint.newBuilder().setInputOrders(streamNodeGroup.inputOrders).build());
        }
        Iterator<StreamEdge> it2 = streamNodeGroup.inEdges.iterator();
        while (it2.hasNext()) {
            connect(it2.next());
        }
    }

    private void connect(StreamEdge streamEdge) {
        boolean z = this.nodeToStreamNodeGroup.get(Integer.valueOf(streamEdge.getSourceId())).isStopAndGoEdge.get(streamEdge).booleanValue() || streamEdge.getInputOrder().equals(this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getSourceId())).getFirstReadAllInput());
        JobVertex jobVertex = this.jobVertices.get(Integer.valueOf(this.nodeToStreamNodeGroup.get(Integer.valueOf(streamEdge.getTargetId())).id));
        StreamTaskConfig streamTaskConfig = new StreamTaskConfig(jobVertex.getConfiguration());
        streamTaskConfig.setInputsNum(streamTaskConfig.getInputsNum() + 1);
        StreamPartitioner<?> partitioner = streamEdge.getPartitioner();
        IntermediateDataSet intermediateDataSet = this.edgeToIntermediateDataSet.get(streamEdge);
        (partitioner instanceof ForwardPartitioner ? jobVertex.connectDataSetAsInput(intermediateDataSet, DistributionPattern.POINTWISE, intermediateDataSet.getResultType(), z) : partitioner instanceof RescalePartitioner ? jobVertex.connectDataSetAsInput(intermediateDataSet, DistributionPattern.POINTWISE, intermediateDataSet.getResultType(), z) : jobVertex.connectDataSetAsInput(intermediateDataSet, DistributionPattern.ALL_TO_ALL, intermediateDataSet.getResultType(), z)).setShipStrategyName(partitioner.toString());
    }

    private void initJobVertexConfig(StreamNodeGroup streamNodeGroup, StreamTaskConfig streamTaskConfig) {
        streamTaskConfig.setOutEdgesInOrder(streamNodeGroup.outEdges);
        streamTaskConfig.setInPhysicalEdges(streamNodeGroup.inEdges);
        int i = 0;
        Iterator<StreamEdge> it = streamNodeGroup.outEdges.iterator();
        while (it.hasNext()) {
            if (it.next().getResultPartitionType().isBlocking()) {
                new TaskConfig(streamTaskConfig.getConfiguration()).setShuffleMemory(i, r0.getShuffleMemory());
            }
            i++;
        }
        HashSet hashSet = new HashSet();
        streamNodeGroup.inEdges.stream().forEach(streamEdge -> {
            hashSet.add(String.valueOf(this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId())).getId()));
        });
        for (StreamNode streamNode : streamNodeGroup.nodes) {
            if (streamNode.getInEdges().size() == 0) {
                hashSet.add(String.valueOf(streamNode.getId()));
            }
        }
        streamTaskConfig.setHeadNodeID(StringUtils.join(hashSet.toArray(), ","));
        streamTaskConfig.setChainedTaskConfigs(this.chainedConfigs.get(Integer.valueOf(streamNodeGroup.id)));
        streamTaskConfig.setBufferTimeout(this.streamGraph.getProperties().getBufferTimeout());
        streamTaskConfig.setTimeCharacteristic(this.streamGraph.getProperties().getTimeCharacteristic());
        streamTaskConfig.setStateBackend(this.streamGraph.getProperties().getStateBackend());
        streamTaskConfig.setCheckpointConfig(this.streamGraph.getProperties().getCheckpointConfig().getTaskCheckpointConfiguration());
        this.vertexConfigs.put(Integer.valueOf(streamNodeGroup.id), streamTaskConfig);
    }

    private void sortGroupByTopologyOrder() {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        Iterator<StreamNodeGroup> it = this.allGroups.iterator();
        while (it.hasNext()) {
            StreamNodeGroup next = it.next();
            hashMap.put(next, new HashSet());
            ((Set) hashMap.get(next)).addAll(next.inEdges);
            Iterator<StreamEdge> it2 = next.inEdges.iterator();
            while (it2.hasNext()) {
                hashMap2.put(it2.next(), next);
            }
        }
        while (hashMap.size() > 0) {
            StreamNodeGroup streamNodeGroup = null;
            Iterator it3 = hashMap.entrySet().iterator();
            while (true) {
                if (!it3.hasNext()) {
                    break;
                }
                Map.Entry entry = (Map.Entry) it3.next();
                if (((Set) entry.getValue()).size() == 0) {
                    streamNodeGroup = (StreamNodeGroup) entry.getKey();
                    break;
                }
            }
            this.groupsInTopologyOrder.add(streamNodeGroup);
            for (StreamEdge streamEdge : streamNodeGroup.outEdges) {
                ((Set) hashMap.get(hashMap2.get(streamEdge))).remove(streamEdge);
            }
            hashMap.remove(streamNodeGroup);
        }
    }

    private void splitStreamNodeGroup() {
        Iterator<Integer> it = this.streamGraph.getSourceIDs().iterator();
        while (it.hasNext()) {
            createNodeGroup(this.streamGraph.getStreamNode(it.next()));
        }
        LinkedList linkedList = new LinkedList();
        linkedList.addAll(this.allGroups);
        Iterator it2 = linkedList.iterator();
        while (it2.hasNext()) {
            ((StreamNodeGroup) it2.next()).analyzeAndCheckConflict(this, this);
        }
    }

    private StreamNodeGroup initGroup(StreamNode streamNode) {
        StreamNodeGroup streamNodeGroup = new StreamNodeGroup(this.streamGraph);
        streamNodeGroup.addNode(streamNode);
        this.nodeToStreamNodeGroup.put(Integer.valueOf(streamNode.getId()), streamNodeGroup);
        this.allGroups.add(streamNodeGroup);
        return streamNodeGroup;
    }

    private void createNodeGroup(StreamNode streamNode) {
        if (this.nodeToStreamNodeGroup.get(Integer.valueOf(streamNode.getId())) == null) {
            initGroup(streamNode);
        }
        for (StreamEdge streamEdge : streamNode.getOutEdges()) {
            StreamNodeGroup streamNodeGroup = this.nodeToStreamNodeGroup.get(Integer.valueOf(streamNode.getId()));
            if (!isChainable(streamEdge)) {
                createNodeGroup(this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId())));
            } else if (this.nodeToStreamNodeGroup.containsKey(Integer.valueOf(streamEdge.getTargetId()))) {
                merge(streamNodeGroup, this.nodeToStreamNodeGroup.get(Integer.valueOf(streamEdge.getTargetId())));
            } else {
                streamNodeGroup.addNode(this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId())));
                this.nodeToStreamNodeGroup.put(Integer.valueOf(streamEdge.getTargetId()), streamNodeGroup);
                createNodeGroup(this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId())));
            }
        }
    }

    public boolean isChainable(StreamEdge streamEdge) {
        StreamNode streamNode = this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getSourceId()));
        StreamNodeGroup streamNodeGroup = this.nodeToStreamNodeGroup.get(Integer.valueOf(streamNode.getId()));
        StreamNode streamNode2 = this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId()));
        StreamOperator<?> operator = streamNode.getOperator();
        return streamNode.isSameSlotSharingGroup(streamNode2) && (operator.getChainingStrategy() == ChainingStrategy.ALWAYS || operator.getChainingStrategy() == ChainingStrategy.HEAD) && streamNode2.getOperator().getChainingStrategy() == ChainingStrategy.ALWAYS && (streamEdge.getPartitioner() instanceof ForwardPartitioner) && streamNode.getParallelism() == streamNode2.getParallelism() && this.streamGraph.getProperties().isChainingEnabled() && !this.unchainableEdge.contains(streamEdge) && !streamEdge.getResultPartitionType().isBlocking() && (streamNodeGroup == null || isResourceConstraintCompatible(streamNodeGroup, streamNode2));
    }

    private boolean isResourceConstraintCompatible(StreamNodeGroup streamNodeGroup, StreamNode streamNode) {
        return this.nodeToStreamNodeGroup.containsKey(Integer.valueOf(streamNode.getId())) ? streamNodeGroup.isCompatible(this.nodeToStreamNodeGroup.get(Integer.valueOf(streamNode.getId()))) : streamNodeGroup.isCompatible(streamNode);
    }

    private void merge(StreamNodeGroup streamNodeGroup, StreamNodeGroup streamNodeGroup2) {
        if (streamNodeGroup.equals(streamNodeGroup2)) {
            return;
        }
        for (StreamNode streamNode : streamNodeGroup2.nodes) {
            this.nodeToStreamNodeGroup.put(Integer.valueOf(streamNode.getId()), streamNodeGroup);
            streamNodeGroup.addNode(streamNode);
        }
        this.allGroups.remove(streamNodeGroup2);
    }

    private List<StreamNodeGroup> split(StreamNodeGroup streamNodeGroup, StreamNode streamNode) {
        Preconditions.checkArgument(streamNodeGroup.nodes.contains(streamNode));
        ArrayList arrayList = new ArrayList(2);
        StreamNodeGroup streamNodeGroup2 = new StreamNodeGroup(this.streamGraph);
        streamNodeGroup2.addNode(streamNode);
        this.nodeToStreamNodeGroup.put(Integer.valueOf(streamNode.getId()), streamNodeGroup2);
        LinkedList linkedList = new LinkedList();
        HashSet hashSet = new HashSet();
        hashSet.add(streamNode);
        linkedList.addAll(streamNode.getOutEdges());
        while (linkedList.size() > 0) {
            StreamNode streamNode2 = this.streamGraph.getStreamNode(Integer.valueOf(((StreamEdge) linkedList.poll()).getTargetId()));
            if (streamNodeGroup.nodes.contains(streamNode2) && !streamNodeGroup2.nodes.contains(streamNode2) && !hashSet.contains(streamNode2)) {
                hashSet.add(streamNode2);
                streamNodeGroup2.addNode(streamNode2);
                linkedList.addAll(streamNode2.getOutEdges());
                this.nodeToStreamNodeGroup.put(Integer.valueOf(streamNode2.getId()), streamNodeGroup2);
            }
        }
        arrayList.add(streamNodeGroup2);
        StreamNodeGroup streamNodeGroup3 = new StreamNodeGroup(this.streamGraph);
        for (StreamNode streamNode3 : streamNodeGroup.nodes) {
            if (!hashSet.contains(streamNode3)) {
                streamNodeGroup3.addNode(streamNode3);
                this.nodeToStreamNodeGroup.put(Integer.valueOf(streamNode3.getId()), streamNodeGroup3);
            }
        }
        arrayList.add(streamNodeGroup3);
        this.allGroups.remove(streamNodeGroup);
        this.allGroups.add(streamNodeGroup2);
        this.allGroups.add(streamNodeGroup3);
        return arrayList;
    }

    @Override // org.apache.flink.streaming.api.graph.StreamNodeGroup.AnalyzeResolver
    public void resolveNodePriorityLowerBoundConflicts(StreamNodeGroup streamNodeGroup, StreamNode streamNode, int i) {
        LOG.info("Detected priority lower bound conflict, current node: {}", streamNode);
        Iterator<StreamNodeGroup> it = split(streamNodeGroup, streamNode).iterator();
        while (it.hasNext()) {
            it.next().analyzeAndCheckConflict(this, this);
        }
    }

    @Override // org.apache.flink.streaming.api.graph.StreamNodeGroup.AnalyzeResolver
    public void resolveNodePriorityUpperBoundConflicts(StreamNodeGroup streamNodeGroup, StreamNode streamNode, int i) {
        LOG.info("Detected priority upper bound conflict, current node: {}", streamNode);
        for (StreamEdge streamEdge : streamNode.getOutEdges()) {
            if (!this.unchainableEdge.contains(streamEdge) && streamNodeGroup.nodes.contains(this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId())))) {
                this.unchainableEdge.add(streamEdge);
            }
        }
        streamNodeGroup.analyzeAndCheckConflict(this, this);
    }

    @Override // org.apache.flink.streaming.api.graph.StreamNodeGroup.AnalyzeResolver
    public void resolveInputOrderConflict(StreamNodeGroup streamNodeGroup, StreamNode streamNode, StreamNode streamNode2) {
        LOG.info("Detected input order conflict, current node: {}, dependency source: {}", streamNode, streamNode2);
        Iterator<StreamNodeGroup> it = split(streamNodeGroup, streamNode2).iterator();
        while (it.hasNext()) {
            it.next().analyzeAndCheckConflict(this, this);
        }
    }

    @Override // org.apache.flink.streaming.api.graph.StreamNodeGroup.AnalyzeResolver
    public void resolveUnchainableEdgeInGroupConflict(StreamNodeGroup streamNodeGroup, StreamEdge streamEdge) {
        LOG.info("Detected unchainable edge {}", streamEdge);
        Iterator<StreamNodeGroup> it = split(streamNodeGroup, this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId()))).iterator();
        while (it.hasNext()) {
            it.next().analyzeAndCheckConflict(this, this);
        }
    }

    @Override // org.apache.flink.streaming.api.graph.StreamNodeGroup.AnalyzeResolver
    public void resolveTopologyOrderConflicts(StreamNodeGroup streamNodeGroup, StreamNode streamNode, StreamNode streamNode2) {
        LOG.info("Detected topology order conflict, current node: {}, predecessor: {}.", streamNode, streamNode2);
        Iterator<StreamNodeGroup> it = split(streamNodeGroup, streamNode).iterator();
        while (it.hasNext()) {
            it.next().analyzeAndCheckConflict(this, this);
        }
    }

    @VisibleForTesting
    List<StreamNodeGroup> getGroupsInTopologyOrder() {
        return this.groupsInTopologyOrder;
    }
}
