/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.resource.batch.parallelism;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecValues;

/**
 * Walks a batch exec-node graph starting from the sink nodes and records, for selected
 * nodes, a parallelism value in a result map.
 *
 * <p>Rules applied per node, as implemented below:
 * <ul>
 *   <li>{@link BatchExecTableSourceScan}: {@code 1} when {@code canLimitPushedDown()} is true;
 *       otherwise the max parallelism of its source transformation, if that value is positive.</li>
 *   <li>{@link BatchExecBoundedStreamScan}: the parallelism of its source transformation, or the
 *       execution environment's parallelism when the transformation's value is not positive.</li>
 *   <li>{@link BatchExecValues}: always {@code 1}.</li>
 *   <li>Any other node: its inputs are visited first; the node itself gets {@code 1} if at least
 *       one direct input is a {@link BatchExecExchange} with a {@code SINGLETON} distribution.</li>
 * </ul>
 *
 * <p>Nodes that match none of the rules get no entry in the result map. Instances are only
 * created through {@link #calculate(TableEnvironment, List)}.
 */
public class BatchFinalParallelismSetter {
    private final TableEnvironment tEnv;

    /** Nodes already visited, so that a node reachable through several paths is handled once. */
    private final Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();

    /** Result map: node to the parallelism recorded for it. */
    private final Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new HashMap<>();

    private BatchFinalParallelismSetter(TableEnvironment tEnv) {
        this.tEnv = tEnv;
    }

    /**
     * Visits every node reachable from the given sink nodes and collects the parallelism
     * values recorded for them.
     *
     * @param tEnv table environment whose execution environment is used to obtain source
     *     transformations and the fallback parallelism
     * @param sinkNodes nodes from which the traversal starts
     * @return map from node to recorded parallelism; nodes without a recorded value are absent
     */
    public static Map<ExecNode<?, ?>, Integer> calculate(TableEnvironment tEnv, List<ExecNode<?, ?>> sinkNodes) {
        BatchFinalParallelismSetter setter = new BatchFinalParallelismSetter(tEnv);
        for (ExecNode<?, ?> sinkNode : sinkNodes) {
            setter.calculate(sinkNode);
        }
        return setter.finalParallelismNodeMap;
    }

    /**
     * Dispatches a single node to the rule matching its type. A node that was already
     * visited is skipped.
     */
    private void calculate(ExecNode<?, ?> batchExecNode) {
        if (!this.calculatedNodeSet.add(batchExecNode)) {
            // Set.add returns false when the node was already present: nothing left to do.
            return;
        }
        if (batchExecNode instanceof BatchExecTableSourceScan) {
            this.calculateTableSource((BatchExecTableSourceScan) batchExecNode);
        } else if (batchExecNode instanceof BatchExecBoundedStreamScan) {
            this.calculateBoundedStreamScan((BatchExecBoundedStreamScan) batchExecNode);
        } else if (batchExecNode instanceof BatchExecValues) {
            this.calculateValues((BatchExecValues) batchExecNode);
        } else {
            this.calculateSingleton(batchExecNode);
        }
    }

    /**
     * Records {@code 1} for a table source scan that reports {@code canLimitPushedDown()};
     * otherwise records the source transformation's max parallelism when it is positive.
     */
    private void calculateTableSource(BatchExecTableSourceScan tableSourceScan) {
        if (tableSourceScan.canLimitPushedDown()) {
            this.finalParallelismNodeMap.put(tableSourceScan, 1);
            return;
        }
        StreamTransformation<?> transformation = tableSourceScan.getSourceTransformation(this.tEnv.execEnv());
        int maxParallelism = transformation.getMaxParallelism();
        if (maxParallelism > 0) {
            this.finalParallelismNodeMap.put(tableSourceScan, maxParallelism);
        }
    }

    /**
     * Records the parallelism of the bounded stream scan's source transformation, falling
     * back to the execution environment's parallelism when that value is not positive.
     */
    private void calculateBoundedStreamScan(BatchExecBoundedStreamScan boundedStreamScan) {
        // Only getParallelism() is needed, so the element type of the transformation is irrelevant.
        StreamTransformation<?> transformation = boundedStreamScan.getSourceTransformation(this.tEnv.execEnv());
        int parallelism = transformation.getParallelism();
        if (parallelism <= 0) {
            parallelism = this.tEnv.execEnv().getParallelism();
        }
        this.finalParallelismNodeMap.put(boundedStreamScan, parallelism);
    }

    /**
     * Visits the node's inputs, then records {@code 1} for the node if any direct input is a
     * {@link BatchExecExchange} whose distribution type is {@code SINGLETON}.
     */
    private void calculateSingleton(ExecNode<?, ?> execNode) {
        this.calculateInputs(execNode);
        for (ExecNode<?, ?> inputNode : execNode.getInputNodes()) {
            if (inputNode instanceof BatchExecExchange
                    && ((BatchExecExchange) inputNode).getDistribution().getType() == RelDistribution.Type.SINGLETON) {
                this.finalParallelismNodeMap.put(execNode, 1);
                // One singleton exchange input is enough; remaining inputs need not be checked.
                return;
            }
        }
    }

    /** Records {@code 1} for a values node. */
    private void calculateValues(BatchExecValues valuesBatchExec) {
        this.finalParallelismNodeMap.put(valuesBatchExec, 1);
    }

    /** Visits every input node of the given node. */
    private void calculateInputs(ExecNode<?, ?> node) {
        for (ExecNode<?, ?> inputNode : node.getInputNodes()) {
            this.calculate(inputNode);
        }
    }
}

