package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexOverParallelized;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.JobTopologyAnalyzer;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetrics;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetricsSubscriber;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/detectors/OverParallelizedDetector.class */
public class OverParallelizedDetector implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(OverParallelizedDetector.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private double ratio;
    private long checkInterval;
    private int maxPartitionPerTask;
    private double multiOutputRatio;
    private double minDiffParallelismRatio;
    private TaskMetricsSubscriber subscriber;

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void open(HealthMonitor healthMonitor) {
        this.jobID = healthMonitor.getJobID();
        this.monitor = healthMonitor;
        this.metricProvider = healthMonitor.getMetricProvider();
        this.ratio = healthMonitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_MAX_RATIO);
        this.checkInterval = healthMonitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.maxPartitionPerTask = healthMonitor.getConfig().getInteger(HealthMonitorOptions.MAX_PARTITION_PER_TASK);
        this.multiOutputRatio = healthMonitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MULTI_OUTPUT_RATIO);
        this.minDiffParallelismRatio = healthMonitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MIN_DIFF_RATIO);
        this.subscriber = this.monitor.subscribeTaskMetrics(this.checkInterval);
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void close() {
        if (this.metricProvider == null) {
        }
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public Symptom detect() throws Exception {
        Map<JobVertexID, TaskMetrics> taskMetrics;
        double taskLatencyPerRecord;
        LOGGER.debug("Start detecting.");
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (jobConfig == null || (taskMetrics = this.subscriber.getTaskMetrics()) == null) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        for (JobVertexID jobVertexID : taskMetrics.keySet()) {
            TaskMetrics taskMetrics2 = taskMetrics.get(jobVertexID);
            LOGGER.debug("vertex metrics: {}", taskMetrics2);
            int parallelism = jobConfig.getVertexConfigs().get(jobVertexID).getParallelism();
            double d = 1.0d;
            if (taskMetrics2.isParallelSource()) {
                taskLatencyPerRecord = taskMetrics2.getPartitionLatency() <= 0.0d ? 0.0d : (taskMetrics2.getPartitionCount() * taskMetrics2.getTaskLatencyPerRecord()) / taskMetrics2.getPartitionLatency();
                d = Math.ceil(taskMetrics2.getPartitionCount() / this.maxPartitionPerTask);
            } else {
                taskLatencyPerRecord = taskMetrics2.getTaskLatencyPerRecord() * taskMetrics2.getMaxInputTpsPerMinute();
            }
            if (parallelism > taskLatencyPerRecord * getRatio(jobVertexID) && parallelism / d > 1.0d + this.minDiffParallelismRatio) {
                arrayList.add(jobVertexID);
                LOGGER.info("Over parallelized detected for vertex {}, parallelism:{} workload:{}.", new Object[]{jobVertexID, Integer.valueOf(parallelism), Double.valueOf(taskLatencyPerRecord)});
            }
        }
        if (arrayList.isEmpty()) {
            return null;
        }
        return new JobVertexOverParallelized(this.jobID, arrayList);
    }

    private double getRatio(JobVertexID jobVertexID) {
        JobTopologyAnalyzer jobTopologyAnalyzer = this.monitor.getJobTopologyAnalyzer();
        Iterator<JobVertexID> it = jobTopologyAnalyzer.getInputs(jobVertexID).iterator();
        while (it.hasNext()) {
            if (jobTopologyAnalyzer.getOutputs(it.next()).size() > 1) {
                return this.ratio * this.multiOutputRatio;
            }
        }
        return this.ratio;
    }
}
