package org.apache.flink.table.plan.resource;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.BatchTableEnvironment;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecRel;
import org.apache.flink.table.plan.nodes.physical.batch.RowBatchExecRel;
import org.apache.flink.table.plan.resource.autoconf.RelManagedCalculatorOnStatistics;
import org.apache.flink.table.plan.resource.autoconf.RelParallelismAdjuster;
import org.apache.flink.table.plan.resource.autoconf.RelReservedManagedMemAdjuster;
import org.apache.flink.table.plan.resource.schedule.RunningUnitSchedulerEventHandler;
import org.apache.flink.table.util.BatchExecResourceUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/plan/resource/RunningUnitKeeper.class */
public class RunningUnitKeeper {
    private static final Logger LOG = LoggerFactory.getLogger(RunningUnitKeeper.class);
    private final TableConfig tableConfig;
    private final BatchTableEnvironment tableEnv;
    private List<RelRunningUnit> runningUnits;
    private final Map<RowBatchExecRel, Set<RelRunningUnit>> relRunningUnitMap = new LinkedHashMap();
    private final List<ShuffleStage> shuffleStages = new LinkedList();
    private final Map<RowBatchExecRel, Set<BatchExecRelStage>> relStagesMap = new LinkedHashMap();
    private final Map<RowBatchExecRel, ShuffleStage> relShuffleStageMap = new LinkedHashMap();
    private boolean useRunningUnit = true;

    public RunningUnitKeeper(BatchTableEnvironment batchTableEnvironment) {
        this.tableConfig = batchTableEnvironment.getConfig();
        this.tableEnv = batchTableEnvironment;
    }

    public ShuffleStage getRelShuffleStage(BatchExecRel<?> batchExecRel) {
        return this.relShuffleStageMap.get(batchExecRel);
    }

    public void buildRUs(RowBatchExecRel rowBatchExecRel) {
        if (this.tableConfig.getSubsectionOptimization() || this.tableConfig.enableBatchExternalShuffle() || this.tableConfig.enableRangePartition()) {
            this.useRunningUnit = false;
            return;
        }
        GenerateRunningUnitVisitor generateRunningUnitVisitor = new GenerateRunningUnitVisitor(rowBatchExecRel);
        rowBatchExecRel.accept(generateRunningUnitVisitor);
        this.runningUnits = generateRunningUnitVisitor.getRunningUnits();
        for (RelRunningUnit relRunningUnit : this.runningUnits) {
            Iterator<ShuffleStageInRunningUnit> it = relRunningUnit.getShuffleStagesInRunningUnit().iterator();
            while (it.hasNext()) {
                Iterator<RowBatchExecRel> it2 = it.next().getRelSet().iterator();
                while (it2.hasNext()) {
                    this.relRunningUnitMap.computeIfAbsent(it2.next(), rowBatchExecRel2 -> {
                        return new LinkedHashSet();
                    }).add(relRunningUnit);
                }
            }
        }
        buildShuffleStages();
        buildRelStagesMap();
    }

    public void setScheduleConfig(StreamExecutionEnvironment streamExecutionEnvironment, StreamGraph streamGraph) {
        if (this.useRunningUnit && BatchExecResourceUtil.enableRunningUnitSchedule(this.tableConfig)) {
            streamExecutionEnvironment.setConfiguration(JobManagerOptions.SCHEDULER_EVENT_HANDLER, RunningUnitSchedulerEventHandler.class.getName());
            try {
                InstantiationUtil.writeObjectToConfig(this.runningUnits, streamGraph.getProperties().getConfiguration(), RunningUnitSchedulerEventHandler.RUNNING_UNIT_CONF_KEY);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Could not serialize runningUnits to streamGraph config.", e);
            }
        }
    }

