package org.apache.flink.runtime.healthmanager.plugins.utils;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/utils/TaskMetricsSubscriber.class */
/**
 * Subscribes to a fixed set of per-vertex task metrics for the monitored job and converts the
 * latest values into one {@link TaskMetrics} per job vertex.
 *
 * <p>Lifecycle: {@link #open()} registers the subscriptions with the monitor's
 * {@link MetricProvider}, {@link #getTaskMetrics()} reads them, and {@link #close()} releases them.
 * The class does no synchronization of its own.
 */
public class TaskMetricsSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskMetricsSubscriber.class);

    /** Window passed for the inner RATE aggregation of the max-TPS subscription (presumably one minute in ms). */
    private static final long MAX_TPS_WINDOW = 60000L;

    /** Divisor applied to every "sum / count" average (presumably nanoseconds to seconds; confirm). */
    private static final double LATENCY_DIVISOR = 1.0E9d;

    /** Divisor applied to the source delay rate (presumably milliseconds to seconds; confirm). */
    private static final double DELAY_DIVISOR = 1000.0d;

    // Subscriptions created for every vertex.
    private Map<JobVertexID, TaskMetricSubscription> inputTpsSubs;
    private Map<JobVertexID, TaskMetricSubscription> maxTpsPerMinuteSubs;
    private Map<JobVertexID, TaskMetricSubscription> outputTpsSubs;
    private Map<JobVertexID, TaskMetricSubscription> timerCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> taskLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> taskLatencySumRangeSubs;
    // Subscriptions created only for vertices the topology analyzer reports as sources.
    private Map<JobVertexID, TaskMetricSubscription> sourceLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencySumRangeSubs;
    // Subscriptions created for every vertex.
    private Map<JobVertexID, TaskMetricSubscription> waitOutputCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> waitOutputSumRangeSubs;
    // Subscriptions created only for source vertices.
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceDelayRateSubs;

    private HealthMonitor monitor;
    private long interval;

    /**
     * @param healthMonitor monitor supplying the job config, job id, metric provider and topology analyzer
     * @param j aggregation interval handed to the metric provider; twice this value is used as the
     *     validity interval when metrics are checked in {@link #getTaskMetrics()}
     */
    public TaskMetricsSubscriber(HealthMonitor healthMonitor, long j) {
        this.monitor = healthMonitor;
        this.interval = j;
    }

    /**
     * Creates fresh subscription maps and subscribes to the metrics of every vertex listed in the
     * monitor's job config. Source vertices get eight additional source-specific subscriptions.
     */
    public void open() {
        this.inputTpsSubs = new HashMap<>();
        this.maxTpsPerMinuteSubs = new HashMap<>();
        this.outputTpsSubs = new HashMap<>();
        this.timerCountSubs = new HashMap<>();
        this.taskLatencyCountRangeSubs = new HashMap<>();
        this.taskLatencySumRangeSubs = new HashMap<>();
        this.sourceLatencyCountRangeSubs = new HashMap<>();
        this.sourceLatencySumRangeSubs = new HashMap<>();
        this.sourceProcessLatencyCountRangeSubs = new HashMap<>();
        this.sourceProcessLatencySumRangeSubs = new HashMap<>();
        this.waitOutputCountRangeSubs = new HashMap<>();
        this.waitOutputSumRangeSubs = new HashMap<>();
        this.sourcePartitionCountSubs = new HashMap<>();
        this.sourcePartitionLatencyCountRangeSubs = new HashMap<>();
        this.sourcePartitionLatencySumRangeSubs = new HashMap<>();
        this.sourceDelayRateSubs = new HashMap<>();

        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        JobID jobID = this.monitor.getJobID();
        MetricProvider provider = this.monitor.getMetricProvider();

        // Integer division rounds the interval down to a whole multiple of the window.
        // NOTE(review): this evaluates to 0 when interval is below 60000 - confirm that is intended.
        long maxTpsInterval = (this.interval / MAX_TPS_WINDOW) * MAX_TPS_WINDOW;

        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            this.inputTpsSubs.put(vertexId, subscribe(provider, jobID, vertexId, "numRecordsReceived", MetricAggType.SUM, TimelineAggType.RATE));
            this.maxTpsPerMinuteSubs.put(vertexId, provider.subscribeTaskMetric(jobID, vertexId, "numRecordsReceived", MetricAggType.SUM, maxTpsInterval, TimelineAggType.MAX, MAX_TPS_WINDOW, TimelineAggType.RATE));
            this.outputTpsSubs.put(vertexId, subscribe(provider, jobID, vertexId, "numRecordsSent", MetricAggType.SUM, TimelineAggType.RATE));
            // The timer count is the only subscription that uses a fixed interval of 1 instead of this.interval.
            this.timerCountSubs.put(vertexId, provider.subscribeTaskMetric(jobID, vertexId, MetricNames.TASK_TIMER_COUNT, MetricAggType.SUM, 1L, TimelineAggType.LATEST));

            if (this.monitor.getJobTopologyAnalyzer().isSource(vertexId)) {
                this.sourceLatencyCountRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.SOURCE_LATENCY_COUNT, MetricAggType.SUM, TimelineAggType.LATEST));
                this.sourceLatencySumRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.SOURCE_LATENCY_SUM, MetricAggType.SUM, TimelineAggType.LATEST));
                this.sourcePartitionCountSubs.put(vertexId, subscribe(provider, jobID, vertexId, "partitionCount", MetricAggType.SUM, TimelineAggType.LATEST));
                this.sourcePartitionLatencyCountRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.SOURCE_PARTITION_LATENCY_COUNT, MetricAggType.SUM, TimelineAggType.RANGE));
                this.sourcePartitionLatencySumRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.SOURCE_PARTITION_LATENCY_SUM, MetricAggType.SUM, TimelineAggType.RANGE));
                this.sourceProcessLatencyCountRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.SOURCE_PROCESS_LATENCY_COUNT, MetricAggType.SUM, TimelineAggType.LATEST));
                this.sourceProcessLatencySumRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.SOURCE_PROCESS_LATENCY_SUM, MetricAggType.SUM, TimelineAggType.LATEST));
                this.sourceDelayRateSubs.put(vertexId, subscribe(provider, jobID, vertexId, "fetched_delay", MetricAggType.AVG, TimelineAggType.RATE));
            }

            this.taskLatencyCountRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.TASK_LATENCY_COUNT, MetricAggType.SUM, TimelineAggType.LATEST));
            this.taskLatencySumRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.TASK_LATENCY_SUM, MetricAggType.SUM, TimelineAggType.LATEST));
            this.waitOutputCountRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.WAIT_OUTPUT_COUNT, MetricAggType.SUM, TimelineAggType.LATEST));
            this.waitOutputSumRangeSubs.put(vertexId, subscribe(provider, jobID, vertexId, MetricNames.WAIT_OUTPUT_SUM, MetricAggType.SUM, TimelineAggType.LATEST));
        }
    }

    /**
     * Unsubscribes every subscription created by {@link #open()}. Does nothing when the monitor
     * or its metric provider is {@code null}; maps that were never created are skipped.
     */
    public void close() {
        if (this.monitor == null) {
            return;
        }
        MetricProvider provider = this.monitor.getMetricProvider();
        if (provider == null) {
            return;
        }
        unsubscribeAll(provider, this.inputTpsSubs);
        unsubscribeAll(provider, this.outputTpsSubs);
        unsubscribeAll(provider, this.timerCountSubs);
        unsubscribeAll(provider, this.sourceLatencyCountRangeSubs);
        unsubscribeAll(provider, this.sourceLatencySumRangeSubs);
        unsubscribeAll(provider, this.sourcePartitionCountSubs);
        unsubscribeAll(provider, this.sourcePartitionLatencyCountRangeSubs);
        unsubscribeAll(provider, this.sourcePartitionLatencySumRangeSubs);
        unsubscribeAll(provider, this.taskLatencyCountRangeSubs);
        unsubscribeAll(provider, this.taskLatencySumRangeSubs);
        unsubscribeAll(provider, this.waitOutputCountRangeSubs);
        unsubscribeAll(provider, this.waitOutputSumRangeSubs);
        // These four families are registered in open() but were previously never released,
        // leaving their subscriptions behind in the provider after close().
        unsubscribeAll(provider, this.maxTpsPerMinuteSubs);
        unsubscribeAll(provider, this.sourceProcessLatencyCountRangeSubs);
        unsubscribeAll(provider, this.sourceProcessLatencySumRangeSubs);
        unsubscribeAll(provider, this.sourceDelayRateSubs);
    }

    /**
     * Builds a {@link TaskMetrics} for every vertex in the monitor's current job config.
     *
     * @return a map keyed by vertex id, or {@code null} when the job config is unavailable or
     *     when any vertex fails validation of its required metrics (callers must handle
     *     {@code null})
     */
    public Map<JobVertexID, TaskMetrics> getTaskMetrics() {
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (jobConfig == null) {
            return null;
        }
        long validInterval = this.interval * 2;
        Map<JobVertexID, TaskMetrics> result = new HashMap<>();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            TaskMetricSubscription inputTpsSub = this.inputTpsSubs.get(vertexId);
            TaskMetricSubscription outputTpsSub = this.outputTpsSubs.get(vertexId);
            TaskMetricSubscription timerCountSub = this.timerCountSubs.get(vertexId);
            TaskMetricSubscription sourceLatencyCountSub = this.sourceLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription sourceLatencySumSub = this.sourceLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription sourceProcessLatencyCountSub = this.sourceProcessLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription sourceProcessLatencySumSub = this.sourceProcessLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription taskLatencyCountSub = this.taskLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription taskLatencySumSub = this.taskLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription waitOutputCountSub = this.waitOutputCountRangeSubs.get(vertexId);
            TaskMetricSubscription waitOutputSumSub = this.waitOutputSumRangeSubs.get(vertexId);
            TaskMetricSubscription partitionCountSub = this.sourcePartitionCountSubs.get(vertexId);
            TaskMetricSubscription partitionLatencyCountSub = this.sourcePartitionLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription partitionLatencySumSub = this.sourcePartitionLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription delayRateSub = this.sourceDelayRateSubs.get(vertexId);
            TaskMetricSubscription maxTpsSub = this.maxTpsPerMinuteSubs.get(vertexId);

            // Input-side metrics are required for every vertex.
            if (!MetricUtils.validateTaskMetric(this.monitor, validInterval, inputTpsSub, taskLatencyCountSub, taskLatencySumSub, timerCountSub)) {
                LOGGER.debug("input metric missing {}", vertexId);
                LOGGER.debug("input tps {}, task latency count range {}, task latency sum range {}, timer count {}",
                        inputTpsSub.getValue(), taskLatencyCountSub.getValue(), taskLatencySumSub.getValue(), timerCountSub.getValue());
                return null;
            }

            boolean isSource = this.monitor.getJobTopologyAnalyzer().isSource(vertexId);
            if (isSource) {
                // Sources must additionally report their source latency.
                if (!MetricUtils.validateTaskMetric(this.monitor, validInterval, sourceLatencyCountSub, sourceLatencySumSub)) {
                    LOGGER.debug("input metric missing for source {}", vertexId);
                    LOGGER.debug("source latency count range {}source latency sum range {}",
                            sourceLatencyCountSub.getValue(), sourceLatencySumSub.getValue());
                    return null;
                }
            } else if (!MetricUtils.validateTaskMetric(this.monitor, validInterval, outputTpsSub, waitOutputCountSub, waitOutputSumSub)) {
                // Output-side metrics are only validated for non-source vertices.
                LOGGER.debug("output metric missing {}", vertexId);
                LOGGER.debug("output tps {}wait output count range {}wait output sum range {}",
                        outputTpsSub.getValue(), waitOutputCountSub.getValue(), waitOutputSumSub.getValue());
                return null;
            }

            // A source counts as a parallel reader when its partition metrics validate and it
            // reports a positive partition count.
            boolean isParallelReader = false;
            if (isSource) {
                isParallelReader = MetricUtils.validateTaskMetric(this.monitor, validInterval, partitionCountSub, partitionLatencyCountSub, partitionLatencySumSub)
                        && latestValue(partitionCountSub) > 0.0d;
                LOGGER.debug("Treat vertex {} as {} reader.", vertexId, isParallelReader ? "parallel" : "non-parallel");
                LOGGER.debug("source partition count {} source partition latency count range {} source partition latency sum range {}",
                        partitionCountSub.getValue(), partitionLatencyCountSub.getValue(), partitionLatencySumSub.getValue());
            }

            double inputTps = latestValue(inputTpsSub);
            // NOTE(review): for source vertices the output TPS (and the wait-output metrics read
            // below) were not validated above, so a missing value here would throw - confirm
            // whether sources are guaranteed to report them.
            double outputTps = latestValue(outputTpsSub);

            // Fall back to the plain input TPS when no max-TPS value is available.
            double maxTps = inputTps;
            if (maxTpsSub != null && maxTpsSub.getValue() != null) {
                maxTps = latestValue(maxTpsSub);
            }

            double timerCount = latestValue(timerCountSub);
            double taskLatency = averageOf(taskLatencyCountSub, taskLatencySumSub);

            double sourceLatency = 0.0d;
            if (isSource) {
                sourceLatency = averageOf(sourceLatencyCountSub, sourceLatencySumSub);
            }

            double waitOutputAverage = 0.0d;
            if (!this.monitor.getJobTopologyAnalyzer().isSink(vertexId)) {
                waitOutputAverage = averageOf(waitOutputCountSub, waitOutputSumSub);
            }
            // Scale the wait-output average by the output/input TPS ratio.
            double waitOutputPerInput = inputTps <= 0.0d ? 0.0d : (waitOutputAverage * outputTps) / inputTps;

            double partitionLatency = 0.0d;
            double partitionCount = 0.0d;
            if (isParallelReader) {
                // Parallel readers replace the task latency with the source process latency.
                // NOTE(review): the source process latency subscriptions are not validated
                // before being read - confirm they are always populated for parallel readers.
                taskLatency = averageOf(sourceProcessLatencyCountSub, sourceProcessLatencySumSub);
                partitionLatency = averageOf(partitionLatencyCountSub, partitionLatencySumSub);
                partitionCount = latestValue(partitionCountSub);
            }

            double workload = (taskLatency - waitOutputPerInput) * maxTps;

            double delayRate = 0.0d;
            if (delayRateSub != null && delayRateSub.getValue() != null) {
                delayRate = latestValue(delayRateSub) / DELAY_DIVISOR;
            }

            result.put(vertexId, new TaskMetrics(vertexId, isParallelReader, inputTps, maxTps, outputTps, timerCount,
                    taskLatency, sourceLatency, waitOutputPerInput, workload, delayRate, partitionLatency, partitionCount));
        }
        return result;
    }

    /** Subscribes to one task metric using this subscriber's configured interval. */
    private TaskMetricSubscription subscribe(MetricProvider provider, JobID jobID, JobVertexID vertexId,
            String metricName, MetricAggType aggType, TimelineAggType timelineAggType) {
        return provider.subscribeTaskMetric(jobID, vertexId, metricName, aggType, this.interval, timelineAggType);
    }

    /** Unsubscribes every subscription held in {@code subscriptions}; a {@code null} map is ignored. */
    private static void unsubscribeAll(MetricProvider provider, Map<JobVertexID, TaskMetricSubscription> subscriptions) {
        if (subscriptions == null) {
            return;
        }
        for (TaskMetricSubscription subscription : subscriptions.values()) {
            provider.unsubscribe(subscription);
        }
    }

    /** Reads the {@code f1} field of the subscription's current value as a primitive double. */
    private static double latestValue(TaskMetricSubscription subscription) {
        return ((Double) subscription.getValue().f1).doubleValue();
    }

    /**
     * Returns {@code sum / count / LATENCY_DIVISOR}, or 0 when the count is not positive.
     * The sum subscription is only read when the count is positive.
     */
    private static double averageOf(TaskMetricSubscription countSub, TaskMetricSubscription sumSub) {
        double count = latestValue(countSub);
        if (count <= 0.0d) {
            return 0.0d;
        }
        return (latestValue(sumSub) / count) / LATENCY_DIVISOR;
    }
}
