package org.apache.flink.runtime.healthmanager.metrics;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/metrics/RestServerMetricProvider.class */
/**
 * {@link MetricProvider} that periodically pulls metric values through a {@link RestServerClient}
 * and pushes them into the registered subscriptions.
 *
 * <p>Subscriptions are kept in nested maps so that one REST call can serve every subscription
 * sharing the same job / vertex / task manager. All access to those maps (subscribe, unsubscribe
 * and the periodic fetch) is serialized on this instance's monitor.
 */
public class RestServerMetricProvider implements MetricProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(RestServerMetricProvider.class);

    /** Fetch period in milliseconds; also used as the initial delay of the fetch task. */
    private static final ConfigOption<Long> METRIC_FETCH_INTERVAL_OPTION =
            ConfigOptions.key("metric.provider.fetch.interval.ms").defaultValue(10000L);

    private final Configuration config;
    private final RestServerClient restServerClient;
    private final ScheduledExecutorService scheduledExecutorService;

    /** Handle of the scheduled fetch task; {@code null} until {@link #open()} has been called. */
    private ScheduledFuture<?> fetchTaskHandler;

    /** Task metric subscriptions: job id -> job vertex id -> metric name -> subscriptions. */
    private final Map<JobID, Map<JobVertexID, Map<String, List<TaskMetricSubscription>>>> taskMetricSubscriptions =
            new HashMap<>();

    /** Per-job task manager metric subscriptions: job id -> metric name -> subscriptions. */
    private final Map<JobID, Map<String, List<JobTMMetricSubscription>>> jobTMMetricSubscriptions =
            new HashMap<>();

    /** Single task manager metric subscriptions: task manager id -> metric name -> subscriptions. */
    private final Map<String, Map<String, List<TaskManagerMetricSubscription>>> tmMetricSubscriptions =
            new HashMap<>();

    /**
     * Periodic task that queries the REST server once per subscription group and forwards each
     * returned value to every subscription registered for that metric name.
     */
    private class MetricFetcher implements Runnable {

        @Override
        public void run() {
            // Hold the provider's monitor for the whole pass so that subscribe/unsubscribe cannot
            // modify the maps while they are being iterated.
            synchronized (RestServerMetricProvider.this) {
                try {
                    // Task metrics: one request per (job, vertex) covering all subscribed names.
                    for (Map.Entry<JobID, Map<JobVertexID, Map<String, List<TaskMetricSubscription>>>> jobEntry
                            : taskMetricSubscriptions.entrySet()) {
                        JobID jobId = jobEntry.getKey();
                        for (Map.Entry<JobVertexID, Map<String, List<TaskMetricSubscription>>> vertexEntry
                                : jobEntry.getValue().entrySet()) {
                            Map<String, List<TaskMetricSubscription>> subscribersByMetric = vertexEntry.getValue();
                            Map<String, Map<Integer, Tuple2<Long, Double>>> fetched =
                                    restServerClient.getTaskMetrics(
                                            jobId, vertexEntry.getKey(), subscribersByMetric.keySet());
                            for (Map.Entry<String, List<TaskMetricSubscription>> metricEntry
                                    : subscribersByMetric.entrySet()) {
                                // Metrics missing from the response are skipped for this round.
                                if (!fetched.containsKey(metricEntry.getKey())) {
                                    continue;
                                }
                                Map<Integer, Tuple2<Long, Double>> values = fetched.get(metricEntry.getKey());
                                for (TaskMetricSubscription subscription : metricEntry.getValue()) {
                                    subscription.addValue(values);
                                }
                            }
                        }
                    }

                    // Task manager metrics of a job: one request per job.
                    for (Map.Entry<JobID, Map<String, List<JobTMMetricSubscription>>> jobEntry
                            : jobTMMetricSubscriptions.entrySet()) {
                        Map<String, List<JobTMMetricSubscription>> subscribersByMetric = jobEntry.getValue();
                        Map<String, Map<String, Tuple2<Long, Double>>> fetched =
                                restServerClient.getTaskManagerMetrics(
                                        jobEntry.getKey(), subscribersByMetric.keySet());
                        for (Map.Entry<String, List<JobTMMetricSubscription>> metricEntry
                                : subscribersByMetric.entrySet()) {
                            if (!fetched.containsKey(metricEntry.getKey())) {
                                continue;
                            }
                            Map<String, Tuple2<Long, Double>> values = fetched.get(metricEntry.getKey());
                            for (JobTMMetricSubscription subscription : metricEntry.getValue()) {
                                subscription.addValue(values);
                            }
                        }
                    }

                    // Metrics of individual task managers: one request per task manager id.
                    for (Map.Entry<String, Map<String, List<TaskManagerMetricSubscription>>> tmEntry
                            : tmMetricSubscriptions.entrySet()) {
                        String tmId = tmEntry.getKey();
                        Map<String, List<TaskManagerMetricSubscription>> subscribersByMetric = tmEntry.getValue();
                        Map<String, Map<String, Tuple2<Long, Double>>> fetched =
                                restServerClient.getTaskManagerMetrics(
                                        Collections.singleton(tmId), subscribersByMetric.keySet());
                        for (Map.Entry<String, List<TaskManagerMetricSubscription>> metricEntry
                                : subscribersByMetric.entrySet()) {
                            if (!fetched.containsKey(metricEntry.getKey())) {
                                continue;
                            }
                            // The response is keyed by metric name, then by task manager id.
                            Tuple2<Long, Double> value = fetched.get(metricEntry.getKey()).get(tmId);
                            for (TaskManagerMetricSubscription subscription : metricEntry.getValue()) {
                                subscription.addValue(value);
                            }
                        }
                    }
                } catch (Throwable th) {
                    // Never let a failure escape: an exception thrown from a task scheduled with
                    // scheduleAtFixedRate suppresses all of its subsequent executions.
                    LOGGER.warn("Fail to fetch metrics", th);
                }
            }
        }
    }

    /**
     * Creates a provider; no fetching happens until {@link #open()} is called.
     *
     * @param configuration source of the fetch interval option
     * @param restServerClient client used to query metric values
     * @param scheduledExecutorService executor on which the periodic fetch task is scheduled
     */
    public RestServerMetricProvider(Configuration configuration, RestServerClient restServerClient, ScheduledExecutorService scheduledExecutorService) {
        this.config = configuration;
        this.restServerClient = restServerClient;
        this.scheduledExecutorService = scheduledExecutorService;
    }

    /**
     * Schedules the periodic metric fetch at a fixed rate, using the configured interval both as
     * initial delay and as period.
     */
    @Override
    public void open() {
        long j = this.config.getLong(METRIC_FETCH_INTERVAL_OPTION);
        this.fetchTaskHandler = this.scheduledExecutorService.scheduleAtFixedRate(new MetricFetcher(), j, j, TimeUnit.MILLISECONDS);
    }

    /** Cancels the periodic fetch task (interrupting a running pass) if one was scheduled. */
    @Override
    public void close() {
        if (this.fetchTaskHandler != null) {
            this.fetchTaskHandler.cancel(true);
        }
    }

    /**
     * Registers a subscription to a task manager metric of a job.
     *
     * @param jobID job whose task manager metrics are queried
     * @param str metric name
     * @param j forwarded to the subscription constructor (presumably the timeline interval;
     *     TODO confirm against {@code JobTMMetricSubscription})
     * @param timelineAggType aggregation applied by the subscription
     * @return the newly registered subscription
     */
    @Override
    public synchronized JobTMMetricSubscription subscribeAllTMMetric(JobID jobID, String str, long j, TimelineAggType timelineAggType) {
        JobTMMetricSubscription jobTMMetricSubscription = new JobTMMetricSubscription(jobID, str, timelineAggType, j);
        this.jobTMMetricSubscriptions
                .computeIfAbsent(jobID, key -> new HashMap<>())
                .computeIfAbsent(str, key -> new LinkedList<>())
                .add(jobTMMetricSubscription);
        return jobTMMetricSubscription;
    }

    /**
     * Registers a subscription to a metric of a single task manager.
     *
     * @param str task manager id
     * @param str2 metric name
     * @param j forwarded to the subscription constructor (presumably the timeline interval;
     *     TODO confirm against {@code TaskManagerMetricSubscription})
     * @param timelineAggType aggregation applied by the subscription
     * @return the newly registered subscription
     */
    @Override
    public synchronized TaskManagerMetricSubscription subscribeTaskManagerMetric(String str, String str2, long j, TimelineAggType timelineAggType) {
        TaskManagerMetricSubscription taskManagerMetricSubscription = new TaskManagerMetricSubscription(str, str2, timelineAggType, j);
        this.tmMetricSubscriptions
                .computeIfAbsent(str, key -> new HashMap<>())
                .computeIfAbsent(str2, key -> new LinkedList<>())
                .add(taskManagerMetricSubscription);
        return taskManagerMetricSubscription;
    }

    /**
     * Registers a subscription to a task metric of one job vertex.
     *
     * @param jobID job the vertex belongs to
     * @param jobVertexID vertex whose task metrics are queried
     * @param str metric name
     * @param metricAggType aggregation forwarded to the subscription
     * @param j forwarded to the subscription constructor (presumably the timeline interval;
     *     TODO confirm against {@code TaskMetricSubscription})
     * @param timelineAggType timeline aggregation forwarded to the subscription
     * @return the newly registered subscription
     */
    @Override
    public synchronized TaskMetricSubscription subscribeTaskMetric(JobID jobID, JobVertexID jobVertexID, String str, MetricAggType metricAggType, long j, TimelineAggType timelineAggType) {
        TaskMetricSubscription taskMetricSubscription = new TaskMetricSubscription(jobID, jobVertexID, metricAggType, str, timelineAggType, j);
        this.taskMetricSubscriptions
                .computeIfAbsent(jobID, key -> new HashMap<>())
                .computeIfAbsent(jobVertexID, key -> new HashMap<>())
                .computeIfAbsent(str, key -> new LinkedList<>())
                .add(taskMetricSubscription);
        return taskMetricSubscription;
    }

    /**
     * Removes a subscription and prunes any map level that becomes empty as a result.
     *
     * <p>Unsubscribing a subscription that is not (or no longer) registered is a no-op, so calling
     * this method twice for the same subscription is safe.
     *
     * @param metricSubscription subscription to remove; unknown types and {@code null} are ignored
     */
    @Override
    public synchronized void unsubscribe(MetricSubscription metricSubscription) {
        // Independent checks (not else-if) so that a type matching several kinds is removed
        // from every registry it may be in.
        if (metricSubscription instanceof TaskMetricSubscription) {
            removeTaskMetricSubscription((TaskMetricSubscription) metricSubscription);
        }
        if (metricSubscription instanceof TaskManagerMetricSubscription) {
            removeTaskManagerMetricSubscription((TaskManagerMetricSubscription) metricSubscription);
        }
        if (metricSubscription instanceof JobTMMetricSubscription) {
            removeJobTMMetricSubscription((JobTMMetricSubscription) metricSubscription);
        }
    }

    /** Removes a task metric subscription, pruning empty metric, vertex and job levels. */
    private void removeTaskMetricSubscription(TaskMetricSubscription subscription) {
        Map<JobVertexID, Map<String, List<TaskMetricSubscription>>> byVertex =
                this.taskMetricSubscriptions.get(subscription.getJobID());
        if (byVertex == null) {
            return;
        }
        Map<String, List<TaskMetricSubscription>> byMetric = byVertex.get(subscription.getJobVertexID());
        if (byMetric == null) {
            return;
        }
        List<TaskMetricSubscription> subscribers = byMetric.get(subscription.getMetricName());
        if (subscribers == null) {
            return;
        }
        subscribers.remove(subscription);
        if (!subscribers.isEmpty()) {
            return;
        }
        byMetric.remove(subscription.getMetricName());
        if (!byMetric.isEmpty()) {
            return;
        }
        byVertex.remove(subscription.getJobVertexID());
        if (byVertex.isEmpty()) {
            this.taskMetricSubscriptions.remove(subscription.getJobID());
        }
    }

    /** Removes a single task manager subscription, pruning empty metric and task manager levels. */
    private void removeTaskManagerMetricSubscription(TaskManagerMetricSubscription subscription) {
        Map<String, List<TaskManagerMetricSubscription>> byMetric =
                this.tmMetricSubscriptions.get(subscription.getTmId());
        if (byMetric == null) {
            return;
        }
        List<TaskManagerMetricSubscription> subscribers = byMetric.get(subscription.getMetricName());
        if (subscribers == null) {
            return;
        }
        subscribers.remove(subscription);
        if (!subscribers.isEmpty()) {
            return;
        }
        byMetric.remove(subscription.getMetricName());
        if (byMetric.isEmpty()) {
            this.tmMetricSubscriptions.remove(subscription.getTmId());
        }
    }

    /** Removes a per-job task manager subscription, pruning empty metric and job levels. */
    private void removeJobTMMetricSubscription(JobTMMetricSubscription subscription) {
        Map<String, List<JobTMMetricSubscription>> byMetric =
                this.jobTMMetricSubscriptions.get(subscription.getJobID());
        if (byMetric == null) {
            return;
        }
        List<JobTMMetricSubscription> subscribers = byMetric.get(subscription.getMetricName());
        if (subscribers == null) {
            return;
        }
        subscribers.remove(subscription);
        if (!subscribers.isEmpty()) {
            return;
        }
        byMetric.remove(subscription.getMetricName());
        if (byMetric.isEmpty()) {
            this.jobTMMetricSubscriptions.remove(subscription.getJobID());
        }
    }
}
