/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.resource.stream;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.exec.StreamExecNode;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecExchange;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecValues;
import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
import org.apache.flink.table.plan.nodes.process.DAGProcessor;
import org.apache.flink.table.util.NodeResourceUtil;
import org.apache.flink.util.Preconditions;

public class StreamParallelismProcessor
implements DAGProcessor {
    private final Set<ExecNode> calculatedNodeSet = new HashSet<ExecNode>();
    private TableEnvironment tEnv;

    @Override
    public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
        this.calculatedNodeSet.clear();
        sinkNodes.forEach(s -> Preconditions.checkArgument(s instanceof StreamExecNode));
        this.tEnv = context.getTableEnvironment();
        for (ExecNode<?, ?> sinkNode : sinkNodes) {
            this.calculate(sinkNode);
        }
        return sinkNodes;
    }

    private void calculate(ExecNode<?, ?> execNode) {
        if (!this.calculatedNodeSet.add(execNode)) {
            return;
        }
        this.calculateInputs(execNode);
        if (execNode instanceof StreamExecDataStreamScan) {
            this.calculateStreamScan((StreamExecDataStreamScan)execNode);
        } else if (execNode instanceof StreamExecTableSourceScan) {
            this.calculateTableSourceScan((StreamExecTableSourceScan)execNode);
        } else if (execNode instanceof StreamExecExchange) {
            this.calculateExchange((StreamExecExchange)execNode);
        } else if (execNode instanceof StreamExecValues) {
            this.calculateValues((StreamExecValues)execNode);
        } else {
            this.calculateDefault(execNode);
        }
    }

    private void calculateStreamScan(StreamExecDataStreamScan streamScan) {
        StreamTransformation<Object> transformation = streamScan.getSourceTransformation(this.tEnv.execEnv());
        int parallelism = transformation.getParallelism();
        if (parallelism <= 0) {
            parallelism = this.tEnv.execEnv().getParallelism();
        }
        streamScan.getResource().setParallelism(parallelism);
    }

    private void calculateTableSourceScan(StreamExecTableSourceScan tableSourceScan) {
        StreamTransformation<?> transformation = tableSourceScan.getSourceTransformation(this.tEnv.execEnv());
        if (transformation.getMaxParallelism() > 0) {
            tableSourceScan.getResource().setParallelism(transformation.getMaxParallelism());
            return;
        }
        int configParallelism = NodeResourceUtil.getSourceParallelism(this.tEnv.getConfig().getConf(), this.tEnv.execEnv().getParallelism());
        tableSourceScan.getResource().setParallelism(configParallelism);
    }

    private void calculateExchange(StreamExecExchange execExchange) {
        if (execExchange.getDistribution().getType() == RelDistribution.Type.SINGLETON) {
            execExchange.getResource().setParallelism(1);
        } else {
            this.calculateDefault(execExchange);
        }
    }

    private void calculateValues(StreamExecValues values) {
        values.getResource().setParallelism(1);
    }

    private void calculateDefault(ExecNode<?, ?> execNode) {
        execNode.getResource().setParallelism(NodeResourceUtil.getOperatorDefaultParallelism(this.tEnv.getConfig().getConf(), this.tEnv.execEnv().getParallelism()));
    }

    private void calculateInputs(ExecNode<?, ?> node) {
        node.getInputNodes().forEach(this::calculate);
    }
}

