/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.utils;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetrics;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subscribes to the per-vertex task metrics of the job watched by a {@link HealthMonitor} and
 * condenses the latest values into one {@link TaskMetrics} per job vertex.
 *
 * <p>Lifecycle: {@link #open()} creates the subscriptions, {@link #getTaskMetrics()} reads them,
 * {@link #close()} releases them. {@link #getTaskMetrics()} must not be called before
 * {@link #open()} because the subscription maps are only created there.
 *
 * <p>NOTE(review): no synchronization is done here; thread-safety depends on the callers.
 */
public class TaskMetricsSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskMetricsSubscriber.class);

    /**
     * Divisor applied to every {@code sum / count} latency average.
     * Presumably converts nanoseconds to seconds - TODO confirm against the metric reporters.
     */
    private static final double LATENCY_DIVISOR = 1.0E9;

    private Map<JobVertexID, TaskMetricSubscription> inputTpsSubs;
    private Map<JobVertexID, TaskMetricSubscription> maxTpsPerMinuteSubs;
    private Map<JobVertexID, TaskMetricSubscription> outputTpsSubs;
    private Map<JobVertexID, TaskMetricSubscription> timerCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> taskLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> taskLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> waitOutputCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> waitOutputSumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceDelayRateSubs;
    private final HealthMonitor monitor;
    private final long interval;

    /**
     * @param monitor  health monitor that supplies the job config, job id, metric provider and
     *                 topology analyzer
     * @param interval aggregation interval handed to the metric subscriptions; metrics are
     *                 validated against twice this interval in {@link #getTaskMetrics()}
     */
    public TaskMetricsSubscriber(HealthMonitor monitor, long interval) {
        this.monitor = monitor;
        this.interval = interval;
    }

    /**
     * Creates fresh subscription maps and subscribes, for every vertex in the job config, to the
     * input/output rate, timer count, task latency and wait-output metrics. Vertices that the
     * topology analyzer reports as sources additionally get the source latency, partition,
     * source process latency and fetch delay subscriptions.
     */
    public void open() {
        this.inputTpsSubs = new HashMap<>();
        this.maxTpsPerMinuteSubs = new HashMap<>();
        this.outputTpsSubs = new HashMap<>();
        this.timerCountSubs = new HashMap<>();
        this.taskLatencyCountRangeSubs = new HashMap<>();
        this.taskLatencySumRangeSubs = new HashMap<>();
        this.sourceLatencyCountRangeSubs = new HashMap<>();
        this.sourceLatencySumRangeSubs = new HashMap<>();
        this.sourceProcessLatencyCountRangeSubs = new HashMap<>();
        this.sourceProcessLatencySumRangeSubs = new HashMap<>();
        this.waitOutputCountRangeSubs = new HashMap<>();
        this.waitOutputSumRangeSubs = new HashMap<>();
        this.sourcePartitionCountSubs = new HashMap<>();
        this.sourcePartitionLatencyCountRangeSubs = new HashMap<>();
        this.sourcePartitionLatencySumRangeSubs = new HashMap<>();
        this.sourceDelayRateSubs = new HashMap<>();

        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        JobID jobID = this.monitor.getJobID();
        MetricProvider metricProvider = this.monitor.getMetricProvider();

        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            this.inputTpsSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "numRecordsReceived", MetricAggType.SUM, this.interval, TimelineAggType.RATE));
            // The interval is rounded down to a whole number of minutes by the integer division;
            // note that this yields 0 when the configured interval is shorter than one minute.
            this.maxTpsPerMinuteSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "numRecordsReceived", MetricAggType.SUM, this.interval / 60000L * 60000L, TimelineAggType.MAX, 60000L, TimelineAggType.RATE));
            this.outputTpsSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "numRecordsSent", MetricAggType.SUM, this.interval, TimelineAggType.RATE));
            this.timerCountSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "TimerCount.Total", MetricAggType.SUM, 1L, TimelineAggType.LATEST));

            if (this.monitor.getJobTopologyAnalyzer().isSource(vertexId)) {
                this.sourceLatencyCountRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "sourceLatency.count", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
                this.sourceLatencySumRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "sourceLatency.sum", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
                this.sourcePartitionCountSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "partitionCount", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
                this.sourcePartitionLatencyCountRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "partitionLatency.count", MetricAggType.SUM, this.interval, TimelineAggType.RANGE));
                this.sourcePartitionLatencySumRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "partitionLatency.sum", MetricAggType.SUM, this.interval, TimelineAggType.RANGE));
                this.sourceProcessLatencyCountRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "sourceProcessLatency.count", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
                this.sourceProcessLatencySumRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "sourceProcessLatency.sum", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
                this.sourceDelayRateSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "fetched_delay", MetricAggType.AVG, this.interval, TimelineAggType.RATE));
            }

            this.taskLatencyCountRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "taskLatency.count", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
            this.taskLatencySumRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "taskLatency.sum", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
            this.waitOutputCountRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "waitOutput.count", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
            this.waitOutputSumRangeSubs.put(vertexId, metricProvider.subscribeTaskMetric(jobID, vertexId, "waitOutput.sum", MetricAggType.SUM, this.interval, TimelineAggType.LATEST));
        }
    }

    /**
     * Unsubscribes every subscription created by {@link #open()}. Does nothing when the monitor
     * or its metric provider is unavailable; maps that were never created are skipped, so this is
     * safe to call without a preceding {@link #open()}.
     */
    public void close() {
        if (this.monitor == null) {
            return;
        }
        MetricProvider metricProvider = this.monitor.getMetricProvider();
        if (metricProvider == null) {
            return;
        }
        // Every map filled in open() must be released here, otherwise the subscription leaks.
        unsubscribeAll(metricProvider, this.inputTpsSubs);
        unsubscribeAll(metricProvider, this.maxTpsPerMinuteSubs);
        unsubscribeAll(metricProvider, this.outputTpsSubs);
        unsubscribeAll(metricProvider, this.timerCountSubs);
        unsubscribeAll(metricProvider, this.sourceLatencyCountRangeSubs);
        unsubscribeAll(metricProvider, this.sourceLatencySumRangeSubs);
        unsubscribeAll(metricProvider, this.sourceProcessLatencyCountRangeSubs);
        unsubscribeAll(metricProvider, this.sourceProcessLatencySumRangeSubs);
        unsubscribeAll(metricProvider, this.sourcePartitionCountSubs);
        unsubscribeAll(metricProvider, this.sourcePartitionLatencyCountRangeSubs);
        unsubscribeAll(metricProvider, this.sourcePartitionLatencySumRangeSubs);
        unsubscribeAll(metricProvider, this.sourceDelayRateSubs);
        unsubscribeAll(metricProvider, this.taskLatencyCountRangeSubs);
        unsubscribeAll(metricProvider, this.taskLatencySumRangeSubs);
        unsubscribeAll(metricProvider, this.waitOutputCountRangeSubs);
        unsubscribeAll(metricProvider, this.waitOutputSumRangeSubs);
    }

    /**
     * Builds a {@link TaskMetrics} for every vertex in the current job config from the latest
     * subscription values.
     *
     * @return the metrics keyed by vertex id, or {@code null} when the job config is unavailable
     *         or any metric needed for some vertex is missing or fails validation; callers must
     *         handle the {@code null} case
     */
    public Map<JobVertexID, TaskMetrics> getTaskMetrics() {
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (jobConfig == null) {
            return null;
        }
        long validInterval = this.interval * 2L;
        Map<JobVertexID, TaskMetrics> metrics = new HashMap<>();

        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            TaskMetricSubscription inputTpsSub = this.inputTpsSubs.get(vertexId);
            TaskMetricSubscription outputTpsSub = this.outputTpsSubs.get(vertexId);
            TaskMetricSubscription timerCountSub = this.timerCountSubs.get(vertexId);
            TaskMetricSubscription sourceLatencyCountRangeSub = this.sourceLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription sourceLatencySumRangeSub = this.sourceLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription sourceProcessLatencyCountRangeSub = this.sourceProcessLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription sourceProcessLatencySumRangeSub = this.sourceProcessLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription taskLatencyCountRangeSub = this.taskLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription taskLatencySumRangeSub = this.taskLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription waitOutputCountRangeSub = this.waitOutputCountRangeSubs.get(vertexId);
            TaskMetricSubscription waitOutputSumRangeSub = this.waitOutputSumRangeSubs.get(vertexId);
            TaskMetricSubscription sourcePartitionCountSub = this.sourcePartitionCountSubs.get(vertexId);
            TaskMetricSubscription sourcePartitionLatencyCountRangeSub = this.sourcePartitionLatencyCountRangeSubs.get(vertexId);
            TaskMetricSubscription sourcePartitionLatencySumRangeSub = this.sourcePartitionLatencySumRangeSubs.get(vertexId);
            TaskMetricSubscription sourceDelayIncreasingRateSub = this.sourceDelayRateSubs.get(vertexId);
            TaskMetricSubscription maxTpsPerMinuteSub = this.maxTpsPerMinuteSubs.get(vertexId);

            if (!MetricUtils.validateTaskMetric(this.monitor, validInterval, inputTpsSub, taskLatencyCountRangeSub, taskLatencySumRangeSub, timerCountSub)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("input metric missing {}", vertexId);
                    LOGGER.debug("input tps {}, task latency count range {}, task latency sum range {}, timer count {}",
                            describe(inputTpsSub), describe(taskLatencyCountRangeSub), describe(taskLatencySumRangeSub), describe(timerCountSub));
                }
                return null;
            }

            boolean isSource = this.monitor.getJobTopologyAnalyzer().isSource(vertexId);

            if (!isSource && !MetricUtils.validateTaskMetric(this.monitor, validInterval, outputTpsSub, waitOutputCountRangeSub, waitOutputSumRangeSub)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("output metric missing {}", vertexId);
                    LOGGER.debug("output tps {}, wait output count range {}, wait output sum range {}",
                            describe(outputTpsSub), describe(waitOutputCountRangeSub), describe(waitOutputSumRangeSub));
                }
                return null;
            }
            if (isSource && !MetricUtils.validateTaskMetric(this.monitor, validInterval, sourceLatencyCountRangeSub, sourceLatencySumRangeSub)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("input metric missing for source {}", vertexId);
                    LOGGER.debug("source latency count range {}, source latency sum range {}",
                            describe(sourceLatencyCountRangeSub), describe(sourceLatencySumRangeSub));
                }
                return null;
            }

            // A source counts as a parallel reader only when its partition metrics validate and
            // it reports a positive partition count.
            boolean isParallelReader = false;
            if (isSource) {
                isParallelReader = MetricUtils.validateTaskMetric(this.monitor, validInterval, sourcePartitionCountSub, sourcePartitionLatencyCountRangeSub, sourcePartitionLatencySumRangeSub)
                        && (Double) sourcePartitionCountSub.getValue().f1 > 0.0;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Treat vertex {} as {} reader.", vertexId, isParallelReader ? "parallel" : "non-parallel");
                    LOGGER.debug("source partition count {} source partition latency count range {} source partition latency sum range {}",
                            describe(sourcePartitionCountSub), describe(sourcePartitionLatencyCountRangeSub), describe(sourcePartitionLatencySumRangeSub));
                }
            }

            double inputTps = (Double) inputTpsSub.getValue().f1;

            // The output metrics are validated above for non-source vertices only, so the value
            // may still be absent here; report "not ready" instead of failing with an NPE.
            Double outputTpsValue = latestValue(outputTpsSub);
            if (outputTpsValue == null) {
                LOGGER.debug("output tps missing {}", vertexId);
                return null;
            }
            double outputTps = outputTpsValue;

            // Fall back to the plain input rate when no per-minute maximum is available.
            Double maxTpsValue = latestValue(maxTpsPerMinuteSub);
            double maxInputTpsPerMinute = maxTpsValue != null ? maxTpsValue : inputTps;

            double timerCount = (Double) timerCountSub.getValue().f1;
            double taskLatency = scaledAverage(
                    (Double) taskLatencySumRangeSub.getValue().f1,
                    (Double) taskLatencyCountRangeSub.getValue().f1);

            double sourceLatency = 0.0;
            if (isSource) {
                sourceLatency = scaledAverage(
                        (Double) sourceLatencySumRangeSub.getValue().f1,
                        (Double) sourceLatencyCountRangeSub.getValue().f1);
            }

            double waitOutput = 0.0;
            if (!this.monitor.getJobTopologyAnalyzer().isSink(vertexId)) {
                // Not validated for source vertices, hence the explicit null checks.
                Double waitOutputCount = latestValue(waitOutputCountRangeSub);
                Double waitOutputSum = latestValue(waitOutputSumRangeSub);
                if (waitOutputCount == null || waitOutputSum == null) {
                    LOGGER.debug("wait output metric missing {}", vertexId);
                    return null;
                }
                waitOutput = scaledAverage(waitOutputSum, waitOutputCount);
            }
            double waitOutputPerInputRecord = inputTps <= 0.0 ? 0.0 : waitOutput * outputTps / inputTps;

            double partitionLatency = 0.0;
            double partitionCount = 0.0;
            if (isParallelReader) {
                // The source process latency metrics are never part of a validation call.
                Double processLatencyCount = latestValue(sourceProcessLatencyCountRangeSub);
                Double processLatencySum = latestValue(sourceProcessLatencySumRangeSub);
                if (processLatencyCount == null || processLatencySum == null) {
                    LOGGER.debug("source process latency metric missing {}", vertexId);
                    return null;
                }
                // For parallel readers the source process latency replaces the task latency.
                taskLatency = scaledAverage(processLatencySum, processLatencyCount);
                partitionLatency = scaledAverage(
                        (Double) sourcePartitionLatencySumRangeSub.getValue().f1,
                        (Double) sourcePartitionLatencyCountRangeSub.getValue().f1);
                partitionCount = (Double) sourcePartitionCountSub.getValue().f1;
            }

            double workload = (taskLatency - waitOutputPerInputRecord) * maxInputTpsPerMinute;

            double delayIncreasingRate = 0.0;
            Double delayRateValue = latestValue(sourceDelayIncreasingRateSub);
            if (delayRateValue != null) {
                // NOTE(review): the division by 1000 looks like a ms -> s conversion - confirm.
                delayIncreasingRate = delayRateValue / 1000.0;
            }

            metrics.put(vertexId, new TaskMetrics(vertexId, isParallelReader, inputTps, maxInputTpsPerMinute, outputTps, timerCount, taskLatency, sourceLatency, waitOutputPerInputRecord, workload, delayIncreasingRate, partitionLatency, partitionCount));
        }
        return metrics;
    }

    /** Unsubscribes every subscription held in {@code subs}; a {@code null} map is ignored. */
    private static void unsubscribeAll(MetricProvider metricProvider, Map<JobVertexID, TaskMetricSubscription> subs) {
        if (subs == null) {
            return;
        }
        for (TaskMetricSubscription sub : subs.values()) {
            metricProvider.unsubscribe(sub);
        }
    }

    /** Returns the {@code f1} field of the subscription's current value, or {@code null} when the subscription or its value is absent. */
    private static Double latestValue(TaskMetricSubscription sub) {
        if (sub == null || sub.getValue() == null) {
            return null;
        }
        return (Double) sub.getValue().f1;
    }

    /** Returns the subscription's current value for logging, tolerating a {@code null} subscription. */
    private static Object describe(TaskMetricSubscription sub) {
        return sub == null ? null : sub.getValue();
    }

    /** Returns {@code sum / count / LATENCY_DIVISOR}, or {@code 0.0} when {@code count} is not positive. */
    private static double scaledAverage(double sum, double count) {
        return count <= 0.0 ? 0.0 : sum / count / LATENCY_DIVISOR;
    }
}

