package org.apache.flink.table.resource.batch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.plan.nodes.exec.BatchExecNode;
import org.apache.flink.table.resource.batch.schedule.RunningUnitGraphManagerPlugin;
import org.apache.flink.table.util.NodeResourceUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/resource/batch/RunningUnitKeeper.class */
/**
 * Holds the {@link NodeRunningUnit}s of a batch plan together with two lookup tables derived
 * from them: exec node to the running units that contain it, and exec node to its
 * {@link BatchExecNodeStage}s.
 *
 * <p>The keeper is also used to forward generated transformations to the stages of a node
 * ({@link #addTransformation}) and to publish the running units into the stream graph
 * configuration ({@link #setScheduleConfig}).
 */
public class RunningUnitKeeper {
    private final Configuration tableConf;
    // Never reassigned: units are accumulated via setRunningUnits() and dropped via clear().
    private final List<NodeRunningUnit> runningUnits = new ArrayList<>();
    // Insertion-ordered so that iteration over nodes/units follows registration order.
    private final Map<BatchExecNode<?>, Set<NodeRunningUnit>> nodeRunningUnitMap = new LinkedHashMap<>();
    private final Map<BatchExecNode<?>, Set<BatchExecNodeStage>> nodeStagesMap = new LinkedHashMap<>();

    /**
     * Creates a keeper that reads its scheduling switches from the given table configuration.
     *
     * @param configuration table configuration consulted by {@link #setScheduleConfig}
     */
    public RunningUnitKeeper(Configuration configuration) {
        this.tableConf = configuration;
    }

    /** Drops all registered running units and both derived lookup tables. */
    public void clear() {
        this.runningUnits.clear();
        this.nodeRunningUnitMap.clear();
        this.nodeStagesMap.clear();
    }

    /**
     * Registers the given running units and updates the derived lookup tables.
     *
     * <p>Despite the name, the units are <em>appended</em> to those already held; call
     * {@link #clear()} first to replace them.
     *
     * @param list running units to register, must not be {@code null}
     * @throws IllegalArgumentException if {@code list} is {@code null}
     */
    public void setRunningUnits(List<NodeRunningUnit> list) {
        Preconditions.checkArgument(list != null, "runningUnits should not be null.");
        this.runningUnits.addAll(list);
        for (NodeRunningUnit unit : list) {
            for (BatchExecNode<?> node : unit.getNodeSet()) {
                this.nodeRunningUnitMap.computeIfAbsent(node, key -> new LinkedHashSet<>()).add(unit);
            }
        }
        buildNodeStagesMap();
    }

    /**
     * Publishes the running units into the configuration of the given stream graph context.
     *
     * <p>Nothing is written when no running units are registered, when
     * {@link NodeResourceUtil#enableRunningUnitSchedule} reports {@code false} for the table
     * configuration, or when {@link TableConfigOptions#SQL_EXEC_DATA_EXCHANGE_MODE_ALL_BATCH}
     * is set. Otherwise {@link RunningUnitGraphManagerPlugin} is configured as the graph
     * manager plugin and the running units are serialized under
     * {@link RunningUnitGraphManagerPlugin#RUNNING_UNIT_CONF_KEY}.
     *
     * @param context stream graph generator context whose configuration is updated
     * @throws FlinkRuntimeException if the running units cannot be serialized
     */
    public void setScheduleConfig(StreamGraphGenerator.Context context) {
        if (this.runningUnits.isEmpty()
                || !NodeResourceUtil.enableRunningUnitSchedule(this.tableConf)
                || this.tableConf.getBoolean(TableConfigOptions.SQL_EXEC_DATA_EXCHANGE_MODE_ALL_BATCH)) {
            return;
        }
        context.getConfiguration().setString(JobManagerOptions.GRAPH_MANAGER_PLUGIN, RunningUnitGraphManagerPlugin.class.getName());
        try {
            InstantiationUtil.writeObjectToConfig(this.runningUnits, context.getConfiguration(), RunningUnitGraphManagerPlugin.RUNNING_UNIT_CONF_KEY);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not serialize runningUnits to streamGraph config.", e);
        }
    }

    /**
     * Adds the transformation to every stage recorded for the given exec node. Nodes without
     * recorded stages are ignored.
     *
     * @param batchExecNode exec node the transformation was generated for
     * @param streamTransformation transformation to attach to the node's stages
     */
    public void addTransformation(BatchExecNode<?> batchExecNode, StreamTransformation<?> streamTransformation) {
        // The map only ever holds non-null sets (created through computeIfAbsent), so a single
        // get() with a null check is equivalent to containsKey() followed by get().
        Set<BatchExecNodeStage> stages = this.nodeStagesMap.get(batchExecNode);
        if (stages == null) {
            return;
        }
        for (BatchExecNodeStage stage : stages) {
            stage.addTransformation(streamTransformation);
        }
    }

    /**
     * Indexes the stages of all registered running units by their exec node. Every registered
     * unit is scanned on each call; adding a stage that a node's set already contains leaves
     * that set unchanged.
     */
    private void buildNodeStagesMap() {
        for (NodeRunningUnit unit : this.runningUnits) {
            for (BatchExecNodeStage stage : unit.getAllNodeStages()) {
                this.nodeStagesMap.computeIfAbsent(stage.getBatchExecNode(), key -> new LinkedHashSet<>()).add(stage);
            }
        }
    }

    /**
     * Returns the exec-node-to-running-units table.
     *
     * @return the internal map itself, not a copy
     */
    public Map<BatchExecNode<?>, Set<NodeRunningUnit>> getRunningUnitMap() {
        return this.nodeRunningUnitMap;
    }

    /**
     * Returns the registered running units.
     *
     * @return the internal list itself, not a copy
     */
    public List<NodeRunningUnit> getRunningUnits() {
        return this.runningUnits;
    }
}
