/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.utils;

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for reading the configured maximum resource limits and for shrinking a
 * job configuration until its total resources fit within those limits.
 */
public class MaxResourceLimitUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(MaxResourceLimitUtil.class);

    /**
     * Reads the CPU limit from the given configuration.
     *
     * @param config configuration to read from
     * @return the value of {@link HealthMonitorOptions#RESOURCE_LIMIT_CPU_CORE}
     */
    public static double getMaxCpu(Configuration config) {
        return config.getDouble(HealthMonitorOptions.RESOURCE_LIMIT_CPU_CORE);
    }

    /**
     * Reads the memory limit from the given configuration.
     *
     * @param config configuration to read from
     * @return the value of {@link HealthMonitorOptions#RESOURCE_LIMIT_MEMORY_MB}
     */
    public static int getMaxMem(Configuration config) {
        return config.getInteger(HealthMonitorOptions.RESOURCE_LIMIT_MEMORY_MB);
    }

    /**
     * Builds a copy of {@code jobConfig} whose vertex parallelisms are repeatedly scaled down
     * until the job's total CPU and total memory are both within the given limits.
     *
     * <p>Each round computes the tighter of the two ratios {@code limit / currentTotal} and
     * multiplies every vertex's parallelism by it (rounded down). The result is never lower
     * than 1 nor lower than the vertex's entry in {@code minParallelisms}; note that if that
     * entry is larger than the vertex's current parallelism, the parallelism is set to the
     * entry. The input {@code jobConfig} is not modified by this method directly; all changes
     * are applied to the copy.
     *
     * @param jobConfig the job configuration to start from
     * @param minParallelisms lower bound of parallelism per vertex; vertices without an entry
     *     (or a {@code null} map / {@code null} entry) use a lower bound of 1
     * @param maxCpuLimit upper limit for the job's total CPU
     * @param maxMemoryLimit upper limit for the job's total memory
     * @return the adjusted copy, or {@code null} if either limit is not positive or if a
     *     round could not lower any vertex's parallelism while the job still exceeds a limit
     */
    public static RestServerClient.JobConfig scaleDownJobConfigToMaxResourceLimit(RestServerClient.JobConfig jobConfig, Map<JobVertexID, Integer> minParallelisms, double maxCpuLimit, int maxMemoryLimit) {
        if (maxCpuLimit <= 0.0 || maxMemoryLimit <= 0) {
            warnLimitNotSatisfiable(maxCpuLimit, maxMemoryLimit);
            return null;
        }
        RestServerClient.JobConfig adjustedJobConfig = new RestServerClient.JobConfig(jobConfig);
        // Starts as true so that the first round is always attempted.
        boolean downScaled = true;
        while (adjustedJobConfig.getJobTotalCpuCores() > maxCpuLimit || adjustedJobConfig.getJobTotalMemoryMb() > maxMemoryLimit) {
            if (!downScaled) {
                // The previous round changed nothing, so another round cannot help either.
                warnLimitNotSatisfiable(maxCpuLimit, maxMemoryLimit);
                return null;
            }
            downScaled = false;
            double ratio = Math.min(maxCpuLimit / adjustedJobConfig.getJobTotalCpuCores(), (double) maxMemoryLimit / (double) adjustedJobConfig.getJobTotalMemoryMb());
            LOGGER.debug("Scaling down by ratio {}.", ratio);
            Map<JobVertexID, RestServerClient.VertexConfig> vertexConfigs = adjustedJobConfig.getVertexConfigs();
            // Only values of already-present keys are replaced inside this loop.
            for (JobVertexID vertexId : vertexConfigs.keySet()) {
                RestServerClient.VertexConfig originVertexConfig = vertexConfigs.get(vertexId);
                Integer configuredMin = minParallelisms == null ? null : minParallelisms.get(vertexId);
                int parallelism = scaleParallelism(originVertexConfig.getParallelism(), ratio, configuredMin);
                if (parallelism < originVertexConfig.getParallelism()) {
                    downScaled = true;
                }
                RestServerClient.VertexConfig adjustedVertexConfig = new RestServerClient.VertexConfig(originVertexConfig.getName(), parallelism, originVertexConfig.getMaxParallelism(), originVertexConfig.getResourceSpec(), originVertexConfig.getOperatorIds(), originVertexConfig.getColocationGroupId());
                vertexConfigs.put(vertexId, adjustedVertexConfig);
            }
            LOGGER.debug("Resource after scaling down: <cpu, memory>=<{}, {}>.", adjustedJobConfig.getJobTotalCpuCores(), adjustedJobConfig.getJobTotalMemoryMb());
        }
        return adjustedJobConfig;
    }

    /** Scales a parallelism by {@code ratio} (rounded down), bounded below by 1 and by {@code configuredMin} if present. */
    private static int scaleParallelism(int currentParallelism, double ratio, Integer configuredMin) {
        int scaled = Math.max(1, (int) Math.floor((double) currentParallelism * ratio));
        int lowerBound = configuredMin == null ? 1 : configuredMin;
        return Math.max(scaled, lowerBound);
    }

    /** Logs that the given resource limits cannot be met. */
    private static void warnLimitNotSatisfiable(double maxCpuLimit, int maxMemoryLimit) {
        LOGGER.warn("Max resource limit <cpu, memory>=<{}, {}> could not be satisfied.", maxCpuLimit, maxMemoryLimit);
    }
}

