/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.resource.batch.parallelism;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.BatchTableEnvironment;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.plan.nodes.exec.BatchExecNode;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink;
import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
import org.apache.flink.table.plan.nodes.process.DAGProcessor;
import org.apache.flink.table.resource.batch.NodeRunningUnit;
import org.apache.flink.table.resource.batch.parallelism.BatchFinalParallelismSetter;
import org.apache.flink.table.resource.batch.parallelism.BatchParallelismCalculatorOnConfig;
import org.apache.flink.table.resource.batch.parallelism.BatchParallelismCalculatorOnStatistics;
import org.apache.flink.table.resource.batch.parallelism.BatchShuffleStageParallelismCalculator;
import org.apache.flink.table.resource.batch.parallelism.ShuffleStage;
import org.apache.flink.table.resource.batch.parallelism.ShuffleStageGenerator;
import org.apache.flink.table.resource.batch.parallelism.autoconf.BatchParallelismAdjuster;
import org.apache.flink.table.util.NodeResourceUtil;
import org.apache.flink.util.Preconditions;

public class BatchParallelismProcessor
implements DAGProcessor {
    private TableEnvironment tEnv;

    @Override
    public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
        sinkNodes.forEach(s -> Preconditions.checkArgument(s instanceof BatchExecNode));
        this.tEnv = context.getTableEnvironment();
        List<ExecNode<?, ?>> rootNodes = this.filterSinkNodes(sinkNodes);
        Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap = BatchFinalParallelismSetter.calculate(this.tEnv, rootNodes);
        Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(rootNodes, nodeToFinalParallelismMap);
        this.calculateOnShuffleStages(nodeShuffleStageMap, context);
        for (ExecNode<?, ?> node : nodeShuffleStageMap.keySet()) {
            node.getResource().setParallelism(nodeShuffleStageMap.get(node).getParallelism());
        }
        return sinkNodes;
    }

    protected void calculateOnShuffleStages(Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap, DAGProcessContext context) {
        Configuration tableConf = this.tEnv.getConfig().getConf();
        NodeResourceUtil.InferMode inferMode = NodeResourceUtil.getInferMode(tableConf);
        this.getShuffleStageParallelismCalculator(tableConf, inferMode).calculate(nodeShuffleStageMap.values());
        Double cpuLimit = tableConf.getDouble(TableConfigOptions.SQL_RESOURCE_RUNNING_UNIT_CPU_TOTAL);
        if (cpuLimit > 0.0 && !tableConf.getBoolean(TableConfigOptions.SQL_EXEC_SORT_RANGE_ENABLED)) {
            Map<BatchExecNode<?>, Set<NodeRunningUnit>> nodeRunningUnitMap = ((BatchTableEnvironment)context.getTableEnvironment()).getRUKeeper().getRunningUnitMap();
            BatchParallelismAdjuster.adjustParallelism(cpuLimit, nodeRunningUnitMap, nodeShuffleStageMap);
        }
    }

    protected BatchShuffleStageParallelismCalculator getShuffleStageParallelismCalculator(Configuration tableConf, NodeResourceUtil.InferMode inferMode) {
        int envParallelism = this.tEnv.execEnv().getParallelism();
        if (inferMode.equals((Object)NodeResourceUtil.InferMode.ALL)) {
            return new BatchParallelismCalculatorOnStatistics(tableConf, envParallelism);
        }
        return new BatchParallelismCalculatorOnConfig(tableConf, envParallelism);
    }

    private List<ExecNode<?, ?>> filterSinkNodes(List<ExecNode<?, ?>> sinkNodes) {
        ArrayList rootNodes = new ArrayList();
        sinkNodes.forEach(s -> {
            if (s instanceof BatchExecSink) {
                rootNodes.add(s.getInputNodes().get(0));
            } else {
                rootNodes.add((ExecNode<?, ?>)s);
            }
        });
        return rootNodes;
    }

    protected TableEnvironment gettEnv() {
        return this.tEnv;
    }
}

