/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexOverParallelized;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OverParallelizedDetector
implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(OverParallelizedDetector.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private double ratio;
    private long checkInterval;
    private int maxPartitionPerTask;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> latencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> latencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> inputTpsSubs;

    @Override
    public void open(HealthMonitor monitor) {
        this.jobID = monitor.getJobID();
        this.monitor = monitor;
        this.metricProvider = monitor.getMetricProvider();
        this.ratio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_MAX_RATIO);
        this.checkInterval = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.maxPartitionPerTask = monitor.getConfig().getInteger(HealthMonitorOptions.MAX_PARTITION_PER_TASK);
        this.sourcePartitionCountSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourcePartitionLatencyCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourcePartitionLatencySumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourceProcessLatencyCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourceProcessLatencySumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.latencyCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.latencySumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.inputTpsSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        RestServerClient.JobConfig jobConfig = monitor.getJobConfig();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            TaskMetricSubscription latencyCountRangeSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "taskLatency.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST);
            this.latencyCountRangeSubs.put(vertexId, latencyCountRangeSub);
            TaskMetricSubscription latencySumRangeSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "taskLatency.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST);
            this.latencySumRangeSubs.put(vertexId, latencySumRangeSub);
            TaskMetricSubscription inputTpsSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "numRecordsReceived", MetricAggType.SUM, this.checkInterval, TimelineAggType.RATE);
            this.inputTpsSubs.put(vertexId, inputTpsSub);
            if (!jobConfig.getInputNodes().get((Object)vertexId).isEmpty()) continue;
            this.sourcePartitionCountSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "partitionCount", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
            this.sourcePartitionLatencyCountRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "partitionLatency.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.RANGE));
            this.sourcePartitionLatencySumRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "partitionLatency.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.RANGE));
            this.sourceProcessLatencyCountRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "sourceProcessLatency.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
            this.sourceProcessLatencySumRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "sourceProcessLatency.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
        }
    }

    @Override
    public void close() {
        if (this.metricProvider == null) {
            return;
        }
        if (this.latencyCountRangeSubs != null) {
            for (TaskMetricSubscription sub : this.latencyCountRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.latencySumRangeSubs != null) {
            for (TaskMetricSubscription sub : this.latencySumRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.inputTpsSubs != null) {
            for (TaskMetricSubscription sub : this.inputTpsSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourcePartitionCountSubs != null) {
            for (TaskMetricSubscription sub : this.sourcePartitionCountSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourcePartitionLatencyCountRangeSubs != null) {
            for (TaskMetricSubscription sub : this.sourcePartitionLatencyCountRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourcePartitionLatencySumRangeSubs != null) {
            for (TaskMetricSubscription sub : this.sourcePartitionLatencySumRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
    }

    @Override
    public Symptom detect() throws Exception {
        LOGGER.debug("Start detecting.");
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (jobConfig == null) {
            return null;
        }
        ArrayList<JobVertexID> jobVertexIDs = new ArrayList<JobVertexID>();
        for (JobVertexID vertexId : this.latencyCountRangeSubs.keySet()) {
            double workload;
            double taskLatency;
            TaskMetricSubscription inputTpsSub;
            TaskMetricSubscription latencySumRangeSub;
            TaskMetricSubscription sourcePartitionCountSub = this.sourcePartitionCountSubs.get((Object)vertexId);
            TaskMetricSubscription sourcePartitionLatencyCountRangeSub = this.sourcePartitionLatencyCountRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourcePartitionLatencySumRangeSub = this.sourcePartitionLatencySumRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourceProcessLatencyCountRangeSub = this.sourceProcessLatencyCountRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourceProcessLatencySumRangeSub = this.sourceProcessLatencySumRangeSubs.get((Object)vertexId);
            TaskMetricSubscription latencyCountRangeSub = this.latencyCountRangeSubs.get((Object)vertexId);
            if (!MetricUtils.validateTaskMetric(this.monitor, this.checkInterval * 2L, latencyCountRangeSub, latencySumRangeSub = this.latencySumRangeSubs.get((Object)vertexId), inputTpsSub = this.inputTpsSubs.get((Object)vertexId))) {
                LOGGER.debug("Skip vertex {}, metrics missing.", (Object)vertexId);
                continue;
            }
            boolean isParallelReader = false;
            if (jobConfig.getInputNodes().get((Object)vertexId).isEmpty()) {
                isParallelReader = MetricUtils.validateTaskMetric(this.monitor, this.checkInterval * 2L, sourcePartitionCountSub, sourcePartitionLatencyCountRangeSub, sourcePartitionLatencySumRangeSub, sourceProcessLatencyCountRangeSub, sourceProcessLatencySumRangeSub) && (Double)sourcePartitionCountSub.getValue().f1 > 0.0;
                LOGGER.debug("Treat vertex {} as {} reader.", (Object)vertexId, (Object)(isParallelReader ? "parallel" : "non-parallel"));
                LOGGER.debug("source partition count " + sourcePartitionCountSub.getValue() + " source partition latency count range " + sourcePartitionLatencyCountRangeSub.getValue() + " source partition latency sum range " + sourcePartitionLatencySumRangeSub.getValue());
            }
            int parallelism = jobConfig.getVertexConfigs().get((Object)vertexId).getParallelism();
            double minParallelism = 1.0;
            if (isParallelReader) {
                double processLatencyCount = (Double)sourceProcessLatencyCountRangeSub.getValue().f1;
                double processLatencySum = (Double)sourceProcessLatencySumRangeSub.getValue().f1;
                taskLatency = processLatencyCount <= 0.0 ? 0.0 : processLatencySum / processLatencyCount / 1.0E9;
                double partitionCount = (Double)sourcePartitionCountSub.getValue().f1;
                double partitionLatencyCount = (Double)sourcePartitionLatencyCountRangeSub.getValue().f1;
                double partitionLatencySum = (Double)sourcePartitionLatencySumRangeSub.getValue().f1;
                double partitionLatency = partitionLatencyCount <= 0.0 ? 0.0 : partitionLatencySum / partitionLatencyCount / 1.0E9;
                workload = partitionLatency <= 0.0 ? 0.0 : partitionCount * taskLatency / partitionLatency;
                minParallelism = Math.ceil(partitionCount / (double)this.maxPartitionPerTask);
                LOGGER.debug("vertex {} partitionCount {} latency {} partitionLatency {} workload {}", new Object[]{vertexId, partitionCount, taskLatency, partitionLatency, workload});
            } else {
                double taskLatencyCount = (Double)latencyCountRangeSub.getValue().f1;
                double taskLatencySum = (Double)latencySumRangeSub.getValue().f1;
                taskLatency = taskLatencyCount <= 0.0 ? 0.0 : taskLatencySum / taskLatencyCount / 1.0E9;
                double inputTps = (Double)inputTpsSub.getValue().f1;
                workload = taskLatency * inputTps;
                LOGGER.debug("vertex {} input tps {} latency {} workload {}", new Object[]{vertexId, inputTps, taskLatency, workload});
            }
            if (!((double)parallelism > workload * this.ratio) || !((double)parallelism > minParallelism)) continue;
            jobVertexIDs.add(vertexId);
            LOGGER.info("Over parallelized detected for vertex {}, parallelism:{} workload:{}.", new Object[]{vertexId, parallelism, workload});
        }
        if (!jobVertexIDs.isEmpty()) {
            return new JobVertexOverParallelized(this.jobID, jobVertexIDs);
        }
        return null;
    }
}

