/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.RestServerClientImpl;
import org.apache.flink.runtime.healthmanager.metrics.HealthManagerMetricGroup;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.RestServerMetricProvider;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps one {@link HealthMonitor} per job that the rest server reports as not globally
 * terminated.
 *
 * <p>A periodic task polls the job list through the {@link RestServerClient}, starts a monitor
 * for every job that does not have one yet and stops the monitor of every job that is no longer
 * reported as running.
 *
 * <p>Thread-safety: the set of monitors is touched both by the periodic task (executor thread)
 * and by {@link #stop()} (caller thread); all such accesses are serialized on an internal lock.
 */
public class HealthManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HealthManager.class);

    /** Period of the running-job check in milliseconds; a non-positive value disables it. */
    private static final ConfigOption<Long> JOB_CHECK_INTERVAL =
            ConfigOptions.key("healthmanager.job.check.interval.ms").defaultValue(10000L);

    private final Configuration config;
    private final ScheduledExecutorService executorService;
    private final MetricProvider metricProvider;
    private final RestServerClient restServerClient;
    private final HealthManagerMetricGroup metricGroup;

    /** Guards {@link #jobMonitors} and {@link #stopped}. */
    private final Object lock = new Object();
    private final Map<JobID, HealthMonitor> jobMonitors = new HashMap<JobID, HealthMonitor>();
    /** Set by {@link #stop()} so that an in-flight check cannot register monitors afterwards. */
    private boolean stopped;

    /** Handle of the periodic job check; {@code null} when the check is disabled or not started. */
    private ScheduledFuture<?> timedTaskHandler;

    /**
     * Creates the manager together with its executor, metric group, rest client and metric
     * provider. Nothing is scheduled until {@link #start()} is called.
     *
     * @param restServerAddress address handed to the rest server client
     * @param metricRegistry registry backing the health manager metric group
     * @param config configuration shared with the rest client, the metric provider and the monitors
     * @throws Exception if one of the components cannot be created
     */
    public HealthManager(String restServerAddress, MetricRegistry metricRegistry, Configuration config) throws Exception {
        this.config = config;
        this.executorService = new ScheduledThreadPoolExecutor(4, new ExecutorThreadFactory("health-manager"));
        this.metricGroup = new HealthManagerMetricGroup(metricRegistry);
        LOGGER.info("Starting Health manager with rest server:" + restServerAddress);
        this.restServerClient = new RestServerClientImpl(restServerAddress, config, this.executorService);
        this.metricProvider = new RestServerMetricProvider(config, this.restServerClient, this.executorService);
    }

    /**
     * Opens the metric provider and, if the configured interval is positive, schedules the
     * periodic running-job check.
     */
    public void start() {
        LOGGER.info("Starting health manager.");
        // Open the provider before scheduling: the check runs with zero initial delay and hands
        // the provider to every monitor it creates.
        this.metricProvider.open();
        long interval = this.config.getLong(JOB_CHECK_INTERVAL);
        if (interval > 0L) {
            this.timedTaskHandler = this.executorService.scheduleAtFixedRate(
                    new RunningJobListChecker(), 0L, interval, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Cancels the periodic check, closes the metric provider, stops every monitor and releases
     * the executor and the metric group.
     *
     * <p>The monitors, the executor and the metric group are released even if closing the metric
     * provider fails; that failure is still propagated to the caller.
     */
    public void stop() {
        LOGGER.info("Stopping health manager.");
        if (this.timedTaskHandler != null) {
            // cancel(true) only interrupts a running check, it does not wait for it; the lock
            // taken in stopAllMonitors() covers the remaining overlap.
            this.timedTaskHandler.cancel(true);
        }
        try {
            this.metricProvider.close();
        } finally {
            stopAllMonitors();
            this.executorService.shutdown();
            this.metricGroup.close();
        }
    }

    /** Stops and forgets every monitor; a failure of one monitor does not prevent stopping the others. */
    private void stopAllMonitors() {
        synchronized (this.lock) {
            this.stopped = true;
            for (Map.Entry<JobID, HealthMonitor> entry : this.jobMonitors.entrySet()) {
                try {
                    entry.getValue().stop();
                } catch (Exception e) {
                    LOGGER.warn("Fail to stop monitor for job:" + entry.getKey(), e);
                }
            }
            this.jobMonitors.clear();
        }
    }

    /** Periodic task that reconciles the monitors with the jobs reported by the rest server. */
    private class RunningJobListChecker implements Runnable {

        @Override
        public void run() {
            // Job id -> job name of every job that is not in a globally terminal state.
            final Map<JobID, String> runningJobs = new HashMap<JobID, String>();
            try {
                HealthManager.this.restServerClient.listJob().stream()
                        .filter(status -> !status.getJobState().isGloballyTerminalState())
                        .forEach(status -> runningJobs.put(status.getJobId(), status.getJobName()));
            } catch (Throwable e) {
                LOGGER.warn("Wait rest server to be ready", e);
                return;
            }
            // Nothing may escape from here: an exception thrown by a task scheduled with
            // scheduleAtFixedRate suppresses all its subsequent executions.
            try {
                synchronized (HealthManager.this.lock) {
                    if (HealthManager.this.stopped) {
                        return;
                    }
                    startMonitorsForNewJobs(runningJobs);
                    stopMonitorsForFinishedJobs(runningJobs);
                }
            } catch (Throwable e) {
                LOGGER.warn("Exception caught in job checker", e);
            }
        }

        /** Starts a monitor for every running job that has none yet; caller must hold the lock. */
        private void startMonitorsForNewJobs(Map<JobID, String> runningJobs) {
            for (Map.Entry<JobID, String> job : runningJobs.entrySet()) {
                JobID id = job.getKey();
                if (HealthManager.this.jobMonitors.containsKey(id)) {
                    continue;
                }
                LOGGER.info("New job submitted, id:{}", id);
                HealthMonitor newMonitor = new HealthMonitor(
                        id,
                        HealthManager.this.metricProvider,
                        HealthManager.this.restServerClient,
                        HealthManager.this.metricGroup.addJob(id, job.getValue()),
                        HealthManager.this.executorService,
                        HealthManager.this.config);
                try {
                    newMonitor.start();
                } catch (Exception e) {
                    LOGGER.warn("Fail to start monitor for job:" + id, e);
                    // Undo the addJob() above; the job is retried on the next check because it
                    // is not recorded in jobMonitors.
                    HealthManager.this.metricGroup.removeJob(id);
                    continue;
                }
                HealthManager.this.jobMonitors.put(id, newMonitor);
            }
        }

        /** Stops the monitor of every job that is no longer running; caller must hold the lock. */
        private void stopMonitorsForFinishedJobs(Map<JobID, String> runningJobs) {
            // Collect first: the map must not be modified while its key set is being iterated.
            LinkedList<JobID> finishedJobs = new LinkedList<JobID>();
            for (JobID id : HealthManager.this.jobMonitors.keySet()) {
                if (!runningJobs.containsKey(id)) {
                    finishedJobs.add(id);
                }
            }
            for (JobID id : finishedJobs) {
                LOGGER.info("Job finished or failed, id:{}", id);
                // Metrics are removed before the monitor is dropped from the map, so a failure
                // here leaves the job in jobMonitors and it is handled again on the next check.
                HealthManager.this.metricGroup.removeJob(id);
                HealthMonitor monitor = HealthManager.this.jobMonitors.remove(id);
                try {
                    monitor.stop();
                } catch (Exception e) {
                    LOGGER.warn("Fail to stop monitor for job:" + id, e);
                }
            }
        }
    }
}

