/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.AdjustJobHeapMemory;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexFrequentFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHeapOOM;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLongTimeFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLowMemory;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapMemoryAdjuster
implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeapMemoryAdjuster.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private double scaleUpRatio;
    private double scaleDownRatio;
    private long timeout;
    private long opportunisticActionDelay;
    private long stableTime;
    private long checkpointIntervalThreshold;
    private double maxCpuLimit;
    private int maxMemoryLimit;
    private Set<JobVertexID> vertexToScaleUp;
    private long opportunisticActionDelayStart;
    private JobStable jobStable;
    private JobVertexHeapOOM jobVertexHeapOOM;
    private JobVertexFrequentFullGC jobVertexFrequentFullGC;
    private JobVertexLongTimeFullGC jobVertexLongTimeFullGC;
    private JobVertexLowMemory jobVertexLowMemory;

    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.scaleUpRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_UP_RATIO);
        this.scaleDownRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_DOWN_RATIO);
        this.timeout = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_TIME_OUT);
        this.opportunisticActionDelay = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_OPPORTUNISTIC_ACTION_DELAY);
        this.stableTime = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_STABLE_TIME);
        this.checkpointIntervalThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_CHECKPOINT_THRESHOLD);
        this.maxCpuLimit = MaxResourceLimitUtil.getMaxCpu(monitor.getConfig());
        this.maxMemoryLimit = MaxResourceLimitUtil.getMaxMem(monitor.getConfig());
        this.vertexToScaleUp = new HashSet<JobVertexID>();
        this.opportunisticActionDelayStart = -1L;
    }

    @Override
    public void close() {
    }

    @Override
    public Action resolve(List<Symptom> symptomList) {
        LOGGER.debug("Start resolving.");
        if (this.opportunisticActionDelayStart < this.monitor.getJobStartExecutionTime()) {
            this.opportunisticActionDelayStart = -1L;
            this.vertexToScaleUp.clear();
        }
        if (!this.diagnose(symptomList)) {
            return null;
        }
        HashMap<JobVertexID, Integer> targetHeap = new HashMap<JobVertexID, Integer>();
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (this.jobVertexLowMemory != null) {
            targetHeap.putAll(this.scaleDownVertexHeapMem(jobConfig));
        }
        if (this.jobVertexHeapOOM != null || this.jobVertexFrequentFullGC != null || this.jobVertexLongTimeFullGC != null || !this.vertexToScaleUp.isEmpty()) {
            targetHeap.putAll(this.scaleUpVertexHeapMem(jobConfig));
        }
        if (targetHeap.isEmpty()) {
            return null;
        }
        AdjustJobHeapMemory adjustJobHeapMemory = new AdjustJobHeapMemory(this.jobID, this.timeout);
        for (Map.Entry entry : targetHeap.entrySet()) {
            JobVertexID vertexID = (JobVertexID)((Object)entry.getKey());
            int targetHeapMemory = (Integer)entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            ResourceSpec targetResource = new ResourceSpec.Builder(currentResource).setHeapMemoryInMB(targetHeapMemory).build();
            adjustJobHeapMemory.addVertex(vertexID, vertexConfig.getParallelism(), vertexConfig.getParallelism(), currentResource, targetResource);
        }
        if (this.maxCpuLimit != Double.MAX_VALUE || this.maxMemoryLimit != Integer.MAX_VALUE) {
            RestServerClient.JobConfig targetJobConfig = adjustJobHeapMemory.getAppliedJobConfig(jobConfig);
            double targetTotalCpu = targetJobConfig.getJobTotalCpuCores();
            int targetTotalMem = targetJobConfig.getJobTotalMemoryMb();
            if (targetTotalCpu > this.maxCpuLimit || targetTotalMem > this.maxMemoryLimit) {
                LOGGER.debug("Give up adjusting: total resource of target job config <cpu, mem>=<{}, {}> exceed max limit <cpu, mem>=<{}, {}>.", new Object[]{targetTotalCpu, targetTotalMem, this.maxCpuLimit, this.maxMemoryLimit});
                return null;
            }
        }
        adjustJobHeapMemory.exculdeMinorDiffVertices(this.monitor.getConfig());
        if (!adjustJobHeapMemory.isEmpty()) {
            long lastCheckpointTime = 0L;
            try {
                CheckpointStatistics completedCheckpointStats = this.monitor.getRestServerClient().getLatestCheckPointStates(this.monitor.getJobID());
                if (completedCheckpointStats != null) {
                    lastCheckpointTime = completedCheckpointStats.getLatestAckTimestamp();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            long now = System.currentTimeMillis();
            if (this.jobVertexHeapOOM != null || this.jobVertexFrequentFullGC != null && this.jobVertexFrequentFullGC.isSevere() || this.jobVertexLongTimeFullGC != null && this.jobVertexLongTimeFullGC.isSevere() || this.jobVertexLowMemory != null) {
                adjustJobHeapMemory.setActionMode(Action.ActionMode.IMMEDIATE);
            } else if (this.opportunisticActionDelayStart > 0L && now - this.opportunisticActionDelayStart > this.opportunisticActionDelay && now - lastCheckpointTime < this.checkpointIntervalThreshold) {
                LOGGER.debug("Upgrade opportunistic action to immediate action.");
                adjustJobHeapMemory.setActionMode(Action.ActionMode.IMMEDIATE);
            } else {
                if (this.opportunisticActionDelayStart < 0L) {
                    this.opportunisticActionDelayStart = now;
                }
                adjustJobHeapMemory.setActionMode(Action.ActionMode.OPPORTUNISTIC);
            }
            LOGGER.info("AdjustJobHeapMemory action generated: {}.", (Object)adjustJobHeapMemory);
            return adjustJobHeapMemory;
        }
        return null;
    }

    @VisibleForTesting
    public boolean diagnose(List<Symptom> symptomList) {
        this.jobStable = null;
        this.jobVertexHeapOOM = null;
        this.jobVertexFrequentFullGC = null;
        this.jobVertexLongTimeFullGC = null;
        this.jobVertexLowMemory = null;
        for (Symptom symptom : symptomList) {
            if (symptom instanceof JobStable) {
                this.jobStable = (JobStable)symptom;
                continue;
            }
            if (symptom instanceof JobVertexHeapOOM) {
                this.jobVertexHeapOOM = (JobVertexHeapOOM)symptom;
                continue;
            }
            if (symptom instanceof JobVertexFrequentFullGC) {
                this.jobVertexFrequentFullGC = (JobVertexFrequentFullGC)symptom;
                continue;
            }
            if (symptom instanceof JobVertexLongTimeFullGC) {
                this.jobVertexLongTimeFullGC = (JobVertexLongTimeFullGC)symptom;
            }
            if (!(symptom instanceof JobVertexLowMemory)) continue;
            this.jobVertexLowMemory = (JobVertexLowMemory)symptom;
        }
        if (this.jobVertexHeapOOM != null) {
            LOGGER.debug("Heap OOM detected, should rescale.");
            return true;
        }
        if (this.jobVertexLongTimeFullGC != null && this.jobVertexLongTimeFullGC.isCritical()) {
            LOGGER.debug("Critical long time full GC detected, should rescale.");
            return true;
        }
        if (this.jobStable == null || this.jobStable.getStableTime() < this.stableTime) {
            LOGGER.debug("Job unstable, should not rescale.");
            return false;
        }
        if (this.jobVertexFrequentFullGC == null && this.jobVertexLongTimeFullGC == null && this.jobVertexLowMemory == null) {
            LOGGER.debug("No need to rescale.");
            return false;
        }
        return true;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleUpVertexHeapMem(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexHeapOOM != null) {
            this.vertexToScaleUp.addAll(this.jobVertexHeapOOM.getJobVertexIDs());
        }
        if (this.jobVertexFrequentFullGC != null) {
            this.vertexToScaleUp.addAll(this.jobVertexFrequentFullGC.getJobVertexIDs());
        }
        if (this.jobVertexLongTimeFullGC != null) {
            this.vertexToScaleUp.addAll(this.jobVertexLongTimeFullGC.getJobVertexIDs());
        }
        HashMap<JobVertexID, Integer> results = new HashMap<JobVertexID, Integer>();
        for (JobVertexID vertexID : this.vertexToScaleUp) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            int targetHeapMemory = currentResource.getHeapMemory() == 0 ? (int)Math.ceil(1.0 * this.scaleUpRatio) : (int)Math.ceil((double)currentResource.getHeapMemory() * this.scaleUpRatio);
            results.put(vertexID, targetHeapMemory);
            LOGGER.debug("Scale up, target heap memory for vertex {} is {}.", (Object)vertexID, (Object)targetHeapMemory);
        }
        return results;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleDownVertexHeapMem(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexLowMemory == null) {
            return Collections.emptyMap();
        }
        HashMap<JobVertexID, Integer> results = new HashMap<JobVertexID, Integer>();
        for (Map.Entry<JobVertexID, Double> entry : this.jobVertexLowMemory.getHeapUtilities().entrySet()) {
            JobVertexID vertexID = entry.getKey();
            double utility = entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            int targetHeapMemory = vertexConfig.getResourceSpec().getHeapMemory();
            if (targetHeapMemory == 0) {
                targetHeapMemory = 1;
            }
            if (utility * this.scaleDownRatio < 1.0) {
                targetHeapMemory = (int)Math.ceil((double)targetHeapMemory * utility * this.scaleDownRatio);
            }
            results.put(vertexID, targetHeapMemory);
            LOGGER.debug("Scale down, target heap memory for vertex {} is {}.", (Object)vertexID, (Object)targetHeapMemory);
        }
        return results;
    }
}

