package org.apache.flink.table.resource.stream;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.exec.StreamExecNode;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecExchange;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecValues;
import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
import org.apache.flink.table.plan.nodes.process.DAGProcessor;
import org.apache.flink.table.util.NodeResourceUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/resource/stream/StreamParallelismProcessor.class */
public class StreamParallelismProcessor implements DAGProcessor {
    private final Set<ExecNode> calculatedNodeSet = new HashSet();
    private TableEnvironment tEnv;

    @Override // org.apache.flink.table.plan.nodes.process.DAGProcessor
    public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> list, DAGProcessContext dAGProcessContext) {
        this.calculatedNodeSet.clear();
        list.forEach(execNode -> {
            Preconditions.checkArgument(execNode instanceof StreamExecNode);
        });
        this.tEnv = dAGProcessContext.getTableEnvironment();
        Iterator<ExecNode<?, ?>> it = list.iterator();
        while (it.hasNext()) {
            calculate(it.next());
        }
        return list;
    }

    private void calculate(ExecNode<?, ?> execNode) {
        if (this.calculatedNodeSet.add(execNode)) {
            calculateInputs(execNode);
            if (execNode instanceof StreamExecDataStreamScan) {
                calculateStreamScan((StreamExecDataStreamScan) execNode);
                return;
            }
            if (execNode instanceof StreamExecTableSourceScan) {
                calculateTableSourceScan((StreamExecTableSourceScan) execNode);
                return;
            }
            if (execNode instanceof StreamExecExchange) {
                calculateExchange((StreamExecExchange) execNode);
            } else if (execNode instanceof StreamExecValues) {
                calculateValues((StreamExecValues) execNode);
            } else {
                calculateDefault(execNode);
            }
        }
    }

    private void calculateStreamScan(StreamExecDataStreamScan streamExecDataStreamScan) {
        int parallelism = streamExecDataStreamScan.getSourceTransformation(this.tEnv.execEnv()).getParallelism();
        if (parallelism <= 0) {
            parallelism = this.tEnv.execEnv().getParallelism();
        }
        streamExecDataStreamScan.getResource().setParallelism(parallelism);
    }

    private void calculateTableSourceScan(StreamExecTableSourceScan streamExecTableSourceScan) {
        StreamTransformation<?> sourceTransformation = streamExecTableSourceScan.getSourceTransformation(this.tEnv.execEnv());
        if (sourceTransformation.getMaxParallelism() > 0) {
            streamExecTableSourceScan.getResource().setParallelism(sourceTransformation.getMaxParallelism());
        } else {
            streamExecTableSourceScan.getResource().setParallelism(NodeResourceUtil.getSourceParallelism(this.tEnv.getConfig().getConf(), this.tEnv.execEnv().getParallelism()));
        }
    }

    private void calculateExchange(StreamExecExchange streamExecExchange) {
        if (streamExecExchange.getDistribution().getType() == RelDistribution.Type.SINGLETON) {
            streamExecExchange.getResource().setParallelism(1);
        } else {
            calculateDefault(streamExecExchange);
        }
    }

    private void calculateValues(StreamExecValues streamExecValues) {
        streamExecValues.getResource().setParallelism(1);
    }

    private void calculateDefault(ExecNode<?, ?> execNode) {
        execNode.getResource().setParallelism(NodeResourceUtil.getOperatorDefaultParallelism(this.tEnv.getConfig().getConf(), this.tEnv.execEnv().getParallelism()));
    }

    private void calculateInputs(ExecNode<?, ?> execNode) {
        execNode.getInputNodes().forEach(this::calculate);
    }
}
