/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.resource.batch;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.table.api.BatchTableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.plan.nodes.exec.BatchExecNode;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.exec.batch.BatchExecNodeVisitorImpl;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCorrelate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExpand;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecHashAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecHashJoinBase;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecHashWindowAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecJoinBase;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecLimit;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecLocalHashAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecLocalHashWindowAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecLocalSortAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecLocalSortWindowAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecNestedLoopJoinBase;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecOverAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecRank;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSort;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSortAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSortLimit;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSortMergeJoinBase;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSortWindowAggregate;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTemporalTableJoin;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecValues;
import org.apache.flink.table.resource.batch.BatchExecNodeStage;
import org.apache.flink.table.resource.batch.NodeRunningUnit;

public class RunningUnitGenerator
extends BatchExecNodeVisitorImpl {
    private final Map<BatchExecNode<?>, List<NodeStageExchangeInfo>> outputInfoMap = new LinkedHashMap();
    private final List<NodeRunningUnit> runningUnits = new LinkedList<NodeRunningUnit>();
    private final Configuration tableConf;

    public RunningUnitGenerator(Configuration tableConf) {
        this.tableConf = tableConf;
    }

    public List<NodeRunningUnit> getRunningUnits() {
        return this.runningUnits;
    }

    protected void addIntoInputRunningUnit(List<NodeStageExchangeInfo> inputInfoList, BatchExecNodeStage nodeStage) {
        for (NodeStageExchangeInfo inputInfo : inputInfoList) {
            if (inputInfo.exchangeMode == DataExchangeMode.BATCH) {
                nodeStage.addDependStage(inputInfo.outStage, BatchExecNodeStage.DependType.DATA_TRIGGER);
            } else {
                for (NodeRunningUnit inputRunningUnit : inputInfo.outStage.getRunningUnitList()) {
                    inputRunningUnit.addNodeStage(nodeStage);
                    nodeStage.addRunningUnit(inputRunningUnit);
                }
            }
            if (!nodeStage.getRunningUnitList().isEmpty()) continue;
            this.newRunningUnitWithNodeStage(nodeStage);
        }
    }

    protected void newRunningUnitWithNodeStage(BatchExecNodeStage nodeStage) {
        NodeRunningUnit runningUnit = new NodeRunningUnit();
        this.runningUnits.add(runningUnit);
        runningUnit.addNodeStage(nodeStage);
        nodeStage.addRunningUnit(runningUnit);
    }

    @Override
    public void visit(BatchExecBoundedStreamScan boundedStreamScan) {
        this.visitSource(boundedStreamScan);
    }

    @Override
    public void visit(BatchExecTableSourceScan scanTableSource) {
        this.visitSource(scanTableSource);
    }

    @Override
    public void visit(BatchExecValues values) {
        this.visitSource(values);
    }

    protected void visitSource(BatchExecNode<?> sourceNode) {
        List<NodeStageExchangeInfo> outputInfoList = this.outputInfoMap.get(sourceNode);
        if (outputInfoList == null) {
            BatchExecNodeStage nodeStage = new BatchExecNodeStage(sourceNode, 0);
            this.newRunningUnitWithNodeStage(nodeStage);
            outputInfoList = Collections.singletonList(new NodeStageExchangeInfo(nodeStage));
            this.outputInfoMap.put(sourceNode, outputInfoList);
        }
    }

    protected List<NodeStageExchangeInfo> visitOneStageSingleNode(BatchExecNode<?> singleNode) {
        List<NodeStageExchangeInfo> outputInfoList = this.outputInfoMap.get(singleNode);
        if (outputInfoList == null) {
            BatchExecNode input = (BatchExecNode)singleNode.getInputNodes().get(0);
            input.accept(this);
            List<NodeStageExchangeInfo> inputInfoList = this.outputInfoMap.get(input);
            BatchExecNodeStage nodeStage = new BatchExecNodeStage(singleNode, 0);
            this.addIntoInputRunningUnit(inputInfoList, nodeStage);
            outputInfoList = Collections.singletonList(new NodeStageExchangeInfo(nodeStage));
            this.outputInfoMap.put(singleNode, outputInfoList);
        }
        return outputInfoList;
    }

    @Override
    public void visit(BatchExecCalc calc) {
        this.visitOneStageSingleNode(calc);
    }

    @Override
    public void visit(BatchExecCorrelate correlate) {
        this.visitOneStageSingleNode(correlate);
    }

    @Override
    public void visit(BatchExecExpand expand) {
        this.visitOneStageSingleNode(expand);
    }

    @Override
    public void visit(BatchExecLocalSortWindowAggregate localSortAggregate) {
        this.visitOneStageSingleNode(localSortAggregate);
    }

    @Override
    public void visit(BatchExecSortWindowAggregate sortAggregate) {
        this.visitOneStageSingleNode(sortAggregate);
    }

    @Override
    public void visit(BatchExecOverAggregate overWindowAgg) {
        this.visitOneStageSingleNode(overWindowAgg);
    }

    @Override
    public void visit(BatchExecLimit limit) {
        this.visitOneStageSingleNode(limit);
    }

    @Override
    public void visit(BatchExecLocalHashWindowAggregate localHashAggregate) {
        this.visitOneStageSingleNode(localHashAggregate);
    }

    @Override
    public void visit(BatchExecTemporalTableJoin joinTable) {
        this.visitOneStageSingleNode(joinTable);
    }

    @Override
    public void visit(BatchExecHashWindowAggregate hashAggregate) {
        this.visitTwoStageSingleNode(hashAggregate);
    }

    protected List<NodeStageExchangeInfo> visitTwoStageSingleNode(BatchExecNode<?> singleNode) {
        List<NodeStageExchangeInfo> outputInfoList = this.outputInfoMap.get(singleNode);
        if (outputInfoList == null) {
            BatchExecNode input = (BatchExecNode)singleNode.getInputNodes().get(0);
            input.accept(this);
            List<NodeStageExchangeInfo> inputInfoList = this.outputInfoMap.get(input);
            BatchExecNodeStage inStage = new BatchExecNodeStage(singleNode, 0);
            BatchExecNodeStage outStage = new BatchExecNodeStage(singleNode, 1);
            outStage.addDependStage(inStage, BatchExecNodeStage.DependType.DATA_TRIGGER);
            this.addIntoInputRunningUnit(inputInfoList, inStage);
            this.newRunningUnitWithNodeStage(outStage);
            outputInfoList = Collections.singletonList(new NodeStageExchangeInfo(outStage));
            this.outputInfoMap.put(singleNode, outputInfoList);
        }
        return outputInfoList;
    }

    @Override
    public void visit(BatchExecLocalHashAggregate localHashAggregate) {
        if (localHashAggregate.getGrouping().length == 0) {
            this.visitTwoStageSingleNode(localHashAggregate);
        } else {
            this.visitOneStageSingleNode(localHashAggregate);
        }
    }

    @Override
    public void visit(BatchExecSortAggregate sortAggregate) {
        if (sortAggregate.getGrouping().length == 0) {
            this.visitTwoStageSingleNode(sortAggregate);
        } else {
            this.visitOneStageSingleNode(sortAggregate);
        }
    }

    @Override
    public void visit(BatchExecLocalSortAggregate localSortAggregate) {
        if (localSortAggregate.getGrouping().length == 0) {
            this.visitTwoStageSingleNode(localSortAggregate);
        } else {
            this.visitOneStageSingleNode(localSortAggregate);
        }
    }

    @Override
    public void visit(BatchExecSort sort) {
        this.visitTwoStageSingleNode(sort);
    }

    @Override
    public void visit(BatchExecSortLimit sortLimit) {
        this.visitTwoStageSingleNode(sortLimit);
    }

    @Override
    public void visit(BatchExecRank rank) {
        this.visitOneStageSingleNode(rank);
    }

    @Override
    public void visit(BatchExecHashAggregate hashAggregate) {
        this.visitTwoStageSingleNode(hashAggregate);
    }

    protected List<NodeStageExchangeInfo> visitBuildProbeJoin(BatchExecJoinBase hashJoin, boolean leftIsBuild) {
        List<NodeStageExchangeInfo> outputInfoList = this.outputInfoMap.get(hashJoin);
        if (outputInfoList == null) {
            BatchExecNodeStage buildStage = new BatchExecNodeStage(hashJoin, 0);
            BatchExecNodeStage probeStage = new BatchExecNodeStage(hashJoin, 1);
            probeStage.addDependStage(buildStage, BatchExecNodeStage.DependType.PRIORITY);
            BatchExecNode buildInput = (BatchExecNode)(leftIsBuild ? hashJoin.getInputNodes().get(0) : hashJoin.getInputNodes().get(1));
            BatchExecNode probeInput = (BatchExecNode)(leftIsBuild ? hashJoin.getInputNodes().get(1) : hashJoin.getInputNodes().get(0));
            buildInput.accept(this);
            List<NodeStageExchangeInfo> buildInputInfoList = this.outputInfoMap.get(buildInput);
            probeInput.accept(this);
            List<NodeStageExchangeInfo> probeInputInfoList = this.outputInfoMap.get(probeInput);
            this.addIntoInputRunningUnit(buildInputInfoList, buildStage);
            this.addIntoInputRunningUnit(probeInputInfoList, probeStage);
            outputInfoList = Collections.singletonList(new NodeStageExchangeInfo(probeStage));
            this.outputInfoMap.put(hashJoin, outputInfoList);
        }
        return outputInfoList;
    }

    @Override
    public void visit(BatchExecExchange exchange) {
        List<NodeStageExchangeInfo> outputInfoList = this.outputInfoMap.get(exchange);
        if (outputInfoList == null) {
            BatchExecNode input = (BatchExecNode)((Object)exchange.getInput());
            input.accept(this);
            List<NodeStageExchangeInfo> inputInfoList = this.outputInfoMap.get(input);
            if (exchange.getDataExchangeModeForDeadlockBreakup(this.tableConf) == DataExchangeMode.BATCH) {
                outputInfoList = new ArrayList<NodeStageExchangeInfo>(inputInfoList.size());
                for (NodeStageExchangeInfo nodeStageExchangeInfo : inputInfoList) {
                    outputInfoList.add(new NodeStageExchangeInfo(nodeStageExchangeInfo.outStage, DataExchangeMode.BATCH));
                }
            } else {
                outputInfoList = inputInfoList;
            }
            this.outputInfoMap.put(exchange, outputInfoList);
        }
    }

    @Override
    public void visit(BatchExecHashJoinBase hashJoin) {
        List<NodeStageExchangeInfo> outputInfoList;
        if (hashJoin.hashJoinType().buildLeftSemiOrAnti() && (outputInfoList = this.outputInfoMap.get(hashJoin)) == null) {
            BatchExecNodeStage buildStage = new BatchExecNodeStage(hashJoin, 0);
            BatchExecNodeStage probeStage = new BatchExecNodeStage(hashJoin, 1);
            BatchExecNodeStage outStage = new BatchExecNodeStage(hashJoin, 2);
            probeStage.addDependStage(buildStage, BatchExecNodeStage.DependType.PRIORITY);
            outStage.addDependStage(probeStage, BatchExecNodeStage.DependType.DATA_TRIGGER);
            BatchExecNode buildInput = (BatchExecNode)hashJoin.getInputNodes().get(0);
            BatchExecNode probeInput = (BatchExecNode)hashJoin.getInputNodes().get(1);
            buildInput.accept(this);
            List<NodeStageExchangeInfo> buildInputInfoList = this.outputInfoMap.get(buildInput);
            probeInput.accept(this);
            List<NodeStageExchangeInfo> probeInputInfoList = this.outputInfoMap.get(probeInput);
            this.addIntoInputRunningUnit(buildInputInfoList, buildStage);
            this.addIntoInputRunningUnit(probeInputInfoList, probeStage);
            this.newRunningUnitWithNodeStage(outStage);
            outputInfoList = Collections.singletonList(new NodeStageExchangeInfo(outStage));
            this.outputInfoMap.put(hashJoin, outputInfoList);
        }
        this.visitBuildProbeJoin(hashJoin, hashJoin.leftIsBuild());
    }

    @Override
    public void visit(BatchExecSortMergeJoinBase sortMergeJoin) {
        List<NodeStageExchangeInfo> outputInfoList = this.outputInfoMap.get(sortMergeJoin);
        if (outputInfoList == null) {
            BatchExecNodeStage in0Stage = new BatchExecNodeStage(sortMergeJoin, 0);
            BatchExecNodeStage in1Stage = new BatchExecNodeStage(sortMergeJoin, 1);
            BatchExecNodeStage outStage = new BatchExecNodeStage(sortMergeJoin, 2);
            outStage.addDependStage(in0Stage, BatchExecNodeStage.DependType.DATA_TRIGGER);
            outStage.addDependStage(in1Stage, BatchExecNodeStage.DependType.DATA_TRIGGER);
            BatchExecNode leftInput = (BatchExecNode)sortMergeJoin.getInputNodes().get(0);
            leftInput.accept(this);
            List<NodeStageExchangeInfo> in0InfoList = this.outputInfoMap.get(leftInput);
            BatchExecNode rightInput = (BatchExecNode)sortMergeJoin.getInputNodes().get(1);
            rightInput.accept(this);
            List<NodeStageExchangeInfo> in1InfoList = this.outputInfoMap.get(rightInput);
            this.addIntoInputRunningUnit(in0InfoList, in0Stage);
            this.addIntoInputRunningUnit(in1InfoList, in1Stage);
            this.newRunningUnitWithNodeStage(outStage);
            outputInfoList = Collections.singletonList(new NodeStageExchangeInfo(outStage));
            this.outputInfoMap.put(sortMergeJoin, outputInfoList);
        }
    }

    @Override
    public void visit(BatchExecNestedLoopJoinBase nestedLoopJoin) {
        this.visitBuildProbeJoin(nestedLoopJoin, nestedLoopJoin.leftIsBuild());
    }

    @Override
    public void visit(BatchExecUnion union) {
        List<NodeStageExchangeInfo> outputInfoList = this.outputInfoMap.get(union);
        if (outputInfoList == null) {
            outputInfoList = new LinkedList<NodeStageExchangeInfo>();
            for (ExecNode<BatchTableEnvironment, ?> input : union.getInputNodes()) {
                ((BatchExecNode)input).accept(this);
                outputInfoList.addAll((Collection<NodeStageExchangeInfo>)this.outputInfoMap.get((BatchExecNode)input));
            }
            this.outputInfoMap.put(union, outputInfoList);
        }
    }

    @Override
    public void visit(BatchExecSink<?> sink) {
        throw new TableException("could not reach sink here.");
    }

    protected static class NodeStageExchangeInfo {
        private final BatchExecNodeStage outStage;
        private final DataExchangeMode exchangeMode;

        public NodeStageExchangeInfo(BatchExecNodeStage outStage) {
            this(outStage, null);
        }

        public NodeStageExchangeInfo(BatchExecNodeStage outStage, DataExchangeMode exchangeMode) {
            this.outStage = outStage;
            this.exchangeMode = exchangeMode;
        }
    }
}

