/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.AdjustJobDirectMemory;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexDirectOOM;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLowMemory;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectMemoryAdjuster
implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(DirectMemoryAdjuster.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private double scaleUpRatio;
    private double scaleDownRatio;
    private long timeout;
    private long opportunisticActionDelay;
    private long stableTime;
    private long checkpointIntervalThreshold;
    private double maxCpuLimit;
    private int maxMemoryLimit;
    private long opportunisticActionDelayStart;
    private JobStable jobStable;
    private JobVertexDirectOOM jobVertexDirectOOM;
    private JobVertexLowMemory jobVertexLowMemory;

    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.scaleUpRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_UP_RATIO);
        this.scaleDownRatio = monitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_DOWN_RATIO);
        this.timeout = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_TIME_OUT);
        this.opportunisticActionDelay = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_OPPORTUNISTIC_ACTION_DELAY);
        this.stableTime = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_STABLE_TIME);
        this.checkpointIntervalThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_CHECKPOINT_THRESHOLD);
        this.maxCpuLimit = MaxResourceLimitUtil.getMaxCpu(monitor.getConfig());
        this.maxMemoryLimit = MaxResourceLimitUtil.getMaxMem(monitor.getConfig());
        this.opportunisticActionDelayStart = -1L;
    }

    @Override
    public void close() {
    }

    @Override
    public Action resolve(List<Symptom> symptomList) {
        LOGGER.debug("Start resolving.");
        if (this.opportunisticActionDelayStart < this.monitor.getJobStartExecutionTime()) {
            this.opportunisticActionDelayStart = -1L;
        }
        if (!this.diagnose(symptomList)) {
            return null;
        }
        HashMap<JobVertexID, Integer> targetDirect = new HashMap<JobVertexID, Integer>();
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (this.jobVertexLowMemory != null) {
            targetDirect.putAll(this.scaleDownVertexDirectMem(jobConfig));
        }
        if (this.jobVertexDirectOOM != null) {
            targetDirect.putAll(this.scaleUpVertexDirectMem(jobConfig));
        }
        if (targetDirect.isEmpty()) {
            return null;
        }
        AdjustJobDirectMemory adjustJobDirectMemory = new AdjustJobDirectMemory(this.jobID, this.timeout);
        for (Map.Entry entry : targetDirect.entrySet()) {
            JobVertexID vertexID = (JobVertexID)((Object)entry.getKey());
            int targetDirectMemory = (Integer)entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            ResourceSpec targetResource = new ResourceSpec.Builder(currentResource).setDirectMemoryInMB(targetDirectMemory).build();
            adjustJobDirectMemory.addVertex(vertexID, vertexConfig.getParallelism(), vertexConfig.getParallelism(), currentResource, targetResource);
        }
        if (this.maxCpuLimit != Double.MAX_VALUE || this.maxMemoryLimit != Integer.MAX_VALUE) {
            RestServerClient.JobConfig targetJobConfig = adjustJobDirectMemory.getAppliedJobConfig(jobConfig);
            double targetTotalCpu = targetJobConfig.getJobTotalCpuCores();
            int targetTotalMem = targetJobConfig.getJobTotalMemoryMb();
            if (targetTotalCpu > this.maxCpuLimit || targetTotalMem > this.maxMemoryLimit) {
                LOGGER.debug("Give up adjusting: total resource of target job config <cpu, mem>=<{}, {}> exceed max limit <cpu, mem>=<{}, {}>.", new Object[]{targetTotalCpu, targetTotalMem, this.maxCpuLimit, this.maxMemoryLimit});
                return null;
            }
        }
        adjustJobDirectMemory.exculdeMinorDiffVertices(this.monitor.getConfig());
        if (!adjustJobDirectMemory.isEmpty()) {
            long lastCheckpointTime = 0L;
            try {
                CheckpointStatistics completedCheckpointStats = this.monitor.getRestServerClient().getLatestCheckPointStates(this.monitor.getJobID());
                if (completedCheckpointStats != null) {
                    lastCheckpointTime = completedCheckpointStats.getLatestAckTimestamp();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            long now = System.currentTimeMillis();
            if (this.jobVertexDirectOOM != null || this.jobVertexLowMemory != null) {
                adjustJobDirectMemory.setActionMode(Action.ActionMode.IMMEDIATE);
            } else if (this.opportunisticActionDelayStart > 0L && now - this.opportunisticActionDelayStart > this.opportunisticActionDelay && now - lastCheckpointTime < this.checkpointIntervalThreshold) {
                LOGGER.debug("Upgrade opportunistic action to immediate action.");
                adjustJobDirectMemory.setActionMode(Action.ActionMode.IMMEDIATE);
            } else {
                if (this.opportunisticActionDelayStart < 0L) {
                    this.opportunisticActionDelayStart = now;
                }
                adjustJobDirectMemory.setActionMode(Action.ActionMode.OPPORTUNISTIC);
            }
            LOGGER.info("AdjustJobDirectMemory action generated: {}.", (Object)adjustJobDirectMemory);
            return adjustJobDirectMemory;
        }
        return null;
    }

    @VisibleForTesting
    public boolean diagnose(List<Symptom> symptomList) {
        this.jobStable = null;
        this.jobVertexDirectOOM = null;
        this.jobVertexLowMemory = null;
        for (Symptom symptom : symptomList) {
            if (symptom instanceof JobStable) {
                this.jobStable = (JobStable)symptom;
                continue;
            }
            if (symptom instanceof JobVertexDirectOOM) {
                this.jobVertexDirectOOM = (JobVertexDirectOOM)symptom;
                continue;
            }
            if (!(symptom instanceof JobVertexLowMemory)) continue;
            this.jobVertexLowMemory = (JobVertexLowMemory)symptom;
        }
        if (this.jobVertexDirectOOM != null) {
            LOGGER.debug("Direct OOM detected, should rescale.");
            return true;
        }
        if (this.jobStable == null || this.jobStable.getStableTime() < this.stableTime) {
            LOGGER.debug("Job unstable, should not rescale.");
            return false;
        }
        if (this.jobVertexLowMemory == null) {
            LOGGER.debug("No need to rescale.");
            return false;
        }
        return true;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleUpVertexDirectMem(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexDirectOOM == null) {
            return Collections.emptyMap();
        }
        HashMap<JobVertexID, Integer> results = new HashMap<JobVertexID, Integer>();
        for (JobVertexID vertexID : this.jobVertexDirectOOM.getJobVertexIDs()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            ResourceSpec currentResource = vertexConfig.getResourceSpec();
            int targetDirectMemory = currentResource.getDirectMemory() == 0 ? (int)Math.ceil(1.0 * this.scaleUpRatio) : (int)Math.ceil((double)currentResource.getDirectMemory() * this.scaleUpRatio);
            results.put(vertexID, targetDirectMemory);
            LOGGER.debug("Scale up, target direct memory for vertex {} is {}.", (Object)vertexID, (Object)targetDirectMemory);
        }
        return results;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleDownVertexDirectMem(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexLowMemory == null) {
            return Collections.emptyMap();
        }
        HashMap<JobVertexID, Integer> results = new HashMap<JobVertexID, Integer>();
        for (Map.Entry<JobVertexID, Double> entry : this.jobVertexLowMemory.getNonHeapUtilities().entrySet()) {
            JobVertexID vertexID = entry.getKey();
            double utility = entry.getValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexID);
            int targetDirectMemory = vertexConfig.getResourceSpec().getDirectMemory();
            if (targetDirectMemory == 0) {
                targetDirectMemory = 1;
            }
            if (utility * this.scaleDownRatio < 1.0) {
                targetDirectMemory = (int)Math.ceil((double)targetDirectMemory * utility * this.scaleDownRatio);
            }
            results.put(vertexID, targetDirectMemory);
            LOGGER.debug("Scale down, target direct memory for vertex {} is {}.", (Object)vertexID, (Object)targetDirectMemory);
        }
        return results;
    }
}

