package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexOverParallelized;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricNames;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/detectors/OverParallelizedDetector.class */
public class OverParallelizedDetector implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(OverParallelizedDetector.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private double ratio;
    private long checkInterval;
    private int maxPartitionPerTask;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> latencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> latencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> inputTpsSubs;

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void open(HealthMonitor healthMonitor) {
        this.jobID = healthMonitor.getJobID();
        this.monitor = healthMonitor;
        this.metricProvider = healthMonitor.getMetricProvider();
        this.ratio = healthMonitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_MAX_RATIO);
        this.checkInterval = healthMonitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.maxPartitionPerTask = healthMonitor.getConfig().getInteger(HealthMonitorOptions.MAX_PARTITION_PER_TASK);
        this.sourcePartitionCountSubs = new HashMap();
        this.sourcePartitionLatencyCountRangeSubs = new HashMap();
        this.sourcePartitionLatencySumRangeSubs = new HashMap();
        this.sourceProcessLatencyCountRangeSubs = new HashMap();
        this.sourceProcessLatencySumRangeSubs = new HashMap();
        this.latencyCountRangeSubs = new HashMap();
        this.latencySumRangeSubs = new HashMap();
        this.inputTpsSubs = new HashMap();
        RestServerClient.JobConfig jobConfig = healthMonitor.getJobConfig();
        for (JobVertexID jobVertexID : jobConfig.getVertexConfigs().keySet()) {
            this.latencyCountRangeSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, MetricNames.TASK_LATENCY_COUNT, MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
            this.latencySumRangeSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, MetricNames.TASK_LATENCY_SUM, MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
            this.inputTpsSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, "numRecordsReceived", MetricAggType.SUM, this.checkInterval, TimelineAggType.RATE));
            if (jobConfig.getInputNodes().get(jobVertexID).isEmpty()) {
                this.sourcePartitionCountSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, "partitionCount", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
                this.sourcePartitionLatencyCountRangeSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, MetricNames.SOURCE_PARTITION_LATENCY_COUNT, MetricAggType.SUM, this.checkInterval, TimelineAggType.RANGE));
                this.sourcePartitionLatencySumRangeSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, MetricNames.SOURCE_PARTITION_LATENCY_SUM, MetricAggType.SUM, this.checkInterval, TimelineAggType.RANGE));
                this.sourceProcessLatencyCountRangeSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, MetricNames.SOURCE_PROCESS_LATENCY_COUNT, MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
                this.sourceProcessLatencySumRangeSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(this.jobID, jobVertexID, MetricNames.SOURCE_PROCESS_LATENCY_SUM, MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
            }
        }
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void close() {
        if (this.metricProvider == null) {
            return;
        }
        if (this.latencyCountRangeSubs != null) {
            Iterator<TaskMetricSubscription> it = this.latencyCountRangeSubs.values().iterator();
            while (it.hasNext()) {
                this.metricProvider.unsubscribe(it.next());
            }
        }
        if (this.latencySumRangeSubs != null) {
            Iterator<TaskMetricSubscription> it2 = this.latencySumRangeSubs.values().iterator();
            while (it2.hasNext()) {
                this.metricProvider.unsubscribe(it2.next());
            }
        }
        if (this.inputTpsSubs != null) {
            Iterator<TaskMetricSubscription> it3 = this.inputTpsSubs.values().iterator();
            while (it3.hasNext()) {
                this.metricProvider.unsubscribe(it3.next());
            }
        }
        if (this.sourcePartitionCountSubs != null) {
            Iterator<TaskMetricSubscription> it4 = this.sourcePartitionCountSubs.values().iterator();
            while (it4.hasNext()) {
                this.metricProvider.unsubscribe(it4.next());
            }
        }
        if (this.sourcePartitionLatencyCountRangeSubs != null) {
            Iterator<TaskMetricSubscription> it5 = this.sourcePartitionLatencyCountRangeSubs.values().iterator();
            while (it5.hasNext()) {
                this.metricProvider.unsubscribe(it5.next());
            }
        }
        if (this.sourcePartitionLatencySumRangeSubs != null) {
            Iterator<TaskMetricSubscription> it6 = this.sourcePartitionLatencySumRangeSubs.values().iterator();
            while (it6.hasNext()) {
                this.metricProvider.unsubscribe(it6.next());
            }
        }
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public Symptom detect() throws Exception {
        double d;
        LOGGER.debug("Start detecting.");
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (jobConfig == null) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        for (JobVertexID jobVertexID : this.latencyCountRangeSubs.keySet()) {
            TaskMetricSubscription taskMetricSubscription = this.sourcePartitionCountSubs.get(jobVertexID);
            TaskMetricSubscription taskMetricSubscription2 = this.sourcePartitionLatencyCountRangeSubs.get(jobVertexID);
            TaskMetricSubscription taskMetricSubscription3 = this.sourcePartitionLatencySumRangeSubs.get(jobVertexID);
            TaskMetricSubscription taskMetricSubscription4 = this.sourceProcessLatencyCountRangeSubs.get(jobVertexID);
            TaskMetricSubscription taskMetricSubscription5 = this.sourceProcessLatencySumRangeSubs.get(jobVertexID);
            TaskMetricSubscription taskMetricSubscription6 = this.latencyCountRangeSubs.get(jobVertexID);
            TaskMetricSubscription taskMetricSubscription7 = this.latencySumRangeSubs.get(jobVertexID);
            TaskMetricSubscription taskMetricSubscription8 = this.inputTpsSubs.get(jobVertexID);
            if (MetricUtils.validateTaskMetric(this.monitor, this.checkInterval * 2, taskMetricSubscription6, taskMetricSubscription7, taskMetricSubscription8)) {
                boolean z = false;
                if (jobConfig.getInputNodes().get(jobVertexID).isEmpty()) {
                    z = MetricUtils.validateTaskMetric(this.monitor, this.checkInterval * 2, taskMetricSubscription, taskMetricSubscription2, taskMetricSubscription3, taskMetricSubscription4, taskMetricSubscription5) && ((Double) taskMetricSubscription.getValue().f1).doubleValue() > 0.0d;
                    LOGGER.debug("Treat vertex {} as {} reader.", jobVertexID, z ? "parallel" : "non-parallel");
                    LOGGER.debug("source partition count " + taskMetricSubscription.getValue() + " source partition latency count range " + taskMetricSubscription2.getValue() + " source partition latency sum range " + taskMetricSubscription3.getValue());
                }
                int parallelism = jobConfig.getVertexConfigs().get(jobVertexID).getParallelism();
                double d2 = 1.0d;
                if (z) {
                    double doubleValue = ((Double) taskMetricSubscription4.getValue().f1).doubleValue();
                    double doubleValue2 = doubleValue <= 0.0d ? 0.0d : (((Double) taskMetricSubscription5.getValue().f1).doubleValue() / doubleValue) / 1.0E9d;
                    double doubleValue3 = ((Double) taskMetricSubscription.getValue().f1).doubleValue();
                    double doubleValue4 = ((Double) taskMetricSubscription2.getValue().f1).doubleValue();
                    double doubleValue5 = doubleValue4 <= 0.0d ? 0.0d : (((Double) taskMetricSubscription3.getValue().f1).doubleValue() / doubleValue4) / 1.0E9d;
                    d = doubleValue5 <= 0.0d ? 0.0d : (doubleValue3 * doubleValue2) / doubleValue5;
                    d2 = Math.ceil(doubleValue3 / this.maxPartitionPerTask);
                    LOGGER.debug("vertex {} partitionCount {} latency {} partitionLatency {} workload {}", new Object[]{jobVertexID, Double.valueOf(doubleValue3), Double.valueOf(doubleValue2), Double.valueOf(doubleValue5), Double.valueOf(d)});
                } else {
                    double doubleValue6 = ((Double) taskMetricSubscription6.getValue().f1).doubleValue();
                    double doubleValue7 = doubleValue6 <= 0.0d ? 0.0d : (((Double) taskMetricSubscription7.getValue().f1).doubleValue() / doubleValue6) / 1.0E9d;
                    double doubleValue8 = ((Double) taskMetricSubscription8.getValue().f1).doubleValue();
                    d = doubleValue7 * doubleValue8;
                    LOGGER.debug("vertex {} input tps {} latency {} workload {}", new Object[]{jobVertexID, Double.valueOf(doubleValue8), Double.valueOf(doubleValue7), Double.valueOf(d)});
                }
                if (parallelism > d * this.ratio && parallelism > d2) {
                    arrayList.add(jobVertexID);
                    LOGGER.info("Over parallelized detected for vertex {}, parallelism:{} workload:{}.", new Object[]{jobVertexID, Integer.valueOf(parallelism), Double.valueOf(d)});
                }
            } else {
                LOGGER.debug("Skip vertex {}, metrics missing.", jobVertexID);
            }
        }
        if (arrayList.isEmpty()) {
            return null;
        }
        return new JobVertexOverParallelized(this.jobID, arrayList);
    }
}
