package org.apache.flink.runtime.healthmanager;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.healthmanager.metrics.HealthManagerMetricGroup;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.RestServerMetricProvider;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/HealthManager.class */
/**
 * Periodically polls the rest server for the list of jobs and keeps one {@link HealthMonitor}
 * per job that is not in a globally terminal state.
 *
 * <p>A monitor is started when a job first shows up in the listing and stopped when the job is no
 * longer listed as running. All periodic work runs on the internal "health-manager" scheduled
 * executor.
 */
public class HealthManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HealthManager.class);

    /** Interval between two job list checks in milliseconds; a value &lt;= 0 disables the check. */
    private static final ConfigOption<Long> JOB_CHECK_INTERVAL =
            ConfigOptions.key("healthmanager.job.check.interval.ms").defaultValue(10000L);

    private final Configuration config;
    private final MetricProvider metricProvider;
    private final RestServerClient restServerClient;
    private final HealthManagerMetricGroup metricGroup;

    /** Monitors of the jobs currently considered running, keyed by job id. */
    private final Map<JobID, HealthMonitor> jobMonitors = new HashMap<>();

    private final ScheduledExecutorService executorService =
            new ScheduledThreadPoolExecutor(4, new ExecutorThreadFactory("health-manager"));

    /** Handle of the periodic job list check; stays {@code null} when the check is disabled. */
    private ScheduledFuture<?> timedTaskHandler;

    /**
     * Periodic task that reconciles {@link #jobMonitors} with the jobs reported by the rest server.
     *
     * <p>Every failure is caught and logged: a task submitted through {@code scheduleAtFixedRate}
     * is not executed again once one run terminates with an exception.
     */
    private class RunningJobListChecker implements Runnable {
        private RunningJobListChecker() {
        }

        @Override
        public void run() {
            // Job id -> job name of every job that is not in a globally terminal state.
            Map<JobID, String> runningJobs = new HashMap<>();
            try {
                // The original lambda body was empty, leaving the map unpopulated so no job was
                // ever monitored. Accessor names follow Flink's JobStatusMessage API.
                restServerClient.listJob().stream()
                        .filter(status -> !status.getJobState().isGloballyTerminalState())
                        .forEach(status -> runningJobs.put(status.getJobId(), status.getJobName()));
            } catch (Throwable t) {
                LOGGER.warn("Wait rest server to be ready", t);
                return;
            }

            try {
                startMonitorsForNewJobs(runningJobs);
                stopMonitorsForFinishedJobs(runningJobs);
            } catch (Throwable t) {
                LOGGER.warn("Exception caught in job checker", t);
            }
        }

        /** Creates and starts a monitor for every running job that does not have one yet. */
        private void startMonitorsForNewJobs(Map<JobID, String> runningJobs) {
            for (Map.Entry<JobID, String> job : runningJobs.entrySet()) {
                JobID jobID = job.getKey();
                if (jobMonitors.containsKey(jobID)) {
                    continue;
                }
                LOGGER.info("New job submitted, id:{}", jobID);
                HealthMonitor healthMonitor = new HealthMonitor(
                        jobID,
                        metricProvider,
                        restServerClient,
                        metricGroup.addJob(jobID, job.getValue()),
                        executorService,
                        config);
                try {
                    healthMonitor.start();
                    // Registered only after a successful start, so a failed job is retried on the
                    // next check.
                    jobMonitors.put(jobID, healthMonitor);
                } catch (Exception e) {
                    // NOTE(review): the metric group created by addJob above is not removed here;
                    // confirm whether addJob tolerates being called again for the same job.
                    LOGGER.info("Fail to start monitor for job:{}", jobID, e);
                }
            }
        }

        /** Stops and forgets the monitor of every job that is no longer reported as running. */
        private void stopMonitorsForFinishedJobs(Map<JobID, String> runningJobs) {
            // Collect first: jobMonitors must not be modified while its key set is iterated.
            LinkedList<JobID> finishedJobs = new LinkedList<>();
            for (JobID jobID : jobMonitors.keySet()) {
                if (!runningJobs.containsKey(jobID)) {
                    LOGGER.info("New job finished or failed, id:{}", jobID);
                    finishedJobs.add(jobID);
                    metricGroup.removeJob(jobID);
                }
            }
            for (JobID jobID : finishedJobs) {
                jobMonitors.remove(jobID).stop();
            }
        }
    }

    /**
     * Creates the health manager together with its rest client and metric provider.
     *
     * @param str address of the rest server, passed to the rest server client
     * @param metricRegistry registry used to create the health manager metric group
     * @param configuration configuration handed to the client, the metric provider and the monitors
     * @throws Exception if one of the components cannot be created
     */
    public HealthManager(String str, MetricRegistry metricRegistry, Configuration configuration) throws Exception {
        this.config = configuration;
        this.metricGroup = new HealthManagerMetricGroup(metricRegistry);
        LOGGER.info("Starting Health manager with rest server:" + str);
        this.restServerClient = new RestServerClientImpl(str, configuration, this.executorService);
        this.metricProvider = new RestServerMetricProvider(configuration, this.restServerClient, this.executorService);
    }

    /**
     * Schedules the periodic job list check (when the configured interval is positive) and opens
     * the metric provider.
     */
    public void start() {
        LOGGER.info("Starting health manager.");
        long checkIntervalMs = config.getLong(JOB_CHECK_INTERVAL);
        if (checkIntervalMs > 0) {
            timedTaskHandler = executorService.scheduleAtFixedRate(
                    new RunningJobListChecker(), 0L, checkIntervalMs, TimeUnit.MILLISECONDS);
        }
        metricProvider.open();
    }

    /**
     * Cancels the periodic check, closes the metric provider, stops all job monitors, shuts the
     * executor down and closes the metric group.
     */
    public void stop() {
        LOGGER.info("Stopping health manager.");
        if (timedTaskHandler != null) {
            timedTaskHandler.cancel(true);
        }
        metricProvider.close();
        for (HealthMonitor monitor : jobMonitors.values()) {
            monitor.stop();
        }
        jobMonitors.clear();
        executorService.shutdown();
        if (metricGroup != null) {
            metricGroup.close();
        }
    }
}
