/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.HealthMonitorMetricGroup;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.ActionSelector;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actionselectors.RescaleResourcePriorActionSelector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.DelayIncreasingDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.DirectOOMDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.FailoverDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.FrequentFullGCDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.HeapOOMDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.HighCpuDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.HighDelayDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.HighNativeMemoryDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.JobStableDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.JobStuckDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.KilledDueToMemoryExceedDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.LargeTimerCountDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.LongTimeFullGCDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.LowCpuDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.LowMemoryDetector;
import org.apache.flink.runtime.healthmanager.plugins.detectors.OverParallelizedDetector;
import org.apache.flink.runtime.healthmanager.plugins.resolvers.CpuAdjuster;
import org.apache.flink.runtime.healthmanager.plugins.resolvers.DirectMemoryAdjuster;
import org.apache.flink.runtime.healthmanager.plugins.resolvers.HeapMemoryAdjuster;
import org.apache.flink.runtime.healthmanager.plugins.resolvers.NativeMemoryAdjuster;
import org.apache.flink.runtime.healthmanager.plugins.resolvers.ParallelismScaler;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.JobTopologyAnalyzer;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetricsSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HealthMonitor {
    private static final Logger LOGGER = LoggerFactory.getLogger(HealthMonitor.class);

    // NOTE: the option defaults below are passed without casts on purpose. A cast to Object would
    // make the builder infer ConfigOption<Object>, which is not assignable to the declared types.

    /** Enables the monitor; read in {@link #start()} and re-read from the job config on every check. */
    public static final ConfigOption<Boolean> HEALTH_MONITOR_ENABLED =
            ConfigOptions.key("healthmonitor.enabled").defaultValue(true);

    /** Check period in milliseconds used when standalone mode is enabled. */
    public static final ConfigOption<Long> HEALTH_MONITOR_STANDALONE_CHECK_INTERVAL =
            ConfigOptions.key("healthmonitor.standalone.check.interval.ms").defaultValue(10000L);

    /**
     * Check period in milliseconds used when standalone mode is not enabled.
     * The constant name says "INTERNAL" but the key is an interval; the name is kept because it is public.
     */
    public static final ConfigOption<Long> HEALTH_CHECK_INTERNAL =
            ConfigOptions.key("healthmonitor.health.check.interval.ms").defaultValue(10000L);

    /** Fully qualified class name of the {@link ActionSelector} to instantiate. */
    public static final ConfigOption<String> ACTION_SELECTOR_CLASS =
            ConfigOptions.key("healthmonitor.action.selector.class")
                    .defaultValue(RescaleResourcePriorActionSelector.class.getCanonicalName());

    /** Comma-separated detector class names; when unset, defaults are derived from the rescale switches. */
    public static final ConfigOption<String> DETECTOR_CLASSES =
            ConfigOptions.key("healthmonitor.detector.classes").noDefaultValue();

    /** Comma-separated resolver class names; when unset, defaults are derived from the rescale switches. */
    public static final ConfigOption<String> RESOLVER_CLASSES =
            ConfigOptions.key("healthmonitor.resolver.classes").noDefaultValue();

    /** When true the monitor is treated as enabled and the standalone check interval is used. */
    public static final ConfigOption<Boolean> HEALTH_MONITOR_STANDALONE_MODE_ENABLED =
            ConfigOptions.key("healthmonitor.standalone.mode.enabled").defaultValue(false);

    /** When true the selected action is logged but not executed. */
    public static final ConfigOption<Boolean> DRY_RUN_ENABLED =
            ConfigOptions.key("healthmonitor.dry-run.enabled").defaultValue(false);

    // Collaborators handed in through the constructor; never reassigned afterwards.
    private final JobID jobID;
    // Private copy of the caller's configuration; job-level entries are merged into it later.
    private final Configuration config;
    // Lazily fetched job config; HealthChecker.run() clears it so the next access re-fetches it.
    private RestServerClient.JobConfig jobConfig;
    private final MetricProvider metricProvider;
    private final RestServerClient restServerClient;
    // May be null (test constructor); start() then skips gauge registration.
    private final HealthMonitorMetricGroup metricGroup;
    private final ScheduledExecutorService executorService;
    // Handle of the periodic HealthChecker task, set by start() and cancelled by stop().
    private ScheduledFuture<?> timedTaskHandler;

    // Currently loaded plugins; null while no plugins are loaded.
    private List<Detector> detectors;
    private List<Resolver> resolvers;
    private ActionSelector actionSelector;

    // Long.MAX_VALUE marks "not computed yet"; reset by HealthChecker.check() on each detection round.
    private volatile long jobStartExecutionTime = Long.MAX_VALUE;
    // Counters exposed through the "action" gauges; incremented in HealthChecker.check().
    private volatile long successActionCount = 0L;
    private volatile long failedActionCount = 0L;
    private volatile boolean isEnabled;

    // One shared subscriber per subscription interval, see subscribeTaskMetrics(long).
    private final Map<Long, TaskMetricsSubscriber> taskMetricsSubscribes = new HashMap<Long, TaskMetricsSubscriber>();
    private final JobTopologyAnalyzer jobTopologyAnalyzer = new JobTopologyAnalyzer();

    /**
     * Test-only constructor: delegates to the full constructor with a {@code null} metric group,
     * which makes {@link #start()} skip the registration of the action gauges.
     */
    @VisibleForTesting
    public HealthMonitor(JobID jobID, MetricProvider metricProvider, RestServerClient visitor, ScheduledExecutorService executorService, Configuration config) {
        this(jobID, metricProvider, visitor, null, executorService, config);
    }

    /**
     * Creates a monitor for one job. Nothing is scheduled or loaded until {@link #start()} is called.
     *
     * @param jobID           id of the job to monitor
     * @param metricProvider  metric source handed to plugins and to action validation
     * @param visitor         REST client used to fetch the job config and to execute actions
     * @param metricGroup     group under which the action gauges are registered; may be {@code null}
     * @param executorService executor that runs the periodic health check
     * @param config          base configuration; a clone is stored, so the caller's instance is not mutated
     */
    public HealthMonitor(
            JobID jobID,
            MetricProvider metricProvider,
            RestServerClient visitor,
            HealthMonitorMetricGroup metricGroup,
            ScheduledExecutorService executorService,
            Configuration config) {
        // Keep an own copy: job-level settings are merged into this configuration later on.
        this.config = config.clone();

        this.jobID = jobID;
        this.restServerClient = visitor;
        this.metricProvider = metricProvider;
        this.metricGroup = metricGroup;
        this.executorService = executorService;
    }

    /**
     * Starts monitoring: analyzes the job topology, merges the job-level configuration into the
     * monitor configuration, loads the plugins when the monitor is enabled, schedules the periodic
     * {@link HealthChecker} and registers the action gauges.
     *
     * <p>The checker is scheduled even when the monitor is currently disabled, because the checker
     * itself re-reads the job config and turns the monitor on or off.
     *
     * @throws Exception if fetching the job config, analyzing the topology or loading plugins fails
     */
    public void start() throws Exception {
        LOGGER.info("Starting to monitor job {}", jobID);

        jobTopologyAnalyzer.analyze(getJobConfig());

        // Job-level settings override the configuration the monitor was constructed with.
        Configuration jobLevelConfig = getJobConfig().getConfig();
        for (String key : jobLevelConfig.keySet()) {
            config.setString(key, jobLevelConfig.getString(key, null));
        }

        isEnabled = config.getBoolean(HEALTH_MONITOR_ENABLED)
                || config.getBoolean(HEALTH_MONITOR_STANDALONE_MODE_ENABLED);
        if (isEnabled) {
            loadPlugins();
        }

        long checkInterval = config.getBoolean(HEALTH_MONITOR_STANDALONE_MODE_ENABLED)
                ? config.getLong(HEALTH_MONITOR_STANDALONE_CHECK_INTERVAL)
                : config.getLong(HEALTH_CHECK_INTERNAL);
        // A non-positive interval means no periodic check is scheduled.
        if (checkInterval > 0L) {
            timedTaskHandler = executorService.scheduleAtFixedRate(
                    new HealthChecker(), 0L, checkInterval, TimeUnit.MILLISECONDS);
        }

        if (metricGroup != null) {
            MetricGroup actionMetrics = metricGroup.addGroup("action");
            actionMetrics.gauge("success", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return successActionCount;
                }
            });
            actionMetrics.gauge("failure", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return failedActionCount;
                }
            });
        }
    }

    /**
     * (Re)loads all plugins: closes whatever is currently loaded, then loads the detectors, the
     * resolvers and finally the action selector.
     *
     * @throws ClassNotFoundException if a configured plugin class cannot be found
     * @throws IllegalAccessException if a plugin class or its no-arg constructor is not accessible
     * @throws InstantiationException if a plugin class cannot be instantiated
     */
    @VisibleForTesting
    public void loadPlugins() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        // Close the previous generation first so a reload never leaves old plugins open.
        closePlugins();

        loadDetectors();
        loadResolvers();
        loadActionSelector();
    }

    /**
     * Stops monitoring: cancels the periodic check (interrupting it if it is running) when one was
     * scheduled, then closes all plugins.
     */
    public void stop() {
        if (timedTaskHandler != null) {
            timedTaskHandler.cancel(true);
        }

        closePlugins();
    }

    /**
     * Closes and forgets every loaded plugin, then closes and drops all task metric subscribers.
     * Closing order: action selector, detectors, resolvers, subscribers. Safe to call when nothing
     * is loaded.
     */
    @VisibleForTesting
    public void closePlugins() {
        if (actionSelector != null) {
            actionSelector.close();
            actionSelector = null;
        }

        if (detectors != null) {
            for (Detector loadedDetector : detectors) {
                loadedDetector.close();
            }
            detectors.clear();
            detectors = null;
        }

        if (resolvers != null) {
            for (Resolver loadedResolver : resolvers) {
                loadedResolver.close();
            }
            resolvers.clear();
            resolvers = null;
        }

        // Subscribers are created on demand by subscribeTaskMetrics and are closed along with the plugins.
        for (TaskMetricsSubscriber subscriber : taskMetricsSubscribes.values()) {
            subscriber.close();
        }
        taskMetricsSubscribes.clear();
    }

    /**
     * Returns the task metrics subscriber for the given interval, creating, opening and caching it
     * on first request so that all callers asking for the same interval share one instance.
     *
     * @param interval subscription interval used as the cache key and passed to the subscriber
     * @return the shared subscriber for {@code interval}
     */
    public TaskMetricsSubscriber subscribeTaskMetrics(long interval) {
        // Only non-null subscribers are ever stored, so a null lookup means "not created yet".
        TaskMetricsSubscriber cached = taskMetricsSubscribes.get(interval);
        if (cached != null) {
            return cached;
        }

        TaskMetricsSubscriber created = new TaskMetricsSubscriber(this, interval);
        created.open();
        taskMetricsSubscribes.put(interval, created);
        return created;
    }

    /**
     * Instantiates the action selector named by {@link #ACTION_SELECTOR_CLASS} through its no-arg
     * constructor, stores it and opens it against this monitor.
     */
    private void loadActionSelector() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        String selectorClassName = config.getString(ACTION_SELECTOR_CLASS);
        // The field is assigned before open() so closePlugins() can still close it if open() fails.
        actionSelector = (ActionSelector) Class.forName(selectorClassName).newInstance();
        LOGGER.info("Load action selector:" + actionSelector);
        actionSelector.open(this);
    }

    /**
     * Instantiates and opens the detectors.
     *
     * <p>When {@link #DETECTOR_CLASSES} is set, its comma-separated class names are used. Names are
     * trimmed before de-duplication, so {@code "a.B, a.B"} loads {@code a.B} once, and blank entries
     * (for example from {@code "a.B,,c.D"}) are ignored instead of failing class lookup. Otherwise
     * the default detectors for the enabled rescale modes are used.
     *
     * @throws ClassNotFoundException if a detector class cannot be found
     * @throws IllegalAccessException if a detector class or its no-arg constructor is not accessible
     * @throws InstantiationException if a detector class cannot be instantiated
     */
    private void loadDetectors() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        HashSet<String> detectorClazzs = new HashSet<String>();
        String configuredDetectors = config.getString(DETECTOR_CLASSES);
        if (configuredDetectors != null) {
            for (String rawName : configuredDetectors.split(",")) {
                String className = rawName.trim();
                if (!className.isEmpty()) {
                    detectorClazzs.add(className);
                }
            }
        } else {
            // Some detectors serve both modes and are listed twice; the set keeps a single entry.
            if (config.getBoolean(HealthMonitorOptions.ENABLE_PARALLELISM_RESCALE)) {
                detectorClazzs.add(HighDelayDetector.class.getCanonicalName());
                detectorClazzs.add(DelayIncreasingDetector.class.getCanonicalName());
                detectorClazzs.add(LargeTimerCountDetector.class.getCanonicalName());
                detectorClazzs.add(OverParallelizedDetector.class.getCanonicalName());
                detectorClazzs.add(JobStableDetector.class.getCanonicalName());
                detectorClazzs.add(JobStuckDetector.class.getCanonicalName());
                detectorClazzs.add(FailoverDetector.class.getCanonicalName());
                detectorClazzs.add(FrequentFullGCDetector.class.getCanonicalName());
                detectorClazzs.add(LongTimeFullGCDetector.class.getCanonicalName());
            }
            if (config.getBoolean(HealthMonitorOptions.ENABLE_RESOURCE_RESCALE)) {
                detectorClazzs.add(HighCpuDetector.class.getCanonicalName());
                detectorClazzs.add(LowCpuDetector.class.getCanonicalName());
                detectorClazzs.add(HeapOOMDetector.class.getCanonicalName());
                detectorClazzs.add(FrequentFullGCDetector.class.getCanonicalName());
                detectorClazzs.add(LongTimeFullGCDetector.class.getCanonicalName());
                detectorClazzs.add(DirectOOMDetector.class.getCanonicalName());
                detectorClazzs.add(HighNativeMemoryDetector.class.getCanonicalName());
                detectorClazzs.add(KilledDueToMemoryExceedDetector.class.getCanonicalName());
                detectorClazzs.add(LowMemoryDetector.class.getCanonicalName());
                detectorClazzs.add(JobStableDetector.class.getCanonicalName());
            }
        }

        LOGGER.info("Load detectors:" + StringUtils.join(detectorClazzs, ","));

        detectors = new ArrayList<Detector>(detectorClazzs.size());
        for (String className : detectorClazzs) {
            Detector detector = (Detector) Class.forName(className).newInstance();
            // Register before open() so closePlugins() also closes a detector whose open() failed.
            detectors.add(detector);
            detector.open(this);
        }
    }

    /**
     * Instantiates and opens the resolvers.
     *
     * <p>When {@link #RESOLVER_CLASSES} is set, its comma-separated class names are used. Names are
     * trimmed before de-duplication, so {@code "a.B, a.B"} loads {@code a.B} once, and blank entries
     * (for example from {@code "a.B,,c.D"}) are ignored instead of failing class lookup. Otherwise
     * the default resolvers for the enabled rescale modes are used.
     *
     * @throws ClassNotFoundException if a resolver class cannot be found
     * @throws IllegalAccessException if a resolver class or its no-arg constructor is not accessible
     * @throws InstantiationException if a resolver class cannot be instantiated
     */
    private void loadResolvers() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        HashSet<String> resolverClazzs = new HashSet<String>();
        String configuredResolvers = config.getString(RESOLVER_CLASSES);
        if (configuredResolvers != null) {
            for (String rawName : configuredResolvers.split(",")) {
                String className = rawName.trim();
                if (!className.isEmpty()) {
                    resolverClazzs.add(className);
                }
            }
        } else {
            if (config.getBoolean(HealthMonitorOptions.ENABLE_PARALLELISM_RESCALE)) {
                resolverClazzs.add(ParallelismScaler.class.getCanonicalName());
            }
            if (config.getBoolean(HealthMonitorOptions.ENABLE_RESOURCE_RESCALE)) {
                resolverClazzs.add(CpuAdjuster.class.getCanonicalName());
                resolverClazzs.add(HeapMemoryAdjuster.class.getCanonicalName());
                resolverClazzs.add(DirectMemoryAdjuster.class.getCanonicalName());
                resolverClazzs.add(NativeMemoryAdjuster.class.getCanonicalName());
            }
        }

        LOGGER.info("Load resolvers:" + StringUtils.join(resolverClazzs, ","));

        resolvers = new ArrayList<Resolver>(resolverClazzs.size());
        for (String className : resolverClazzs) {
            Resolver resolver = (Resolver) Class.forName(className).newInstance();
            // Register before open() so closePlugins() also closes a resolver whose open() failed.
            resolvers.add(resolver);
            resolver.open(this);
        }
    }

    /** @return id of the job this monitor was created for */
    public JobID getJobID() {
        return jobID;
    }

    /** @return the metric provider passed to the constructor */
    public MetricProvider getMetricProvider() {
        return metricProvider;
    }

    /** @return the REST client passed to the constructor */
    public RestServerClient getRestServerClient() {
        return restServerClient;
    }

    /**
     * @return the monitor's own configuration instance (not a copy); job-level settings are merged
     *     into it by {@link #start()} and when the monitor is re-enabled
     */
    public Configuration getConfig() {
        return config;
    }

    /** @return the executor passed to the constructor, which also runs the periodic health check */
    public ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Returns the job config, fetching it through the REST client when no cached value is present.
     *
     * <p>The cached value is cleared by {@link HealthChecker#run()}. The field is therefore read
     * once into a local: a reset happening between the null check and the return can no longer turn
     * a successfully fetched config into a {@code null} result.
     *
     * @return the cached or freshly fetched job config
     */
    public RestServerClient.JobConfig getJobConfig() {
        RestServerClient.JobConfig current = jobConfig;
        if (current == null) {
            current = restServerClient.getJobConfig(jobID);
            jobConfig = current;
        }
        return current;
    }

    /** @return the topology analyzer owned by this monitor; {@link #start()} feeds it the job config */
    public JobTopologyAnalyzer getJobTopologyAnalyzer() {
        return jobTopologyAnalyzer;
    }

    /**
     * Returns the job's start-execution time, computing it via {@link MetricUtils} when no cached
     * value is present.
     *
     * <p>{@code Long.MAX_VALUE} is the "not computed yet" marker, and {@link HealthChecker#check()}
     * resets the field to it on each detection round. The field is read once into a local so that a
     * concurrent reset cannot make this method return the marker right after computing a real value.
     *
     * @return the cached or freshly computed start-execution time
     */
    public long getJobStartExecutionTime() {
        long startTime = jobStartExecutionTime;
        if (startTime == Long.MAX_VALUE) {
            startTime = MetricUtils.getStartExecuteTime(this);
            jobStartExecutionTime = startTime;
        }
        return startTime;
    }

    public class HealthChecker
    implements Runnable {
        @Override
        public void run() {
            try {
                this.check();
                HealthMonitor.this.jobConfig = null;
            }
            catch (Throwable e) {
                LOGGER.warn("Fail to check job status", e);
            }
        }

        public void check() {
            block21: {
                LOGGER.debug("Start to check job {}.", (Object)HealthMonitor.this.jobID);
                Configuration newConfig = HealthMonitor.this.getJobConfig().getConfig();
                if (HealthMonitor.this.isEnabled && !newConfig.getBoolean(HEALTH_MONITOR_ENABLED) && !HealthMonitor.this.config.getBoolean(HEALTH_MONITOR_STANDALONE_MODE_ENABLED)) {
                    LOGGER.info("Disabling health monitor");
                    HealthMonitor.this.closePlugins();
                    HealthMonitor.this.isEnabled = false;
                    return;
                }
                if (!HealthMonitor.this.isEnabled && newConfig.getBoolean(HEALTH_MONITOR_ENABLED)) {
                    try {
                        LOGGER.info("Enabling health monitor");
                        for (Object key : newConfig.keySet()) {
                            HealthMonitor.this.config.setString((String)key, newConfig.getString((String)key, null));
                        }
                        HealthMonitor.this.loadPlugins();
                        HealthMonitor.this.isEnabled = true;
                    }
                    catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                        LOGGER.error("Fail to load plugins", (Throwable)e);
                        return;
                    }
                }
                if (!HealthMonitor.this.isEnabled) {
                    LOGGER.debug("Health monitor disabled.");
                    return;
                }
                LinkedList<Symptom> symptoms = new LinkedList<Symptom>();
                HealthMonitor.this.jobStartExecutionTime = Long.MAX_VALUE;
                for (Object detector : HealthMonitor.this.detectors) {
                    Symptom symptom = null;
                    try {
                        symptom = detector.detect();
                    }
                    catch (Throwable e) {
                        LOGGER.warn("Exception caught in detector " + detector, e);
                    }
                    if (symptom == null) continue;
                    symptoms.add(symptom);
                }
                LOGGER.debug("Detected symptoms: {}.", symptoms);
                LinkedList<Action> actions = new LinkedList<Action>();
                for (Resolver resolver : HealthMonitor.this.resolvers) {
                    try {
                        Action action = resolver.resolve(symptoms);
                        if (action == null) continue;
                        actions.add(action);
                    }
                    catch (Throwable e) {
                        LOGGER.warn("Exception caught in resolver " + resolver, e);
                    }
                }
                LOGGER.debug("Generated actions: {}.", actions);
                if (actions.size() == 0) {
                    return;
                }
                Action action = null;
                try {
                    action = HealthMonitor.this.actionSelector.accept(actions);
                }
                catch (Throwable e) {
                    LOGGER.warn("Exception caught in action selector", e);
                }
                if (action != null) {
                    try {
                        LOGGER.info("Executing action {}, because of symptom: {}", (Object)action, symptoms);
                        if (HealthMonitor.this.config.getBoolean(DRY_RUN_ENABLED)) {
                            return;
                        }
                        action.execute(HealthMonitor.this.restServerClient);
                        if (!action.validate(HealthMonitor.this.metricProvider, HealthMonitor.this.restServerClient)) {
                            LOGGER.info("Action {} validate failed, try to roll back", (Object)action);
                            Action actionToRollback = action.rollback();
                            LOGGER.info("Executing roll back action {}", (Object)actionToRollback);
                            actionToRollback.execute(HealthMonitor.this.restServerClient);
                            HealthMonitor.this.actionSelector.actionFailed(action);
                            HealthMonitor.this.failedActionCount++;
                            break block21;
                        }
                        LOGGER.info("Action {} validate succeed.", (Object)action);
                        HealthMonitor.this.actionSelector.actionSucceed(action);
                        HealthMonitor.this.successActionCount++;
                    }
                    catch (Throwable e) {
                        LOGGER.warn("Action " + action + " execution failed.", e);
                        HealthMonitor.this.actionSelector.actionFailed(action);
                        HealthMonitor.this.failedActionCount++;
                    }
                } else {
                    LOGGER.debug("No Action selected.");
                }
            }
        }
    }
}

