/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.AdjustJobNativeMemory;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHighNativeMemory;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLowMemory;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexTmKilledDueToMemoryExceed;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NativeMemoryAdjuster
implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(NativeMemoryAdjuster.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private double scaleUpRatio;
    private double scaleDownRatio;
    private long timeout;
    private long opportunisticActionDelay;
    private long stableTime;
    private long checkpointIntervalThreshold;
    private double maxCpuLimit;
    private int maxMemoryLimit;
    private Map<JobVertexID, Double> vertexToScaleUpMaxUtilities;
    private long opportunisticActionDelayStart;
    private JobStable jobStable;
    private JobVertexHighNativeMemory jobVertexHighNativeMemory;
    private JobVertexTmKilledDueToMemoryExceed jobVertexTmKilledDueToMemoryExceed;
    private JobVertexLowMemory jobVertexLowMemory;

    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.scaleUpRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_UP_RATIO);
        this.scaleDownRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_DOWN_RATIO);
        this.timeout = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_TIME_OUT);
        this.opportunisticActionDelay = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_OPPORTUNISTIC_ACTION_DELAY);
        this.stableTime = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_STABLE_TIME);
        this.checkpointIntervalThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_CHECKPOINT_THRESHOLD);
        this.maxCpuLimit = MaxResourceLimitUtil.getMaxCpu(monitor.getConfig());
        this.maxMemoryLimit = MaxResourceLimitUtil.getMaxMem(monitor.getConfig());
        this.vertexToScaleUpMaxUtilities = new HashMap<JobVertexID, Double>();
        this.opportunisticActionDelayStart = -1L;
    }

    @Override
    public void close() {
    }

    @Override
    public Action resolve(List<Symptom> symptomList) {
        LOGGER.debug("Start resolving.");
        if (this.opportunisticActionDelayStart < this.monitor.getJobStartExecutionTime()) {
            this.opportunisticActionDelayStart = -1L;
            this.vertexToScaleUpMaxUtilities.clear();
        }
        if (!this.diagnose(symptomList)) {
            return null;
        }
        HashMap<JobVertexID, Integer> targetNative = new HashMap<JobVertexID, Integer>();
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (this.jobVertexLowMemory != null) {
            targetNative.putAll(this.scaleDownVertexNativeMemory(jobConfig));
        }
        if (this.jobVertexHighNativeMemory != null || this.jobVertexTmKilledDueToMemoryExceed != null || this.vertexToScaleUpMaxUtilities != null) {
            targetNative.putAll(this.scaleUpVertexNativeMemory(jobConfig));
        }
        if (targetNative.isEmpty()) {
            return null;
        }
        AdjustJobNativeMemory adjustJobNativeMemory = new AdjustJobNativeMemory(this.jobID, this.timeout);
        for (Map.Entry entry : targetNative.entrySet()) {
            JobVertexID vertexID = (JobVertexID)((Object)entry.getKey());
            int targetNativeMemory = (Integer)entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            ResourceSpec targetResource = new ResourceSpec.Builder(currentResource).setNativeMemoryInMB(targetNativeMemory).build();
            adjustJobNativeMemory.addVertex(vertexID, vertexConfig.getParallelism(), vertexConfig.getParallelism(), currentResource, targetResource);
        }
        if (this.maxCpuLimit != Double.MAX_VALUE || this.maxMemoryLimit != Integer.MAX_VALUE) {
            RestServerClient.JobConfig targetJobConfig = adjustJobNativeMemory.getAppliedJobConfig(jobConfig);
            double targetTotalCpu = targetJobConfig.getJobTotalCpuCores();
            int targetTotalMem = targetJobConfig.getJobTotalMemoryMb();
            if (targetTotalCpu > this.maxCpuLimit || targetTotalMem > this.maxMemoryLimit) {
                LOGGER.debug("Give up adjusting: total resource of target job config <cpu, mem>=<{}, {}> exceed max limit <cpu, mem>=<{}, {}>.", new Object[]{targetTotalCpu, targetTotalMem, this.maxCpuLimit, this.maxMemoryLimit});
                return null;
            }
        }
        adjustJobNativeMemory.exculdeMinorDiffVertices(this.monitor.getConfig());
        if (!adjustJobNativeMemory.isEmpty()) {
            long lastCheckpointTime = 0L;
            try {
                CheckpointStatistics completedCheckpointStats = this.monitor.getRestServerClient().getLatestCheckPointStates(this.monitor.getJobID());
                if (completedCheckpointStats != null) {
                    lastCheckpointTime = completedCheckpointStats.getLatestAckTimestamp();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            long now = System.currentTimeMillis();
            if (this.jobVertexHighNativeMemory != null && this.jobVertexHighNativeMemory.isSevere() || this.jobVertexTmKilledDueToMemoryExceed != null || this.jobVertexLowMemory != null) {
                adjustJobNativeMemory.setActionMode(Action.ActionMode.IMMEDIATE);
            } else if (this.opportunisticActionDelayStart > 0L && now - this.opportunisticActionDelayStart > this.opportunisticActionDelay && now - lastCheckpointTime < this.checkpointIntervalThreshold) {
                LOGGER.debug("Upgrade opportunistic action to immediate action.");
                adjustJobNativeMemory.setActionMode(Action.ActionMode.IMMEDIATE);
            } else {
                if (this.opportunisticActionDelayStart < 0L) {
                    this.opportunisticActionDelayStart = now;
                }
                adjustJobNativeMemory.setActionMode(Action.ActionMode.OPPORTUNISTIC);
            }
            LOGGER.info("AdjustJobNativeMemory action generated: {}.", (Object)adjustJobNativeMemory);
            return adjustJobNativeMemory;
        }
        return null;
    }

    @VisibleForTesting
    public boolean diagnose(List<Symptom> symptomList) {
        this.jobStable = null;
        this.jobVertexHighNativeMemory = null;
        this.jobVertexLowMemory = null;
        this.jobVertexTmKilledDueToMemoryExceed = null;
        for (Symptom symptom : symptomList) {
            if (symptom instanceof JobStable) {
                this.jobStable = (JobStable)symptom;
                continue;
            }
            if (symptom instanceof JobVertexHighNativeMemory) {
                this.jobVertexHighNativeMemory = (JobVertexHighNativeMemory)symptom;
                continue;
            }
            if (symptom instanceof JobVertexTmKilledDueToMemoryExceed) {
                this.jobVertexTmKilledDueToMemoryExceed = (JobVertexTmKilledDueToMemoryExceed)symptom;
                continue;
            }
            if (!(symptom instanceof JobVertexLowMemory)) continue;
            this.jobVertexLowMemory = (JobVertexLowMemory)symptom;
        }
        if (this.jobVertexTmKilledDueToMemoryExceed != null) {
            LOGGER.debug("Task manager killed due to memory exceed detected, should rescale.");
            return true;
        }
        if (this.jobVertexHighNativeMemory != null && this.jobVertexHighNativeMemory.isCritical()) {
            LOGGER.debug("Critical native memory high detected, should rescale.");
            return true;
        }
        if (this.jobStable == null || this.jobStable.getStableTime() < this.stableTime) {
            LOGGER.debug("Job unstable, should not rescale.");
            return false;
        }
        if (this.jobVertexHighNativeMemory == null && this.jobVertexLowMemory == null) {
            LOGGER.debug("No need to rescale.");
            return false;
        }
        return true;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleUpVertexNativeMemory(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexHighNativeMemory != null) {
            for (Map.Entry<JobVertexID, Double> entry : this.jobVertexHighNativeMemory.getUtilities().entrySet()) {
                if (this.vertexToScaleUpMaxUtilities.containsKey((Object)entry.getKey()) && !(this.vertexToScaleUpMaxUtilities.get((Object)entry.getKey()) < entry.getValue())) continue;
                this.vertexToScaleUpMaxUtilities.put(entry.getKey(), entry.getValue());
            }
        }
        if (this.jobVertexTmKilledDueToMemoryExceed != null) {
            for (Map.Entry<JobVertexID, Double> entry : this.jobVertexTmKilledDueToMemoryExceed.getUtilities().entrySet()) {
                if (this.vertexToScaleUpMaxUtilities.containsKey((Object)entry.getKey()) && !(this.vertexToScaleUpMaxUtilities.get((Object)entry.getKey()) < entry.getValue())) continue;
                this.vertexToScaleUpMaxUtilities.put(entry.getKey(), entry.getValue());
            }
        }
        HashMap<JobVertexID, Integer> results = new HashMap<JobVertexID, Integer>();
        for (JobVertexID jvId : this.vertexToScaleUpMaxUtilities.keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)jvId);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            int targetNativeMemory = currentResource.getNativeMemory() == 0 ? (int)Math.ceil(this.vertexToScaleUpMaxUtilities.get((Object)jvId) * this.scaleUpRatio) : (int)Math.ceil((double)currentResource.getNativeMemory() * Math.max(1.0, this.vertexToScaleUpMaxUtilities.get((Object)jvId)) * this.scaleUpRatio);
            results.put(jvId, targetNativeMemory);
            LOGGER.debug("Scale up, target native memory for vertex {} is {}.", (Object)jvId, (Object)targetNativeMemory);
        }
        return results;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleDownVertexNativeMemory(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexLowMemory == null) {
            return Collections.emptyMap();
        }
        HashMap<JobVertexID, Integer> results = new HashMap<JobVertexID, Integer>();
        for (Map.Entry<JobVertexID, Double> entry : this.jobVertexLowMemory.getNativeUtilities().entrySet()) {
            JobVertexID vertexID = entry.getKey();
            double utility = entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            int targetNativeMemory = vertexConfig.getResourceSpec().getNativeMemory();
            if (targetNativeMemory == 0) {
                targetNativeMemory = 1;
            }
            targetNativeMemory = (int)Math.ceil((double)targetNativeMemory * utility * this.scaleDownRatio);
            results.put(vertexID, targetNativeMemory);
            LOGGER.debug("Scale down, target native memory for vertex {} is {}.", (Object)vertexID, (Object)targetNativeMemory);
        }
        return results;
    }
}

