/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.resource.common;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.plan.nodes.common.CommonScan;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.exec.NodeResource;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
import org.apache.flink.table.plan.nodes.process.DAGProcessor;
import org.apache.flink.table.util.NodeResourceUtil;

/**
 * {@link DAGProcessor} that walks the exec-node DAG starting from the sink nodes and
 * fills in resource settings on the nodes it visits.
 *
 * <p>Per node kind:
 * <ul>
 *   <li>bounded-stream / data-stream scans: resources come from the source transformation;</li>
 *   <li>table-source scans: resources come from the source transformation, or from the
 *       table configuration when the transformation carries none;</li>
 *   <li>batch exchanges: default resources are applied only for range distribution;</li>
 *   <li>batch sinks and batch unions: left untouched;</li>
 *   <li>every other node: default cpu / heap / direct memory from the table configuration.</li>
 * </ul>
 *
 * <p>The instance keeps per-invocation state (visited set, table environment) in fields,
 * which {@link #process} resets on every call.
 */
public class NodePartialResProcessor
implements DAGProcessor {
    /** Nodes already handled during the current {@link #process} call. */
    private final Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
    /** Table environment taken from the context of the current {@link #process} call. */
    private TableEnvironment tEnv;

    /**
     * Calculates resources for every node reachable from the given sinks.
     *
     * @param sinkNodes roots of the traversal
     * @param context supplies the table environment used to read configuration
     * @return the same {@code sinkNodes} list that was passed in
     */
    @Override
    public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
        // Reset state left over from any previous invocation.
        this.calculatedNodeSet.clear();
        this.tEnv = context.getTableEnvironment();
        for (ExecNode<?, ?> sinkNode : sinkNodes) {
            this.calculate(sinkNode);
        }
        return sinkNodes;
    }

    /**
     * Visits {@code execNode} at most once: first recurses into its inputs, then
     * dispatches on the node type to the matching resource calculation.
     */
    private void calculate(ExecNode<?, ?> execNode) {
        if (!this.calculatedNodeSet.add(execNode)) {
            // Already visited (a node may be reachable through several paths).
            return;
        }
        // Inputs are handled before the node itself.
        this.calculateInputs(execNode);
        if (execNode instanceof BatchExecBoundedStreamScan || execNode instanceof StreamExecDataStreamScan) {
            this.calculateStreamScan((CommonScan) execNode);
        } else if (execNode instanceof BatchExecTableSourceScan || execNode instanceof StreamExecTableSourceScan) {
            this.calculateTableSourceScan((CommonScan) execNode);
        } else if (execNode instanceof BatchExecExchange) {
            this.calculateExchange((BatchExecExchange) execNode);
        } else if (!(execNode instanceof BatchExecSink) && !(execNode instanceof BatchExecUnion)) {
            // Batch sinks and batch unions are deliberately skipped by this processor.
            this.calculateDefaultNode(execNode);
        }
    }

    /**
     * Uses the minimum resources of the scan's source transformation, falling back to
     * {@link ResourceSpec#DEFAULT} when the transformation reports {@code null}.
     */
    private void calculateStreamScan(CommonScan streamScan) {
        StreamTransformation<?> transformation = streamScan.getSourceTransformation(this.tEnv.execEnv());
        ResourceSpec sourceRes = transformation.getMinResources();
        if (sourceRes == null) {
            sourceRes = ResourceSpec.DEFAULT;
        }
        this.calculateCommonScan(streamScan, sourceRes);
    }

    /**
     * Uses the minimum resources of the scan's source transformation; when those are
     * {@code null} or the {@link ResourceSpec#DEFAULT} instance, builds a spec from the
     * source heap / direct memory values in the table configuration instead.
     */
    private void calculateTableSourceScan(CommonScan tableSourceScan) {
        StreamTransformation<?> transformation = tableSourceScan.getSourceTransformation(this.tEnv.execEnv());
        ResourceSpec sourceRes = transformation.getMinResources();
        // Identity comparison against the shared DEFAULT constant is kept from the original.
        // NOTE(review): confirm whether an equals()-based check would be preferable here.
        if (sourceRes == null || sourceRes == ResourceSpec.DEFAULT) {
            int heap = NodeResourceUtil.getSourceMem(this.tEnv.getConfig().getConf());
            int direct = NodeResourceUtil.getSourceDirectMem(this.tEnv.getConfig().getConf());
            sourceRes = NodeResourceUtil.getResourceSpec(this.tEnv.getConfig().getConf(), heap, direct);
        }
        this.calculateCommonScan(tableSourceScan, sourceRes);
    }

    /**
     * Stores the source resources on the scan together with the conversion resources:
     * the configured default spec when the scan needs an internal conversion,
     * {@link ResourceSpec#DEFAULT} otherwise.
     */
    private void calculateCommonScan(CommonScan commonScan, ResourceSpec sourceRes) {
        ResourceSpec conversionRes = ResourceSpec.DEFAULT;
        if (commonScan.needInternalConversion()) {
            conversionRes = NodeResourceUtil.getDefaultResourceSpec(this.tEnv.getConfig().getConf());
        }
        commonScan.setResForSourceAndConversion(sourceRes, conversionRes);
    }

    /** Applies the configured default resources to an ordinary node. */
    private void calculateDefaultNode(ExecNode<?, ?> node) {
        this.setDefaultRes(node.getResource());
    }

    /**
     * Applies the configured default resources to a batch exchange, but only when it is
     * range distributed; other distribution types are left unchanged.
     */
    private void calculateExchange(BatchExecExchange execExchange) {
        if (execExchange.getDistribution().getType() == RelDistribution.Type.RANGE_DISTRIBUTED) {
            this.setDefaultRes(execExchange.getResource());
        }
    }

    /** Writes the default cpu, heap and direct memory from the table configuration into {@code resource}. */
    private void setDefaultRes(NodeResource resource) {
        double cpu = NodeResourceUtil.getDefaultCpu(this.tEnv.getConfig().getConf());
        int heap = NodeResourceUtil.getDefaultHeapMem(this.tEnv.getConfig().getConf());
        int direct = NodeResourceUtil.getDefaultDirectMem(this.tEnv.getConfig().getConf());
        resource.setCpu(cpu);
        resource.setHeapMem(heap);
        resource.setDirectMem(direct);
    }

    /** Recursively calculates every input node of {@code node}. */
    private void calculateInputs(ExecNode<?, ?> node) {
        node.getInputNodes().forEach(this::calculate);
    }
}

