/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.RescaleJobParallelism;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobExceedMaxResourceLimit;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolver that reacts to a {@link JobExceedMaxResourceLimit} symptom by proposing a
 * {@link RescaleJobParallelism} action that scales the job down.
 *
 * <p>The target resource budget is chosen as follows:
 * <ul>
 *   <li>if the job's current total CPU or memory is above the configured maximum, the
 *       configured maximum is used as the budget;</li>
 *   <li>otherwise the current totals multiplied by {@link #RATIO_OPTION} are used.</li>
 * </ul>
 * The adjusted job configuration itself is computed by
 * {@link MaxResourceLimitUtil#scaleDownJobConfigToMaxResourceLimit}.
 *
 * <p>State is populated in {@link #open(HealthMonitor)}; {@link #resolve(List)} must not be
 * called before that.
 */
public class ExceedMaxResourceLimitResolver implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExceedMaxResourceLimitResolver.class);

    /**
     * Ratio applied to the job's current total resources when the job is flagged but does not
     * itself exceed the configured limits. Defaults to {@code 0.9}.
     */
    public static final ConfigOption<Double> RATIO_OPTION =
            ConfigOptions.key("exceed-max-total-resource.rescale.ratio").defaultValue(0.9);

    private JobID jobID;
    private HealthMonitor monitor;
    private double ratio;
    private long timeout;
    private double maxCpuLimit;
    private int maxMemoryLimit;

    /**
     * Captures the monitor and reads this resolver's settings from the monitor's configuration.
     *
     * @param monitor the health monitor that owns this resolver
     */
    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.ratio = monitor.getConfig().getDouble(RATIO_OPTION);
        this.timeout = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_TIME_OUT);
        this.maxCpuLimit = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_LIMIT_CPU_CORE);
        this.maxMemoryLimit = monitor.getConfig().getInteger(HealthMonitorOptions.RESOURCE_LIMIT_MEMORY_MB);
    }

    /** No resources are held by this resolver, so there is nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Builds a rescale action when a {@link JobExceedMaxResourceLimit} symptom is present.
     *
     * @param symptomList the symptoms detected in the current check round
     * @return a {@link RescaleJobParallelism} action listing every vertex whose parallelism
     *         differs between the current and the scaled-down configuration, or {@code null}
     *         when the symptom is absent or no vertex needs to change
     */
    @Override
    public Action resolve(List<Symptom> symptomList) {
        LOGGER.debug("Start resolving.");
        if (findExceedMaxResourceLimit(symptomList) == null) {
            return null;
        }

        RestServerClient.JobConfig currentJobConfig = this.monitor.getJobConfig();
        RestServerClient.JobConfig targetJobConfig = computeTargetJobConfig(currentJobConfig);

        RescaleJobParallelism rescaleJobParallelism = new RescaleJobParallelism(this.jobID, this.timeout);
        for (JobVertexID vertexId : targetJobConfig.getVertexConfigs().keySet()) {
            RestServerClient.VertexConfig originVertexConfig = currentJobConfig.getVertexConfigs().get(vertexId);
            RestServerClient.VertexConfig adjustedVertexConfig = targetJobConfig.getVertexConfigs().get(vertexId);
            // Only vertices whose parallelism actually changes are part of the action.
            if (originVertexConfig.getParallelism() == adjustedVertexConfig.getParallelism()) {
                continue;
            }
            rescaleJobParallelism.addVertex(
                    vertexId,
                    originVertexConfig.getParallelism(),
                    adjustedVertexConfig.getParallelism(),
                    originVertexConfig.getResourceSpec(),
                    adjustedVertexConfig.getResourceSpec());
        }

        if (rescaleJobParallelism.isEmpty()) {
            return null;
        }
        LOGGER.info("RescaleJobParallelism action generated: {}.", rescaleJobParallelism);
        return rescaleJobParallelism;
    }

    /** Returns the first {@link JobExceedMaxResourceLimit} in the list, or {@code null} if none. */
    private JobExceedMaxResourceLimit findExceedMaxResourceLimit(List<Symptom> symptomList) {
        for (Symptom symptom : symptomList) {
            if (symptom instanceof JobExceedMaxResourceLimit) {
                LOGGER.debug("Job exceed max resource limit detected.");
                return (JobExceedMaxResourceLimit) symptom;
            }
        }
        return null;
    }

    /**
     * Picks the resource budget (configured maximum, or current totals scaled by the ratio)
     * and asks {@link MaxResourceLimitUtil} for a job configuration that fits it.
     */
    private RestServerClient.JobConfig computeTargetJobConfig(RestServerClient.JobConfig currentJobConfig) {
        double totalCpu = currentJobConfig.getJobTotalCpuCores();
        int totalMem = currentJobConfig.getJobTotalMemoryMb();

        double cpuBudget;
        int memBudget;
        if (totalCpu > this.maxCpuLimit || totalMem > this.maxMemoryLimit) {
            LOGGER.debug("Current job total resource exceed max limit. Down scale job to max resource limit <cpu, mem>=<{}, {}>.", this.maxCpuLimit, this.maxMemoryLimit);
            cpuBudget = this.maxCpuLimit;
            memBudget = this.maxMemoryLimit;
        } else {
            cpuBudget = totalCpu * this.ratio;
            // Truncates toward zero, so the memory budget never rounds up.
            memBudget = (int) (totalMem * this.ratio);
            LOGGER.debug("Current job total resource does not exceed max limit. Down scale job by ratio {}. Limit <cpu, mem>=<{}, {}>.", this.ratio, cpuBudget, memBudget);
        }
        // NOTE(review): the empty map is passed through as-is; its meaning is defined by
        // MaxResourceLimitUtil and is not visible from this class.
        return MaxResourceLimitUtil.scaleDownJobConfigToMaxResourceLimit(
                currentJobConfig, new HashMap<JobVertexID, Integer>(), cpuBudget, memBudget);
    }
}

