/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.JobTMMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHighNativeMemory;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Detector that reports job vertices hosted on task managers whose process memory usage
 * (RSS) exceeds a configurable multiple of the memory allocated to the task manager.
 *
 * <p>For every offending task manager the "native" usage is derived as
 * {@code RSS - committed heap - committed non-heap} and compared with the sum of the native
 * memory configured for the execution vertices running on that task manager. The highest
 * ratio seen per job vertex is reported in a {@link JobVertexHighNativeMemory} symptom.
 */
public class HighNativeMemoryDetector implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(HighNativeMemoryDetector.class);

    /** Multiple of the allocated memory above which a task manager is considered over-using. */
    public static final ConfigOption<Double> HIGH_NATIVE_MEM_THRESHOLD =
            ConfigOptions.key("healthmonitor.high-native-mem-detector.threshold").defaultValue(1.0);

    /** Multiple of the allocated memory above which the symptom is flagged as severe. */
    public static final ConfigOption<Double> HIGH_NATIVE_MEM_SEVERE_THRESHOLD =
            ConfigOptions.key("healthmonitor.high-native-mem-detector.severe.threshold").defaultValue(1.2);

    /** Multiple of the allocated memory above which the symptom is flagged as critical. */
    public static final ConfigOption<Double> HIGH_NATIVE_MEM_CRITICAL_THRESHOLD =
            ConfigOptions.key("healthmonitor.high-native-mem-detector.critical.threshold").defaultValue(Double.MAX_VALUE);

    private JobID jobID;
    private RestServerClient restServerClient;
    private MetricProvider metricProvider;
    private HealthMonitor monitor;
    private long overuseCheckInterval;
    private double threshold;
    private double severeThreshold;
    private double criticalThreshold;
    private JobTMMetricSubscription tmMemCapacitySubscription;
    private JobTMMetricSubscription tmMemUsageTotalSubscription;
    private JobTMMetricSubscription tmMemUsageHeapSubscription;
    private JobTMMetricSubscription tmMemUsageNonHeapSubscription;

    /**
     * Reads the thresholds and check interval from the monitor's configuration and subscribes
     * to the four task manager memory metrics (allocated, RSS, committed heap, committed
     * non-heap), each aggregated with {@link TimelineAggType#MAX} over the check interval.
     *
     * @param monitor the health monitor this detector is attached to
     */
    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.restServerClient = monitor.getRestServerClient();
        this.metricProvider = monitor.getMetricProvider();
        this.overuseCheckInterval = monitor.getConfig().getLong(HealthMonitorOptions.RESOURCE_SCALE_INTERVAL);
        this.threshold = monitor.getConfig().getDouble(HIGH_NATIVE_MEM_THRESHOLD);
        this.severeThreshold = monitor.getConfig().getDouble(HIGH_NATIVE_MEM_SEVERE_THRESHOLD);
        this.criticalThreshold = monitor.getConfig().getDouble(HIGH_NATIVE_MEM_CRITICAL_THRESHOLD);
        this.tmMemCapacitySubscription = this.metricProvider.subscribeAllTMMetric(
                this.jobID, "Status.ProcessTree.Memory.Allocated", this.overuseCheckInterval, TimelineAggType.MAX);
        this.tmMemUsageTotalSubscription = this.metricProvider.subscribeAllTMMetric(
                this.jobID, "Status.ProcessTree.Memory.RSS", this.overuseCheckInterval, TimelineAggType.MAX);
        this.tmMemUsageHeapSubscription = this.metricProvider.subscribeAllTMMetric(
                this.jobID, "Status.JVM.Memory.Heap.Committed", this.overuseCheckInterval, TimelineAggType.MAX);
        this.tmMemUsageNonHeapSubscription = this.metricProvider.subscribeAllTMMetric(
                this.jobID, "Status.JVM.Memory.NonHeap.Committed", this.overuseCheckInterval, TimelineAggType.MAX);
    }

    /**
     * Cancels every metric subscription created in {@link #open(HealthMonitor)}. Safe to call
     * when {@code open} was never invoked or only partially completed.
     */
    @Override
    public void close() {
        unsubscribe(this.tmMemCapacitySubscription);
        // Each subscription is guarded by its own null check; previously the total-usage
        // subscription was guarded by the heap subscription's null check.
        unsubscribe(this.tmMemUsageTotalSubscription);
        unsubscribe(this.tmMemUsageHeapSubscription);
        unsubscribe(this.tmMemUsageNonHeapSubscription);
    }

    /** Unsubscribes the given subscription if both the provider and the subscription exist. */
    private void unsubscribe(JobTMMetricSubscription subscription) {
        if (this.metricProvider != null && subscription != null) {
            this.metricProvider.unsubscribe(subscription);
        }
    }

    /** Returns {@code true} when the metric snapshot is absent or holds no task manager entry. */
    private static boolean isNullOrEmpty(Map<?, ?> metrics) {
        return metrics == null || metrics.isEmpty();
    }

    /**
     * Checks every task manager for which a capacity metric is available and collects, per
     * job vertex, the highest native memory utility observed on an over-using task manager.
     *
     * @return a {@link JobVertexHighNativeMemory} symptom when at least one vertex is affected,
     *     otherwise {@code null}
     * @throws Exception if querying the metrics or the REST server fails
     */
    @Override
    public Symptom detect() throws Exception {
        LOGGER.debug("Start detecting.");
        // NOTE(review): the decompiler erased these to Object; the element type is restored as
        // Tuple2<Long, Double> (f1 is cast to Double below) - confirm against
        // JobTMMetricSubscription#getValue.
        Map<String, Tuple2<Long, Double>> tmCapacities = this.tmMemCapacitySubscription.getValue();
        Map<String, Tuple2<Long, Double>> tmTotalUsages = this.tmMemUsageTotalSubscription.getValue();
        Map<String, Tuple2<Long, Double>> tmHeapUsages = this.tmMemUsageHeapSubscription.getValue();
        Map<String, Tuple2<Long, Double>> tmNonHeapUsages = this.tmMemUsageNonHeapSubscription.getValue();
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (isNullOrEmpty(tmCapacities)
                || isNullOrEmpty(tmTotalUsages)
                || isNullOrEmpty(tmHeapUsages)
                || isNullOrEmpty(tmNonHeapUsages)) {
            return null;
        }

        boolean severe = false;
        boolean critical = false;
        Map<JobVertexID, Double> vertexMaxUtility = new HashMap<>();
        for (String tmId : tmCapacities.keySet()) {
            if (!MetricUtils.validateTmMetric(
                    this.monitor,
                    this.overuseCheckInterval * 2L,
                    tmCapacities.get(tmId),
                    tmTotalUsages.get(tmId),
                    tmHeapUsages.get(tmId),
                    tmNonHeapUsages.get(tmId))) {
                LOGGER.debug("Skip tm {}, metrics missing.", tmId);
                continue;
            }

            double capacity = tmCapacities.get(tmId).f1;
            double totalUsage = tmTotalUsages.get(tmId).f1;
            double heapUsage = tmHeapUsages.get(tmId).f1;
            double nonHeapUsage = tmNonHeapUsages.get(tmId).f1;
            LOGGER.debug("TM {}, capacity {}, usage total {}, heap {}, non-heap {}.",
                    tmId, capacity, totalUsage, heapUsage, nonHeapUsage);

            if (totalUsage <= capacity * this.threshold) {
                continue;
            }
            // The severity flags are raised as soon as the TM is over the limit, even if the
            // TM is skipped further down (kept as in the original behavior).
            if (totalUsage > capacity * this.severeThreshold) {
                severe = true;
            }
            if (totalUsage > capacity * this.criticalThreshold) {
                critical = true;
            }

            List<ExecutionVertexID> jobExecutionVertexIds = this.restServerClient.getTaskManagerTasks(tmId);
            if (jobExecutionVertexIds == null) {
                continue;
            }

            // Scaled by 1024 * 1024 - presumably bytes to MiB so it matches the unit of the
            // configured native memory; confirm against ResourceSpec.
            double nativeUsage = (totalUsage - heapUsage - nonHeapUsage) / 1024.0 / 1024.0;
            if (nativeUsage < 0.0) {
                LOGGER.debug("Skip tm {}, abnormal native usage {}.", tmId, nativeUsage);
                continue;
            }

            double nativeCapacity = 0.0;
            for (ExecutionVertexID executionVertexID : jobExecutionVertexIds) {
                JobVertexID jobVertexID = executionVertexID.getJobVertexID();
                nativeCapacity += (double) jobConfig.getVertexConfigs().get(jobVertexID).getResourceSpec().getNativeMemory();
            }
            if (nativeCapacity == 0.0) {
                // Avoid dividing by zero when no native memory is configured.
                nativeCapacity = 1.0;
            }

            double utility = nativeUsage / nativeCapacity;
            for (ExecutionVertexID executionVertexID : jobExecutionVertexIds) {
                JobVertexID jobVertexID = executionVertexID.getJobVertexID();
                Double previous = vertexMaxUtility.get(jobVertexID);
                // Explicit comparison (rather than Math.max) keeps the original NaN handling.
                if (previous == null || previous < utility) {
                    vertexMaxUtility.put(jobVertexID, utility);
                }
            }
        }

        if (!vertexMaxUtility.isEmpty()) {
            LOGGER.info("Native memory high detected for vertices with max utility {}.", vertexMaxUtility);
            return new JobVertexHighNativeMemory(this.jobID, vertexMaxUtility, severe, critical);
        }
        return null;
    }
}

