package org.apache.flink.table.resource.batch.parallelism;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.BatchTableEnvironment;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.plan.nodes.exec.BatchExecNode;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink;
import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
import org.apache.flink.table.plan.nodes.process.DAGProcessor;
import org.apache.flink.table.resource.batch.parallelism.autoconf.BatchParallelismAdjuster;
import org.apache.flink.table.util.NodeResourceUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/resource/batch/parallelism/BatchParallelismProcessor.class */
public class BatchParallelismProcessor implements DAGProcessor {
    private TableEnvironment tEnv;

    @Override // org.apache.flink.table.plan.nodes.process.DAGProcessor
    public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> list, DAGProcessContext dAGProcessContext) {
        list.forEach(execNode -> {
            Preconditions.checkArgument(execNode instanceof BatchExecNode);
        });
        this.tEnv = dAGProcessContext.getTableEnvironment();
        List<ExecNode<?, ?>> filterSinkNodes = filterSinkNodes(list);
        Map<ExecNode<?, ?>, ShuffleStage> generate = ShuffleStageGenerator.generate(filterSinkNodes, BatchFinalParallelismSetter.calculate(this.tEnv, filterSinkNodes));
        calculateOnShuffleStages(generate, dAGProcessContext);
        for (ExecNode<?, ?> execNode2 : generate.keySet()) {
            execNode2.getResource().setParallelism(generate.get(execNode2).getParallelism());
        }
        return list;
    }

    protected void calculateOnShuffleStages(Map<ExecNode<?, ?>, ShuffleStage> map, DAGProcessContext dAGProcessContext) {
        Configuration conf = this.tEnv.getConfig().getConf();
        getShuffleStageParallelismCalculator(conf, NodeResourceUtil.getInferMode(conf)).calculate(map.values());
        Double valueOf = Double.valueOf(conf.getDouble(TableConfigOptions.SQL_RESOURCE_RUNNING_UNIT_CPU_TOTAL));
        if (valueOf.doubleValue() <= 0.0d || conf.getBoolean(TableConfigOptions.SQL_EXEC_SORT_RANGE_ENABLED)) {
            return;
        }
        BatchParallelismAdjuster.adjustParallelism(valueOf.doubleValue(), ((BatchTableEnvironment) dAGProcessContext.getTableEnvironment()).getRUKeeper().getRunningUnitMap(), map);
    }

    protected BatchShuffleStageParallelismCalculator getShuffleStageParallelismCalculator(Configuration configuration, NodeResourceUtil.InferMode inferMode) {
        int parallelism = this.tEnv.execEnv().getParallelism();
        return inferMode.equals(NodeResourceUtil.InferMode.ALL) ? new BatchParallelismCalculatorOnStatistics(configuration, parallelism) : new BatchParallelismCalculatorOnConfig(configuration, parallelism);
    }

    private List<ExecNode<?, ?>> filterSinkNodes(List<ExecNode<?, ?>> list) {
        ArrayList arrayList = new ArrayList();
        list.forEach(execNode -> {
            if (execNode instanceof BatchExecSink) {
                arrayList.add(execNode.getInputNodes().get(0));
            } else {
                arrayList.add(execNode);
            }
        });
        return arrayList;
    }

    protected TableEnvironment gettEnv() {
        return this.tEnv;
    }
}
