/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.metrics;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.JobTMMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.MetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.TaskManagerMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MetricProvider} that periodically pulls metric values through a {@link RestServerClient}
 * and pushes them into the subscriptions registered with this provider.
 *
 * <p>Subscriptions are kept in nested maps keyed by the identifiers needed to issue one request
 * per group (job / vertex / task manager) and then by metric name. All methods that read or modify
 * these maps, as well as the periodic fetch, synchronize on this instance.
 */
public class RestServerMetricProvider
implements MetricProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(RestServerMetricProvider.class);

    /** Initial delay and period of the fetch task, in milliseconds (defaults to 10000). */
    private static final ConfigOption<Long> METRIC_FETCH_INTERVAL_OPTION =
        ConfigOptions.key("metric.provider.fetch.interval.ms").defaultValue(10000L);

    private final Configuration config;
    private final RestServerClient restServerClient;
    private final ScheduledExecutorService scheduledExecutorService;

    /** Handle of the scheduled fetch task; {@code null} until {@link #open()} has been called. */
    private ScheduledFuture<?> fetchTaskHandler;

    /** Task metric subscriptions: job id -> job vertex id -> metric name -> subscriptions. */
    private final Map<JobID, Map<JobVertexID, Map<String, List<TaskMetricSubscription>>>> taskMetricSubscriptions = new HashMap<>();

    /** Subscriptions on the task manager metrics of a job: job id -> metric name -> subscriptions. */
    private final Map<JobID, Map<String, List<JobTMMetricSubscription>>> jobTMMetricSubscriptions = new HashMap<>();

    /** Subscriptions on a single task manager: task manager id -> metric name -> subscriptions. */
    private final Map<String, Map<String, List<TaskManagerMetricSubscription>>> tmMetricSubscriptions = new HashMap<>();

    /**
     * Creates a provider; no fetching happens until {@link #open()} is called.
     *
     * @param config configuration the fetch interval is read from
     * @param restServerClient client used to request metric values
     * @param scheduledExecutorService executor the periodic fetch task is scheduled on
     */
    public RestServerMetricProvider(Configuration config, RestServerClient restServerClient, ScheduledExecutorService scheduledExecutorService) {
        this.config = config;
        this.restServerClient = restServerClient;
        this.scheduledExecutorService = scheduledExecutorService;
    }

    /**
     * Schedules the periodic metric fetch. The configured interval is used both as the initial
     * delay and as the period.
     */
    @Override
    public void open() {
        long fetchIntervalMS = this.config.getLong(METRIC_FETCH_INTERVAL_OPTION);
        this.fetchTaskHandler = this.scheduledExecutorService.scheduleAtFixedRate(new MetricFetcher(), fetchIntervalMS, fetchIntervalMS, TimeUnit.MILLISECONDS);
    }

    /** Cancels the periodic fetch (interrupting a running one) if it has been scheduled. */
    @Override
    public void close() {
        if (this.fetchTaskHandler != null) {
            this.fetchTaskHandler.cancel(true);
        }
    }

    /**
     * Registers a subscription on a metric of the task managers of a job.
     *
     * @param jobID job whose task manager metrics are requested
     * @param metricName name of the metric
     * @param timeInterval interval passed on to the created subscription
     * @param timeAggType timeline aggregation passed on to the created subscription
     * @return the newly registered subscription
     */
    @Override
    public synchronized JobTMMetricSubscription subscribeAllTMMetric(JobID jobID, String metricName, long timeInterval, TimelineAggType timeAggType) {
        JobTMMetricSubscription metricSubscription = new JobTMMetricSubscription(jobID, metricName, timeAggType, timeInterval);
        this.jobTMMetricSubscriptions
            .computeIfAbsent(jobID, k -> new HashMap<>())
            .computeIfAbsent(metricName, k -> new LinkedList<>())
            .add(metricSubscription);
        return metricSubscription;
    }

    /**
     * Registers a subscription on a metric of one task manager.
     *
     * @param tmId id of the task manager
     * @param metricName name of the metric
     * @param timeInterval interval passed on to the created subscription
     * @param timeAggType timeline aggregation passed on to the created subscription
     * @return the newly registered subscription
     */
    @Override
    public synchronized TaskManagerMetricSubscription subscribeTaskManagerMetric(String tmId, String metricName, long timeInterval, TimelineAggType timeAggType) {
        TaskManagerMetricSubscription metricSubscription = new TaskManagerMetricSubscription(tmId, metricName, timeAggType, timeInterval);
        this.tmMetricSubscriptions
            .computeIfAbsent(tmId, k -> new HashMap<>())
            .computeIfAbsent(metricName, k -> new LinkedList<>())
            .add(metricSubscription);
        return metricSubscription;
    }

    /**
     * Registers a subscription on a metric of one job vertex.
     *
     * @param jobId job the vertex belongs to
     * @param vertexId job vertex whose metric is requested
     * @param metricName name of the metric
     * @param subtaskAggType aggregation passed on to the created subscription
     * @param timeInterval interval passed on to the created subscription
     * @param timeAggType timeline aggregation passed on to the created subscription
     * @return the newly registered subscription
     */
    @Override
    public synchronized TaskMetricSubscription subscribeTaskMetric(JobID jobId, JobVertexID vertexId, String metricName, MetricAggType subtaskAggType, long timeInterval, TimelineAggType timeAggType) {
        TaskMetricSubscription metricSubscription = new TaskMetricSubscription(jobId, vertexId, subtaskAggType, metricName, timeAggType, timeInterval);
        this.taskMetricSubscriptions
            .computeIfAbsent(jobId, k -> new HashMap<>())
            .computeIfAbsent(vertexId, k -> new HashMap<>())
            .computeIfAbsent(metricName, k -> new LinkedList<>())
            .add(metricSubscription);
        return metricSubscription;
    }

    /**
     * Removes a subscription. Containers that become empty are removed as well, so the fetch task
     * no longer requests metrics without subscribers.
     *
     * <p>Unsubscribing something that is not (or no longer) registered is a no-op.
     *
     * @param subscription subscription to remove
     */
    @Override
    public synchronized void unsubscribe(MetricSubscription subscription) {
        // Independent checks (not else-if), mirroring how the registries are kept separately.
        if (subscription instanceof TaskMetricSubscription) {
            removeTaskSubscription((TaskMetricSubscription) subscription);
        }
        if (subscription instanceof TaskManagerMetricSubscription) {
            removeTaskManagerSubscription((TaskManagerMetricSubscription) subscription);
        }
        if (subscription instanceof JobTMMetricSubscription) {
            removeJobTMSubscription((JobTMMetricSubscription) subscription);
        }
    }

    /** Removes a task metric subscription and prunes emptied levels of the registry. */
    private void removeTaskSubscription(TaskMetricSubscription subscription) {
        Map<JobVertexID, Map<String, List<TaskMetricSubscription>>> byVertex = this.taskMetricSubscriptions.get(subscription.getJobID());
        if (byVertex == null) {
            return;
        }
        Map<String, List<TaskMetricSubscription>> byMetric = byVertex.get(subscription.getJobVertexID());
        if (byMetric == null) {
            return;
        }
        List<TaskMetricSubscription> subscriptions = byMetric.get(subscription.getMetricName());
        if (subscriptions == null) {
            return;
        }
        subscriptions.remove(subscription);
        if (!subscriptions.isEmpty()) {
            return;
        }
        byMetric.remove(subscription.getMetricName());
        if (!byMetric.isEmpty()) {
            return;
        }
        byVertex.remove(subscription.getJobVertexID());
        if (byVertex.isEmpty()) {
            this.taskMetricSubscriptions.remove(subscription.getJobID());
        }
    }

    /** Removes a single-task-manager subscription and prunes emptied levels of the registry. */
    private void removeTaskManagerSubscription(TaskManagerMetricSubscription subscription) {
        Map<String, List<TaskManagerMetricSubscription>> byMetric = this.tmMetricSubscriptions.get(subscription.getTmId());
        if (byMetric == null) {
            return;
        }
        List<TaskManagerMetricSubscription> subscriptions = byMetric.get(subscription.getMetricName());
        if (subscriptions == null) {
            return;
        }
        subscriptions.remove(subscription);
        if (!subscriptions.isEmpty()) {
            return;
        }
        byMetric.remove(subscription.getMetricName());
        if (byMetric.isEmpty()) {
            this.tmMetricSubscriptions.remove(subscription.getTmId());
        }
    }

    /** Removes a job-level task manager subscription and prunes emptied levels of the registry. */
    private void removeJobTMSubscription(JobTMMetricSubscription subscription) {
        Map<String, List<JobTMMetricSubscription>> byMetric = this.jobTMMetricSubscriptions.get(subscription.getJobID());
        if (byMetric == null) {
            return;
        }
        List<JobTMMetricSubscription> subscriptions = byMetric.get(subscription.getMetricName());
        if (subscriptions == null) {
            return;
        }
        subscriptions.remove(subscription);
        if (!subscriptions.isEmpty()) {
            return;
        }
        byMetric.remove(subscription.getMetricName());
        if (byMetric.isEmpty()) {
            this.jobTMMetricSubscriptions.remove(subscription.getJobID());
        }
    }

    /**
     * Periodic task that requests the values of all subscribed metrics and forwards them to the
     * subscriptions. One request is issued per job vertex, per job and per task manager.
     */
    private class MetricFetcher
    implements Runnable {
        private MetricFetcher() {
        }

        /**
         * Runs one fetch round while holding the provider's monitor, so the registries cannot
         * change during iteration. Any failure is logged and ends the current round.
         */
        @Override
        public void run() {
            synchronized (RestServerMetricProvider.this) {
                try {
                    fetchTaskMetrics();
                    fetchJobTMMetrics();
                    fetchTaskManagerMetrics();
                }
                catch (Throwable e) {
                    // Nothing may escape: scheduleAtFixedRate suppresses all subsequent
                    // executions once a run throws.
                    LOGGER.warn("Fail to fetch metrics", e);
                }
            }
        }

        /** Fetches the subscribed metrics of every job vertex, one request per vertex. */
        private void fetchTaskMetrics() throws Exception {
            for (Map.Entry<JobID, Map<JobVertexID, Map<String, List<TaskMetricSubscription>>>> jobEntry : taskMetricSubscriptions.entrySet()) {
                for (Map.Entry<JobVertexID, Map<String, List<TaskMetricSubscription>>> vertexEntry : jobEntry.getValue().entrySet()) {
                    Map<String, List<TaskMetricSubscription>> byMetric = vertexEntry.getValue();
                    Map<String, Map<Integer, Tuple2<Long, Double>>> values =
                        restServerClient.getTaskMetrics(jobEntry.getKey(), vertexEntry.getKey(), byMetric.keySet());
                    for (Map.Entry<String, List<TaskMetricSubscription>> metricEntry : byMetric.entrySet()) {
                        // Metrics missing from the response are skipped for this round.
                        if (!values.containsKey(metricEntry.getKey())) {
                            continue;
                        }
                        Map<Integer, Tuple2<Long, Double>> metricValues = values.get(metricEntry.getKey());
                        for (TaskMetricSubscription subscription : metricEntry.getValue()) {
                            subscription.addValue(metricValues);
                        }
                    }
                }
            }
        }

        /** Fetches the subscribed task manager metrics of every job, one request per job. */
        private void fetchJobTMMetrics() throws Exception {
            for (Map.Entry<JobID, Map<String, List<JobTMMetricSubscription>>> jobEntry : jobTMMetricSubscriptions.entrySet()) {
                Map<String, List<JobTMMetricSubscription>> byMetric = jobEntry.getValue();
                Map<String, Map<String, Tuple2<Long, Double>>> values =
                    restServerClient.getTaskManagerMetrics(jobEntry.getKey(), byMetric.keySet());
                for (Map.Entry<String, List<JobTMMetricSubscription>> metricEntry : byMetric.entrySet()) {
                    if (!values.containsKey(metricEntry.getKey())) {
                        continue;
                    }
                    Map<String, Tuple2<Long, Double>> metricValues = values.get(metricEntry.getKey());
                    for (JobTMMetricSubscription subscription : metricEntry.getValue()) {
                        subscription.addValue(metricValues);
                    }
                }
            }
        }

        /** Fetches the subscribed metrics of every task manager, one request per task manager. */
        private void fetchTaskManagerMetrics() throws Exception {
            for (Map.Entry<String, Map<String, List<TaskManagerMetricSubscription>>> tmEntry : tmMetricSubscriptions.entrySet()) {
                String tmId = tmEntry.getKey();
                Map<String, List<TaskManagerMetricSubscription>> byMetric = tmEntry.getValue();
                Map<String, Map<String, Tuple2<Long, Double>>> values =
                    restServerClient.getTaskManagerMetrics(Collections.singleton(tmId), byMetric.keySet());
                for (Map.Entry<String, List<TaskManagerMetricSubscription>> metricEntry : byMetric.entrySet()) {
                    if (!values.containsKey(metricEntry.getKey())) {
                        continue;
                    }
                    // NOTE(review): this is null when the response has the metric but no entry for
                    // this task manager; confirm addValue tolerates null.
                    Tuple2<Long, Double> value = values.get(metricEntry.getKey()).get(tmId);
                    for (TaskManagerMetricSubscription subscription : metricEntry.getValue()) {
                        subscription.addValue(value);
                    }
                }
            }
        }
    }
}

