package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.AdjustJobDirectMemory;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexDirectOOM;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLowMemory;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/resolvers/DirectMemoryAdjuster.class */
public class DirectMemoryAdjuster implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(DirectMemoryAdjuster.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private double scaleUpRatio;
    private double scaleDownRatio;
    private long timeout;
    private long opportunisticActionDelay;
    private long stableTime;
    private long checkpointIntervalThreshold;
    private double maxCpuLimit;
    private int maxMemoryLimit;
    private long opportunisticActionDelayStart;
    private JobStable jobStable;
    private JobVertexDirectOOM jobVertexDirectOOM;
    private JobVertexLowMemory jobVertexLowMemory;

    @Override // org.apache.flink.runtime.healthmanager.plugins.Resolver
    public void open(HealthMonitor healthMonitor) {
        this.monitor = healthMonitor;
        this.jobID = healthMonitor.getJobID();
        this.scaleUpRatio = healthMonitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_UP_RATIO);
        this.scaleDownRatio = healthMonitor.getConfig().getDouble(HealthMonitorOptions.RESOURCE_MEMORY_SCALE_DOWN_RATIO);
        this.timeout = healthMonitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_TIME_OUT);
        this.opportunisticActionDelay = healthMonitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_OPPORTUNISTIC_ACTION_DELAY);
        this.stableTime = healthMonitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_STABLE_TIME);
        this.checkpointIntervalThreshold = healthMonitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_CHECKPOINT_THRESHOLD);
        this.maxCpuLimit = MaxResourceLimitUtil.getMaxCpu(healthMonitor.getConfig());
        this.maxMemoryLimit = MaxResourceLimitUtil.getMaxMem(healthMonitor.getConfig());
        this.opportunisticActionDelayStart = -1L;
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Resolver
    public void close() {
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Resolver
    public Action resolve(List<Symptom> list) {
        LOGGER.debug("Start resolving.");
        if (this.opportunisticActionDelayStart < this.monitor.getJobStartExecutionTime()) {
            this.opportunisticActionDelayStart = -1L;
        }
        if (!diagnose(list)) {
            return null;
        }
        HashMap hashMap = new HashMap();
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (this.jobVertexLowMemory != null) {
            hashMap.putAll(scaleDownVertexDirectMem(jobConfig));
        }
        if (this.jobVertexDirectOOM != null) {
            hashMap.putAll(scaleUpVertexDirectMem(jobConfig));
        }
        if (hashMap.isEmpty()) {
            return null;
        }
        AdjustJobDirectMemory adjustJobDirectMemory = new AdjustJobDirectMemory(this.jobID, this.timeout);
        for (Map.Entry entry : hashMap.entrySet()) {
            JobVertexID jobVertexID = (JobVertexID) entry.getKey();
            int intValue = ((Integer) entry.getValue()).intValue();
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get(jobVertexID);
            ResourceSpec resourceSpec = vertexConfig.getResourceSpec();
            adjustJobDirectMemory.addVertex(jobVertexID, vertexConfig.getParallelism(), vertexConfig.getParallelism(), resourceSpec, new ResourceSpec.Builder(resourceSpec).setDirectMemoryInMB(intValue).build());
        }
        if (this.maxCpuLimit != Double.MAX_VALUE || this.maxMemoryLimit != Integer.MAX_VALUE) {
            RestServerClient.JobConfig appliedJobConfig = adjustJobDirectMemory.getAppliedJobConfig(jobConfig);
            double jobTotalCpuCores = appliedJobConfig.getJobTotalCpuCores();
            int jobTotalMemoryMb = appliedJobConfig.getJobTotalMemoryMb();
            if (jobTotalCpuCores > this.maxCpuLimit || jobTotalMemoryMb > this.maxMemoryLimit) {
                LOGGER.debug("Give up adjusting: total resource of target job config <cpu, mem>=<{}, {}> exceed max limit <cpu, mem>=<{}, {}>.", new Object[]{Double.valueOf(jobTotalCpuCores), Integer.valueOf(jobTotalMemoryMb), Double.valueOf(this.maxCpuLimit), Integer.valueOf(this.maxMemoryLimit)});
                return null;
            }
        }
        adjustJobDirectMemory.exculdeMinorDiffVertices(this.monitor.getConfig());
        if (adjustJobDirectMemory.isEmpty()) {
            return null;
        }
        long j = 0;
        try {
            CheckpointStatistics latestCheckPointStates = this.monitor.getRestServerClient().getLatestCheckPointStates(this.monitor.getJobID());
            if (latestCheckPointStates != null) {
                j = latestCheckPointStates.getLatestAckTimestamp();
            }
        } catch (Exception e) {
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (this.jobVertexDirectOOM != null || this.jobVertexLowMemory != null) {
            adjustJobDirectMemory.setActionMode(Action.ActionMode.IMMEDIATE);
        } else if (this.opportunisticActionDelayStart <= 0 || currentTimeMillis - this.opportunisticActionDelayStart <= this.opportunisticActionDelay || currentTimeMillis - j >= this.checkpointIntervalThreshold) {
            if (this.opportunisticActionDelayStart < 0) {
                this.opportunisticActionDelayStart = currentTimeMillis;
            }
            adjustJobDirectMemory.setActionMode(Action.ActionMode.OPPORTUNISTIC);
        } else {
            LOGGER.debug("Upgrade opportunistic action to immediate action.");
            adjustJobDirectMemory.setActionMode(Action.ActionMode.IMMEDIATE);
        }
        LOGGER.info("AdjustJobDirectMemory action generated: {}.", adjustJobDirectMemory);
        return adjustJobDirectMemory;
    }

    @VisibleForTesting
    public boolean diagnose(List<Symptom> list) {
        this.jobStable = null;
        this.jobVertexDirectOOM = null;
        this.jobVertexLowMemory = null;
        for (Symptom symptom : list) {
            if (symptom instanceof JobStable) {
                this.jobStable = (JobStable) symptom;
            } else if (symptom instanceof JobVertexDirectOOM) {
                this.jobVertexDirectOOM = (JobVertexDirectOOM) symptom;
            } else if (symptom instanceof JobVertexLowMemory) {
                this.jobVertexLowMemory = (JobVertexLowMemory) symptom;
            }
        }
        if (this.jobVertexDirectOOM != null) {
            LOGGER.debug("Direct OOM detected, should rescale.");
            return true;
        }
        if (this.jobStable == null || this.jobStable.getStableTime() < this.stableTime) {
            LOGGER.debug("Job unstable, should not rescale.");
            return false;
        }
        if (this.jobVertexLowMemory != null) {
            return true;
        }
        LOGGER.debug("No need to rescale.");
        return false;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleUpVertexDirectMem(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexDirectOOM == null) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        for (JobVertexID jobVertexID : this.jobVertexDirectOOM.getJobVertexIDs()) {
            int ceil = (int) (jobConfig.getVertexConfigs().get(jobVertexID).getResourceSpec().getDirectMemory() == 0 ? Math.ceil(1.0d * this.scaleUpRatio) : Math.ceil(r0.getDirectMemory() * this.scaleUpRatio));
            hashMap.put(jobVertexID, Integer.valueOf(ceil));
            LOGGER.debug("Scale up, target direct memory for vertex {} is {}.", jobVertexID, Integer.valueOf(ceil));
        }
        return hashMap;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> scaleDownVertexDirectMem(RestServerClient.JobConfig jobConfig) {
        if (this.jobVertexLowMemory == null) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<JobVertexID, Double> entry : this.jobVertexLowMemory.getNonHeapUtilities().entrySet()) {
            JobVertexID key = entry.getKey();
            double doubleValue = entry.getValue().doubleValue();
            int directMemory = jobConfig.getVertexConfigs().get(key).getResourceSpec().getDirectMemory();
            if (directMemory == 0) {
                directMemory = 1;
            }
            if (doubleValue * this.scaleDownRatio < 1.0d) {
                directMemory = (int) Math.ceil(directMemory * doubleValue * this.scaleDownRatio);
            }
            hashMap.put(key, Integer.valueOf(directMemory));
            LOGGER.debug("Scale down, target direct memory for vertex {} is {}.", key, Integer.valueOf(directMemory));
        }
        return hashMap;
    }
}
