/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.ArrayList;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexOverParallelized;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.JobTopologyAnalyzer;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetrics;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetricsSubscriber;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OverParallelizedDetector
implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(OverParallelizedDetector.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private double ratio;
    private long checkInterval;
    private int maxPartitionPerTask;
    private double multiOutputRatio;
    private double minDiffParallelismRatio;
    private TaskMetricsSubscriber subscriber;

    @Override
    public void open(HealthMonitor monitor) {
        this.jobID = monitor.getJobID();
        this.monitor = monitor;
        this.metricProvider = monitor.getMetricProvider();
        this.ratio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_MAX_RATIO);
        this.checkInterval = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.maxPartitionPerTask = monitor.getConfig().getInteger(HealthMonitorOptions.MAX_PARTITION_PER_TASK);
        this.multiOutputRatio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MULTI_OUTPUT_RATIO);
        this.minDiffParallelismRatio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MIN_DIFF_RATIO);
        this.subscriber = this.monitor.subscribeTaskMetrics(this.checkInterval);
    }

    @Override
    public void close() {
        if (this.metricProvider == null) {
            return;
        }
    }

    @Override
    public Symptom detect() throws Exception {
        LOGGER.debug("Start detecting.");
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (jobConfig == null) {
            return null;
        }
        Map<JobVertexID, TaskMetrics> allTaskMetrics = this.subscriber.getTaskMetrics();
        if (allTaskMetrics == null) {
            return null;
        }
        ArrayList<JobVertexID> jobVertexIDs = new ArrayList<JobVertexID>();
        for (JobVertexID vertexId : allTaskMetrics.keySet()) {
            double workload;
            TaskMetrics metrics = allTaskMetrics.get((Object)vertexId);
            LOGGER.debug("vertex metrics: {}", (Object)metrics);
            int parallelism = jobConfig.getVertexConfigs().get((Object)vertexId).getParallelism();
            double minParallelism = 1.0;
            if (metrics.isParallelSource()) {
                workload = metrics.getPartitionLatency() <= 0.0 ? 0.0 : metrics.getPartitionCount() * metrics.getTaskLatencyPerRecord() / metrics.getPartitionLatency();
                minParallelism = Math.ceil(metrics.getPartitionCount() / (double)this.maxPartitionPerTask);
            } else {
                workload = metrics.getTaskLatencyPerRecord() * metrics.getMaxInputTpsPerMinute();
            }
            if (!((double)parallelism > workload * this.getRatio(vertexId)) || !((double)parallelism / minParallelism > 1.0 + this.minDiffParallelismRatio)) continue;
            jobVertexIDs.add(vertexId);
            LOGGER.info("Over parallelized detected for vertex {}, parallelism:{} workload:{}.", new Object[]{vertexId, parallelism, workload});
        }
        if (!jobVertexIDs.isEmpty()) {
            return new JobVertexOverParallelized(this.jobID, jobVertexIDs);
        }
        return null;
    }

    private double getRatio(JobVertexID vertexID) {
        JobTopologyAnalyzer jobTopologyAnalyzer = this.monitor.getJobTopologyAnalyzer();
        for (JobVertexID input : jobTopologyAnalyzer.getInputs(vertexID)) {
            if (jobTopologyAnalyzer.getOutputs(input).size() <= 1) continue;
            return this.ratio * this.multiOutputRatio;
        }
        return this.ratio;
    }
}

