/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.resource.batch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.plan.nodes.exec.BatchExecNode;
import org.apache.flink.table.resource.batch.BatchExecNodeStage;
import org.apache.flink.table.resource.batch.NodeRunningUnit;
import org.apache.flink.table.resource.batch.schedule.RunningUnitGraphManagerPlugin;
import org.apache.flink.table.util.NodeResourceUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/**
 * Holds the {@link NodeRunningUnit}s registered for a batch job together with two
 * lookup indexes derived from them:
 * <ul>
 *   <li>exec node -&gt; the running units whose node set contains that node;</li>
 *   <li>exec node -&gt; the {@link BatchExecNodeStage}s that belong to that node.</li>
 * </ul>
 *
 * <p>Both indexes use insertion-ordered maps and sets, so iteration follows the
 * order in which running units were registered.
 *
 * <p>NOTE(review): the class performs no synchronization of its own; it looks like
 * it is meant to be used from a single thread - confirm against callers.
 */
public class RunningUnitKeeper {
    private final Configuration tableConf;
    private final List<NodeRunningUnit> runningUnits = new ArrayList<>();
    private final Map<BatchExecNode<?>, Set<NodeRunningUnit>> nodeRunningUnitMap = new LinkedHashMap<>();
    private final Map<BatchExecNode<?>, Set<BatchExecNodeStage>> nodeStagesMap = new LinkedHashMap<>();

    /**
     * Creates an empty keeper.
     *
     * @param tableConf table configuration consulted by {@link #setScheduleConfig}
     */
    public RunningUnitKeeper(Configuration tableConf) {
        this.tableConf = tableConf;
    }

    /** Removes all registered running units and empties both lookup indexes. */
    public void clear() {
        this.runningUnits.clear();
        this.nodeRunningUnitMap.clear();
        this.nodeStagesMap.clear();
    }

    /**
     * Registers the given running units and updates both lookup indexes.
     *
     * <p>Despite the name, the units are <em>appended</em> to those already held;
     * call {@link #clear()} first to replace the previous content.
     *
     * @param runningUnits units to register, must not be {@code null}
     * @throws IllegalArgumentException if {@code runningUnits} is {@code null}
     */
    public void setRunningUnits(List<NodeRunningUnit> runningUnits) {
        Preconditions.checkArgument(runningUnits != null, "runningUnits should not be null.");
        this.runningUnits.addAll(runningUnits);
        for (NodeRunningUnit runningUnit : runningUnits) {
            for (BatchExecNode<?> node : runningUnit.getNodeSet()) {
                this.nodeRunningUnitMap
                        .computeIfAbsent(node, key -> new LinkedHashSet<>())
                        .add(runningUnit);
            }
        }
        this.buildNodeStagesMap();
    }

    /**
     * Writes the running-unit scheduling settings into the configuration of the
     * given stream graph generator context.
     *
     * <p>Nothing is written when no running units are registered, when
     * {@link NodeResourceUtil#enableRunningUnitSchedule} reports that running-unit
     * scheduling is disabled, or when
     * {@link TableConfigOptions#SQL_EXEC_DATA_EXCHANGE_MODE_ALL_BATCH} is set.
     * Otherwise {@link RunningUnitGraphManagerPlugin} is configured as the graph
     * manager plugin and the registered running units are serialized into the
     * configuration under the key {@code "runningUnit.key"}.
     *
     * @param context context whose configuration receives the settings
     * @throws FlinkRuntimeException if the running units cannot be serialized
     */
    public void setScheduleConfig(StreamGraphGenerator.Context context) {
        if (this.runningUnits.isEmpty()) {
            return;
        }
        if (!NodeResourceUtil.enableRunningUnitSchedule(this.tableConf)
                || this.tableConf.getBoolean(TableConfigOptions.SQL_EXEC_DATA_EXCHANGE_MODE_ALL_BATCH)) {
            return;
        }
        context.getConfiguration().setString(
                JobManagerOptions.GRAPH_MANAGER_PLUGIN, RunningUnitGraphManagerPlugin.class.getName());
        try {
            // NOTE(review): the key is presumably read back by RunningUnitGraphManagerPlugin - confirm.
            InstantiationUtil.writeObjectToConfig(this.runningUnits, context.getConfiguration(), "runningUnit.key");
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not serialize runningUnits to streamGraph config.", e);
        }
    }

    /**
     * Attaches a transformation to every stage recorded for the given node.
     * Nodes without any recorded stage are ignored.
     *
     * @param node exec node the transformation was produced for
     * @param transformation transformation to add to each of the node's stages
     */
    public void addTransformation(BatchExecNode<?> node, StreamTransformation<?> transformation) {
        // Entries are only ever created through computeIfAbsent with a non-null set,
        // so a null result means the node has no recorded stages.
        Set<BatchExecNodeStage> stages = this.nodeStagesMap.get(node);
        if (stages == null) {
            return;
        }
        for (BatchExecNodeStage stage : stages) {
            stage.addTransformation(transformation);
        }
    }

    /**
     * Indexes every stage of every registered running unit by its exec node.
     * Re-running over units that were already indexed is harmless because the
     * stages are collected into sets.
     */
    private void buildNodeStagesMap() {
        for (NodeRunningUnit unit : this.runningUnits) {
            for (BatchExecNodeStage stage : unit.getAllNodeStages()) {
                this.nodeStagesMap
                        .computeIfAbsent(stage.getBatchExecNode(), key -> new LinkedHashSet<>())
                        .add(stage);
            }
        }
    }

    /**
     * Returns the index from exec node to the running units containing it.
     *
     * @return the internal map itself, not a copy; changes made by the caller are
     *     visible to this keeper
     */
    public Map<BatchExecNode<?>, Set<NodeRunningUnit>> getRunningUnitMap() {
        return this.nodeRunningUnitMap;
    }

    /**
     * Returns the registered running units in registration order.
     *
     * @return the internal list itself, not a copy; changes made by the caller are
     *     visible to this keeper
     */
    public List<NodeRunningUnit> getRunningUnits() {
        return this.runningUnits;
    }
}