    public void calculateRelResource(RowBatchExecRel rowBatchExecRel) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        BatchExecResourceUtil.InferMode inferMode = BatchExecResourceUtil.getInferMode(this.tableConfig);
        if (!this.useRunningUnit || !inferMode.equals(BatchExecResourceUtil.InferMode.ALL)) {
            rowBatchExecRel.accept(new DefaultResultPartitionCalculator(this.tableConfig, this.tableEnv));
            rowBatchExecRel.accept(new RelCpuHeapMemCalculator(this.tableConfig, this.tableEnv, linkedHashMap));
            rowBatchExecRel.accept(new DefaultRelManagedCalculator(this.tableConfig, linkedHashMap));
            for (Map.Entry entry : linkedHashMap.entrySet()) {
                ((RowBatchExecRel) entry.getKey()).setResource((RelResource) entry.getValue());
                LOG.info(entry.getKey() + " resource: " + entry.getValue());
            }
            return;
        }
        RelMetadataQuery metadataQuery = rowBatchExecRel.getCluster().getMetadataQuery();
        rowBatchExecRel.accept(new ResultPartitionCalculatorOnStatistics(this.tableConfig, this.tableEnv, this, metadataQuery));
        rowBatchExecRel.accept(new RelCpuHeapMemCalculator(this.tableConfig, this.tableEnv, linkedHashMap));
        Tuple2<Double, Long> runningUnitResourceLimit = BatchExecResourceUtil.getRunningUnitResourceLimit(this.tableConfig);
        if (runningUnitResourceLimit != null) {
            new RelParallelismAdjuster(((Double) runningUnitResourceLimit.f0).doubleValue(), linkedHashMap).adjust(this.relShuffleStageMap);
        }
        rowBatchExecRel.accept(new RelManagedCalculatorOnStatistics(this.tableConfig, this, metadataQuery, linkedHashMap));
        if (runningUnitResourceLimit != null) {
            int operatorMinManagedMem = BatchExecResourceUtil.getOperatorMinManagedMem(this.tableConfig);
            HashMap hashMap = new HashMap();
            for (Map.Entry<RowBatchExecRel, ShuffleStage> entry2 : this.relShuffleStageMap.entrySet()) {
                hashMap.put(entry2.getKey(), Integer.valueOf(entry2.getValue().getResultParallelism()));
            }
            new RelReservedManagedMemAdjuster(((Long) runningUnitResourceLimit.f1).longValue(), linkedHashMap, hashMap, operatorMinManagedMem).adjust(this.relRunningUnitMap);
        }
        for (ShuffleStage shuffleStage : this.shuffleStages) {
            for (RowBatchExecRel rowBatchExecRel2 : shuffleStage.getBatchExecRelSet()) {
                rowBatchExecRel2.setResultPartitionCount(shuffleStage.getResultParallelism());
                rowBatchExecRel2.setResource((RelResource) linkedHashMap.get(rowBatchExecRel2));
                LOG.info(rowBatchExecRel2 + " resource: " + linkedHashMap.get(rowBatchExecRel2));
            }
        }
    }

    public void setRelID(RowBatchExecRel rowBatchExecRel, int i) {
        if (this.useRunningUnit && this.relStagesMap.containsKey(rowBatchExecRel)) {
            Iterator<BatchExecRelStage> it = this.relStagesMap.get(rowBatchExecRel).iterator();
            while (it.hasNext()) {
                it.next().setRelID(Integer.valueOf(i));
            }
        }
    }

    public void addTransformation(RowBatchExecRel rowBatchExecRel, StreamTransformation<?> streamTransformation) {
        if (this.useRunningUnit && this.relStagesMap.containsKey(rowBatchExecRel)) {
            Iterator<BatchExecRelStage> it = this.relStagesMap.get(rowBatchExecRel).iterator();
            while (it.hasNext()) {
                it.next().addTransformation(streamTransformation);
            }
        }
    }

    private void buildShuffleStages() {
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Iterator<RelRunningUnit> it = this.runningUnits.iterator();
        while (it.hasNext()) {
            for (ShuffleStageInRunningUnit shuffleStageInRunningUnit : it.next().getShuffleStagesInRunningUnit()) {
                linkedHashSet.add(shuffleStageInRunningUnit);
                Iterator<RowBatchExecRel> it2 = shuffleStageInRunningUnit.getRelSet().iterator();
                while (it2.hasNext()) {
                    ((Set) linkedHashMap.computeIfAbsent(it2.next(), obj -> {
                        return new LinkedHashSet();
                    })).add(shuffleStageInRunningUnit);
                }
            }
        }
        while (!linkedHashSet.isEmpty()) {
            ShuffleStage shuffleStage = new ShuffleStage();
            LinkedList linkedList = new LinkedList(((ShuffleStageInRunningUnit) linkedHashSet.iterator().next()).getRelSet());
            LinkedHashSet linkedHashSet2 = new LinkedHashSet();
            while (!linkedList.isEmpty()) {
                RowBatchExecRel rowBatchExecRel = (RowBatchExecRel) linkedList.remove(0);
                if (!linkedHashSet2.contains(rowBatchExecRel)) {
                    linkedHashSet2.add(rowBatchExecRel);
                    this.relShuffleStageMap.put(rowBatchExecRel, shuffleStage);
                    Set<ShuffleStageInRunningUnit> set = (Set) linkedHashMap.get(rowBatchExecRel);
                    shuffleStage.addShuffleStageInRus(set);
                    for (ShuffleStageInRunningUnit shuffleStageInRunningUnit2 : set) {
                        linkedHashSet.remove(shuffleStageInRunningUnit2);
                        linkedList.addAll(shuffleStageInRunningUnit2.getRelSet());
                        shuffleStageInRunningUnit2.setShuffleStage(shuffleStage);
                    }
                }
            }
            this.shuffleStages.add(shuffleStage);
        }
    }

    private void buildRelStagesMap() {
        Iterator<RelRunningUnit> it = this.runningUnits.iterator();
        while (it.hasNext()) {
            Iterator<ShuffleStageInRunningUnit> it2 = it.next().getShuffleStagesInRunningUnit().iterator();
            while (it2.hasNext()) {
                for (BatchExecRelStage batchExecRelStage : it2.next().getRelStages()) {
                    this.relStagesMap.computeIfAbsent(batchExecRelStage.getBatchExecRel(), rowBatchExecRel -> {
                        return new LinkedHashSet();
                    }).add(batchExecRelStage);
                }
            }
        }
    }

    public List<RelRunningUnit> getRunningUnits() {
        return this.runningUnits;
    }
}
