package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.RescaleJobParallelism;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobExceedMaxResourceLimit;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/resolvers/ExceedMaxResourceLimitResolver.class */
/**
 * Resolver that reacts to a {@link JobExceedMaxResourceLimit} symptom by proposing a
 * {@link RescaleJobParallelism} action that scales the job down.
 *
 * <p>Two cases are distinguished when computing the scale-down target:
 * <ul>
 *   <li>the job's current total cpu or memory is above the configured maximum: the target is the
 *       configured maximum itself;</li>
 *   <li>otherwise: the target is the job's current total multiplied by {@link #RATIO_OPTION}.</li>
 * </ul>
 *
 * <p>The actual per-vertex scale-down computation is delegated to
 * {@link MaxResourceLimitUtil#scaleDownJobConfigToMaxResourceLimit}.
 */
public class ExceedMaxResourceLimitResolver implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExceedMaxResourceLimitResolver.class);

    /** Factor applied to the job's current total resources when they do not exceed the max limit. */
    public static final ConfigOption<Double> RATIO_OPTION =
            ConfigOptions.key("exceed-max-total-resource.rescale.ratio").defaultValue(0.9d);

    /** Id of the monitored job, read from the monitor in {@link #open(HealthMonitor)}. */
    private JobID jobID;
    /** Monitor this resolver is attached to; source of the current job config. */
    private HealthMonitor monitor;
    /** Value of {@link #RATIO_OPTION}. */
    private double ratio;
    /** Value of {@code HealthMonitorOptions.PARALLELISM_SCALE_TIME_OUT}; handed to the rescale action. */
    private long timeout;
    /** Value of {@code ResourceManagerOptions.MAX_TOTAL_RESOURCE_LIMIT_CPU_CORE}. */
    private double maxCpuLimit;
    /** Value of {@code ResourceManagerOptions.MAX_TOTAL_RESOURCE_LIMIT_MEMORY_MB}. */
    private int maxMemoryLimit;

    /**
     * Binds this resolver to the given monitor and caches the configuration values it needs.
     *
     * @param healthMonitor monitor providing the job id and the configuration
     */
    @Override
    public void open(HealthMonitor healthMonitor) {
        this.monitor = healthMonitor;
        this.jobID = healthMonitor.getJobID();
        this.ratio = healthMonitor.getConfig().getDouble(RATIO_OPTION);
        this.timeout = healthMonitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_TIME_OUT);
        this.maxCpuLimit = healthMonitor.getConfig().getDouble(ResourceManagerOptions.MAX_TOTAL_RESOURCE_LIMIT_CPU_CORE);
        this.maxMemoryLimit = healthMonitor.getConfig().getInteger(ResourceManagerOptions.MAX_TOTAL_RESOURCE_LIMIT_MEMORY_MB);
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Produces a rescale action if the symptom list contains a {@link JobExceedMaxResourceLimit}.
     *
     * @param list symptoms detected in the current round
     * @return a {@link RescaleJobParallelism} action containing every vertex whose parallelism
     *     differs between the current and the scaled-down job config, or {@code null} when the
     *     symptom is absent or no vertex parallelism changes
     */
    @Override
    public Action resolve(List<Symptom> list) {
        LOGGER.debug("Start resolving.");
        if (!containsExceedLimitSymptom(list)) {
            return null;
        }

        RestServerClient.JobConfig currentConfig = this.monitor.getJobConfig();
        RestServerClient.JobConfig targetConfig = computeTargetConfig(currentConfig);

        RescaleJobParallelism action = buildRescaleAction(currentConfig, targetConfig);
        if (action.isEmpty()) {
            return null;
        }
        LOGGER.info("RescaleJobParallelism action generated: {}.", action);
        return action;
    }

    /** Returns whether any symptom is a {@link JobExceedMaxResourceLimit}, logging the first hit. */
    private static boolean containsExceedLimitSymptom(List<Symptom> symptoms) {
        for (Symptom symptom : symptoms) {
            if (symptom instanceof JobExceedMaxResourceLimit) {
                LOGGER.debug("Job exceed max resource limit detected.");
                return true;
            }
        }
        return false;
    }

    /** Scales the given config down to the max limit, or by {@code ratio} if already within it. */
    private RestServerClient.JobConfig computeTargetConfig(RestServerClient.JobConfig currentConfig) {
        double totalCpu = currentConfig.getJobTotalCpuCores();
        int totalMemoryMb = currentConfig.getJobTotalMemoryMb();

        if (totalCpu > this.maxCpuLimit || totalMemoryMb > this.maxMemoryLimit) {
            LOGGER.debug("Current job total resource exceed max limit. Down scale job to max resource limit <cpu, mem>=<{}, {}>.", this.maxCpuLimit, this.maxMemoryLimit);
            return MaxResourceLimitUtil.scaleDownJobConfigToMaxResourceLimit(
                    currentConfig, new HashMap<>(), this.maxCpuLimit, this.maxMemoryLimit);
        }

        // Within the limit: shrink relative to the current usage. The memory target is truncated
        // towards zero by the int cast.
        double targetCpu = totalCpu * this.ratio;
        int targetMemoryMb = (int) (totalMemoryMb * this.ratio);
        LOGGER.debug("Current job total resource does not exceed max limit. Down scale job by ratio {}. Limit <cpu, mem>=<{}, {}>.", this.ratio, targetCpu, targetMemoryMb);
        return MaxResourceLimitUtil.scaleDownJobConfigToMaxResourceLimit(
                currentConfig, new HashMap<>(), targetCpu, targetMemoryMb);
    }

    /** Collects every vertex whose parallelism differs between the two configs into one action. */
    private RescaleJobParallelism buildRescaleAction(
            RestServerClient.JobConfig currentConfig, RestServerClient.JobConfig targetConfig) {
        RescaleJobParallelism action = new RescaleJobParallelism(this.jobID, this.timeout);
        for (JobVertexID vertexId : targetConfig.getVertexConfigs().keySet()) {
            RestServerClient.VertexConfig current = currentConfig.getVertexConfigs().get(vertexId);
            RestServerClient.VertexConfig target = targetConfig.getVertexConfigs().get(vertexId);
            if (current.getParallelism() != target.getParallelism()) {
                action.addVertex(
                        vertexId,
                        current.getParallelism(),
                        target.getParallelism(),
                        current.getResourceSpec(),
                        target.getResourceSpec());
            }
        }
        return action;
    }
}
