package org.apache.flink.table.resource.common;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.plan.nodes.common.CommonScan;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.exec.NodeResource;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
import org.apache.flink.table.plan.nodes.process.DAGProcessor;
import org.apache.flink.table.util.NodeResourceUtil;

/* loaded from: input_file:org/apache/flink/table/resource/common/NodePartialResProcessor.class */
/**
 * {@link DAGProcessor} that fills in resource settings on exec nodes.
 *
 * <p>Starting from the nodes handed to {@link #process}, every reachable node is visited once,
 * inputs before the node itself. Depending on the node type the processor either sets the
 * source/conversion resources of a scan or writes the configured default cpu, heap memory and
 * direct memory into the node's {@link NodeResource}.
 *
 * <p>Per-run state (the visited set and the table environment) is kept in instance fields that are
 * reset at the beginning of each {@link #process} call.
 */
public class NodePartialResProcessor implements DAGProcessor {
    /** Nodes already handled during the current run; prevents revisiting shared inputs. */
    private final Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();

    /** Table environment of the current run, taken from the {@link DAGProcessContext}. */
    private TableEnvironment tEnv;

    /**
     * Calculates resources for all nodes reachable from {@code list}.
     *
     * @param list nodes to start the traversal from
     * @param dAGProcessContext context supplying the table environment
     * @return the same {@code list} instance that was passed in; nodes are updated in place
     */
    @Override // org.apache.flink.table.plan.nodes.process.DAGProcessor
    public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> list, DAGProcessContext dAGProcessContext) {
        this.calculatedNodeSet.clear();
        this.tEnv = dAGProcessContext.getTableEnvironment();
        for (ExecNode<?, ?> node : list) {
            calculate(node);
        }
        return list;
    }

    /**
     * Visits {@code execNode} (if not visited yet): first its inputs, then the node itself,
     * dispatching on the concrete node type.
     */
    private void calculate(ExecNode<?, ?> execNode) {
        if (!this.calculatedNodeSet.add(execNode)) {
            // Already handled in this run.
            return;
        }
        calculateInputs(execNode);
        if ((execNode instanceof BatchExecBoundedStreamScan) || (execNode instanceof StreamExecDataStreamScan)) {
            calculateStreamScan((CommonScan) execNode);
        } else if ((execNode instanceof BatchExecTableSourceScan) || (execNode instanceof StreamExecTableSourceScan)) {
            calculateTableSourceScan((CommonScan) execNode);
        } else if (execNode instanceof BatchExecExchange) {
            calculateExchange((BatchExecExchange) execNode);
        } else if (!((execNode instanceof BatchExecSink) || (execNode instanceof BatchExecUnion))) {
            // Sink and union nodes are intentionally left untouched by this processor.
            calculateDefaultNode(execNode);
        }
    }

    /**
     * Sets resources for a scan over a bounded stream / data stream: uses the min resources of the
     * source transformation, falling back to {@link ResourceSpec#DEFAULT} when none is present.
     */
    private void calculateStreamScan(CommonScan commonScan) {
        ResourceSpec sourceRes = commonScan.getSourceTransformation(this.tEnv.execEnv()).getMinResources();
        if (sourceRes == null) {
            sourceRes = ResourceSpec.DEFAULT;
        }
        calculateCommonScan(commonScan, sourceRes);
    }

    /**
     * Sets resources for a table source scan: keeps the min resources of the source transformation
     * unless they are {@code null} or the {@link ResourceSpec#DEFAULT} instance, in which case a
     * spec is built from the configured source heap and direct memory.
     */
    private void calculateTableSourceScan(CommonScan commonScan) {
        ResourceSpec sourceRes = commonScan.getSourceTransformation(this.tEnv.execEnv()).getMinResources();
        // Identity comparison on purpose: only the DEFAULT instance itself is treated as "not set".
        if (sourceRes == null || sourceRes == ResourceSpec.DEFAULT) {
            sourceRes = NodeResourceUtil.getResourceSpec(
                    this.tEnv.getConfig().getConf(),
                    NodeResourceUtil.getSourceMem(this.tEnv.getConfig().getConf()),
                    NodeResourceUtil.getSourceDirectMem(this.tEnv.getConfig().getConf()));
        }
        calculateCommonScan(commonScan, sourceRes);
    }

    /**
     * Applies the source resource to the scan together with a conversion resource, which is the
     * configured default spec when the scan needs an internal conversion and
     * {@link ResourceSpec#DEFAULT} otherwise.
     */
    private void calculateCommonScan(CommonScan commonScan, ResourceSpec resourceSpec) {
        ResourceSpec conversionRes = commonScan.needInternalConversion()
                ? NodeResourceUtil.getDefaultResourceSpec(this.tEnv.getConfig().getConf())
                : ResourceSpec.DEFAULT;
        commonScan.setResForSourceAndConversion(resourceSpec, conversionRes);
    }

    /** Writes the configured default resources into the node's {@link NodeResource}. */
    private void calculateDefaultNode(ExecNode<?, ?> execNode) {
        setDefaultRes(execNode.getResource());
    }

    /**
     * Writes default resources into an exchange node, but only when its distribution type is
     * {@link RelDistribution.Type#RANGE_DISTRIBUTED}; other exchanges are left unchanged.
     */
    private void calculateExchange(BatchExecExchange batchExecExchange) {
        if (batchExecExchange.getDistribution().getType() == RelDistribution.Type.RANGE_DISTRIBUTED) {
            setDefaultRes(batchExecExchange.getResource());
        }
    }

    /** Copies the configured default cpu, heap memory and direct memory into {@code nodeResource}. */
    private void setDefaultRes(NodeResource nodeResource) {
        nodeResource.setCpu(NodeResourceUtil.getDefaultCpu(this.tEnv.getConfig().getConf()));
        nodeResource.setHeapMem(NodeResourceUtil.getDefaultHeapMem(this.tEnv.getConfig().getConf()));
        nodeResource.setDirectMem(NodeResourceUtil.getDefaultDirectMem(this.tEnv.getConfig().getConf()));
    }

    /** Recursively visits all input nodes of {@code execNode}. */
    private void calculateInputs(ExecNode<?, ?> execNode) {
        execNode.getInputNodes().forEach(this::calculate);
    }
}
