/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.AdjustJobCpu;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexFrequentFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHighCpu;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLongTimeFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLowCpu;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CpuAdjuster
implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(CpuAdjuster.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private double scaleUpRatio;
    private double scaleDownRatio;
    private long timeout;
    private long opportunisticActionDelay;
    private long stableTime;
    private double maxCpuLimit;
    private int maxMemoryLimit;
    private Map<JobVertexID, Double> vertexToScaleUpMaxUtility;
    private long opportunisticActionDelayStart;
    private long checkpointIntervalThreshold;
    private JobVertexHighCpu jobVertexHighCpu;
    private JobVertexLowCpu jobVertexLowCpu;
    private JobStable jobStable;
    private JobVertexFrequentFullGC jobVertexFrequentFullGC;
    private JobVertexLongTimeFullGC jobVertexLongTimeFullGC;

    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.scaleUpRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_CPU_SCALE_UP_RATIO);
        this.scaleDownRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_CPU_SCALE_DOWN_RATIO);
        this.timeout = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_TIME_OUT);
        this.opportunisticActionDelay = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_OPPORTUNISTIC_ACTION_DELAY);
        this.stableTime = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_STABLE_TIME);
        this.checkpointIntervalThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_CHECKPOINT_THRESHOLD);
        this.maxCpuLimit = MaxResourceLimitUtil.getMaxCpu(monitor.getConfig());
        this.maxMemoryLimit = MaxResourceLimitUtil.getMaxMem(monitor.getConfig());
        this.vertexToScaleUpMaxUtility = new HashMap<JobVertexID, Double>();
        this.opportunisticActionDelayStart = -1L;
    }

    @Override
    public void close() {
    }

    @Override
    public Action resolve(List<Symptom> symptomList) {
        LOGGER.debug("Start resolving.");
        if (this.opportunisticActionDelayStart < this.monitor.getJobStartExecutionTime()) {
            this.opportunisticActionDelayStart = -1L;
            this.vertexToScaleUpMaxUtility.clear();
        }
        if (!this.diagnose(symptomList)) {
            return null;
        }
        HashMap<JobVertexID, Double> targetCpu = new HashMap<JobVertexID, Double>();
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (this.jobVertexLowCpu != null) {
            targetCpu.putAll(this.scaleDownVertexCpu(jobConfig));
        }
        if (this.jobVertexHighCpu != null || !this.vertexToScaleUpMaxUtility.isEmpty()) {
            targetCpu.putAll(this.scaleUpVertexCpu(jobConfig));
        }
        if (targetCpu.isEmpty()) {
            return null;
        }
        AdjustJobCpu adjustJobCpu = new AdjustJobCpu(this.jobID, this.timeout);
        for (Map.Entry entry : targetCpu.entrySet()) {
            JobVertexID vertexID = (JobVertexID)((Object)entry.getKey());
            double target = (Double)entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            ResourceSpec targetResource = new ResourceSpec.Builder(currentResource).setCpuCores(target).build();
            adjustJobCpu.addVertex(vertexID, vertexConfig.getParallelism(), vertexConfig.getParallelism(), currentResource, targetResource);
        }
        if (this.maxCpuLimit != Double.MAX_VALUE || this.maxMemoryLimit != Integer.MAX_VALUE) {
            RestServerClient.JobConfig targetJobConfig = adjustJobCpu.getAppliedJobConfig(jobConfig);
            double targetTotalCpu = targetJobConfig.getJobTotalCpuCores();
            int targetTotalMem = targetJobConfig.getJobTotalMemoryMb();
            if (targetTotalCpu > this.maxCpuLimit || targetTotalMem > this.maxMemoryLimit) {
                LOGGER.debug("Give up adjusting: total resource of target job config <cpu, mem>=<{}, {}> exceed max limit <cpu, mem>=<{}, {}>.", new Object[]{targetTotalCpu, targetTotalMem, this.maxCpuLimit, this.maxMemoryLimit});
                return null;
            }
        }
        adjustJobCpu.exculdeMinorDiffVertices(this.monitor.getConfig());
        if (!adjustJobCpu.isEmpty()) {
            long lastCheckpointTime = 0L;
            try {
                CheckpointStatistics completedCheckpointStats = this.monitor.getRestServerClient().getLatestCheckPointStates(this.monitor.getJobID());
                if (completedCheckpointStats != null) {
                    lastCheckpointTime = completedCheckpointStats.getLatestAckTimestamp();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            long now = System.currentTimeMillis();
            if (this.jobVertexLowCpu != null) {
                adjustJobCpu.setActionMode(Action.ActionMode.IMMEDIATE);
            } else if (this.opportunisticActionDelayStart > 0L && now - this.opportunisticActionDelayStart > this.opportunisticActionDelay && now - lastCheckpointTime < this.checkpointIntervalThreshold) {
                LOGGER.debug("Upgrade opportunistic action to immediate action.");
                adjustJobCpu.setActionMode(Action.ActionMode.IMMEDIATE);
            } else {
                if (this.opportunisticActionDelayStart < 0L) {
                    this.opportunisticActionDelayStart = now;
                }
                adjustJobCpu.setActionMode(Action.ActionMode.OPPORTUNISTIC);
            }
            LOGGER.info("AdjustJobCpu action generated: {}.", (Object)adjustJobCpu);
            return adjustJobCpu;
        }
        return null;
    }

    @VisibleForTesting
    public boolean diagnose(List<Symptom> symptomList) {
        this.jobVertexHighCpu = null;
        this.jobVertexLowCpu = null;
        this.jobStable = null;
        this.jobVertexFrequentFullGC = null;
        this.jobVertexLongTimeFullGC = null;
        for (Symptom symptom : symptomList) {
            if (symptom instanceof JobStable) {
                this.jobStable = (JobStable)symptom;
                continue;
            }
            if (symptom instanceof JobVertexHighCpu) {
                this.jobVertexHighCpu = (JobVertexHighCpu)symptom;
                LOGGER.debug("High cpu detected for vertices with max utilities {}.", this.jobVertexHighCpu.getUtilities());
                continue;
            }
            if (symptom instanceof JobVertexLowCpu) {
                this.jobVertexLowCpu = (JobVertexLowCpu)symptom;
                LOGGER.debug("Low cpu detected for vertices with max utilities {}.", this.jobVertexLowCpu.getUtilities());
            }
            if (symptom instanceof JobVertexFrequentFullGC) {
                this.jobVertexFrequentFullGC = (JobVertexFrequentFullGC)symptom;
                LOGGER.debug("Frequent full gc detected for vertices {}.", this.jobVertexFrequentFullGC.getJobVertexIDs());
            }
            if (!(symptom instanceof JobVertexLongTimeFullGC)) continue;
            this.jobVertexLongTimeFullGC = (JobVertexLongTimeFullGC)symptom;
            LOGGER.debug("Long time full gc detected for vertices {}.", this.jobVertexLongTimeFullGC.getJobVertexIDs());
        }
        if (this.jobStable == null || this.jobStable.getStableTime() < this.stableTime) {
            LOGGER.debug("Job unstable, should not rescale.");
            return false;
        }
        if (this.jobVertexFrequentFullGC != null && this.jobVertexFrequentFullGC.isSevere() || this.jobVertexLongTimeFullGC != null && this.jobVertexLongTimeFullGC.isSevere()) {
            LOGGER.debug("GC is severe, should not rescale.");
            return false;
        }
        if (this.jobVertexHighCpu == null && this.jobVertexLowCpu == null) {
            LOGGER.debug("No need to rescale.");
            return false;
        }
        return true;
    }

    @VisibleForTesting
    public Map<JobVertexID, Double> scaleUpVertexCpu(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexHighCpu != null) {
            for (Map.Entry<JobVertexID, Double> entry : this.jobVertexHighCpu.getUtilities().entrySet()) {
                if (this.vertexToScaleUpMaxUtility.containsKey((Object)entry.getKey()) && !(this.vertexToScaleUpMaxUtility.get((Object)entry.getKey()) < entry.getValue())) continue;
                this.vertexToScaleUpMaxUtility.put(entry.getKey(), entry.getValue());
            }
        }
        HashMap<JobVertexID, Double> results = new HashMap<JobVertexID, Double>();
        for (JobVertexID jvId : this.vertexToScaleUpMaxUtility.keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)jvId);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            double utility = this.vertexToScaleUpMaxUtility.get((Object)jvId);
            double targetCpu = currentResource.getCpuCores() * Math.max(1.0, utility) * this.scaleUpRatio;
            results.put(jvId, targetCpu);
            LOGGER.debug("Scale up, target cpu for vertex {} is {}.", (Object)jvId, (Object)targetCpu);
        }
        return results;
    }

    @VisibleForTesting
    public Map<JobVertexID, Double> scaleDownVertexCpu(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexLowCpu == null) {
            return Collections.emptyMap();
        }
        HashMap<JobVertexID, Double> results = new HashMap<JobVertexID, Double>();
        for (Map.Entry<JobVertexID, Double> entry : this.jobVertexLowCpu.getUtilities().entrySet()) {
            JobVertexID vertexID = entry.getKey();
            double utility = entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            double targetCpu = currentResource.getCpuCores();
            if (targetCpu == 0.0) {
                targetCpu = 1.0;
            }
            targetCpu = targetCpu * utility * this.scaleDownRatio;
            results.put(vertexID, targetCpu);
            LOGGER.debug("Scale down, target cpu for vertex {} is {}.", (Object)vertexID, (Object)targetCpu);
        }
        return results;
    }
}

