package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexRpsUnsatisfied;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetricsSubscriber;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Detector that compares the measured RPS of configured source sub-DAGs against
 * user-supplied target values and reports the ones that fall short.
 *
 * <p>The detector is active only when {@link #TARGET_TPS_VALUES} is configured together with
 * either {@link #TARGET_TPS_IDS} or {@link #TARGET_TPS_NAMES}. Values and ids/names are
 * comma-separated lists that are matched by position.
 */
public class RpsUnsatisfiedDetector implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpsUnsatisfiedDetector.class);

    /** Comma-separated operator ids identifying the vertices that have a target RPS. */
    public static final ConfigOption<String> TARGET_TPS_IDS = ConfigOptions.key("healthmonitor.target-rps.ids").noDefaultValue();

    /** Comma-separated source names identifying the vertices that have a target RPS. */
    public static final ConfigOption<String> TARGET_TPS_NAMES = ConfigOptions.key("healthmonitor.target-rps.names").noDefaultValue();

    /** Comma-separated target RPS values, positionally matched with the ids or names. */
    public static final ConfigOption<String> TARGET_TPS_VALUES = ConfigOptions.key("healthmonitor.target-rps.values").noDefaultValue();

    /** Name of the task metric subscribed to for every vertex of a monitored sub-DAG. */
    private static final String RPS_METRIC_NAME = "parserTps";

    private TaskMetricsSubscriber subscriber;
    private HealthMonitor monitor;
    private boolean enabled;
    private long interval;
    private Map<JobVertexID, Double> sourceTargetRps;
    private Map<JobVertexID, List<TaskMetricSubscription>> allRpsSubscriptions;

    /**
     * Reads the target RPS configuration and subscribes to the RPS metric of every vertex in
     * each configured sub-DAG.
     *
     * <p>If {@link #TARGET_TPS_VALUES} is not configured the detector stays disabled.
     *
     * @param healthMonitor the monitor providing configuration, topology and metrics
     * @throws IllegalArgumentException if values are configured but neither ids nor names are,
     *     if a configured id or name cannot be resolved to a vertex, or if the number of
     *     values differs from the number of ids/names
     * @throws NumberFormatException if a configured value or id is not a valid number
     */
    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void open(HealthMonitor healthMonitor) {
        this.interval = healthMonitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.subscriber = healthMonitor.subscribeTaskMetrics(this.interval);
        // Must be assigned before resolving names: getVertexIdBySourceName reads this.monitor.
        this.monitor = healthMonitor;
        // Only flipped to true once the whole configuration has been resolved successfully.
        this.enabled = false;

        if (!healthMonitor.getConfig().contains(TARGET_TPS_VALUES)) {
            return;
        }

        List<Double> targetValues = Arrays.stream(healthMonitor.getConfig().getString(TARGET_TPS_VALUES).split(","))
                .map(rawValue -> Double.valueOf(rawValue.trim()))
                .collect(Collectors.toList());
        List<JobVertexID> targetVertices = resolveTargetVertices(healthMonitor);
        Preconditions.checkArgument(
                targetValues.size() == targetVertices.size(),
                "Number of target rps values (" + targetValues.size()
                        + ") does not match number of target vertices (" + targetVertices.size() + ")");

        // Pair the i-th vertex with the i-th configured target value.
        Map<JobVertexID, Double> targets = new HashMap<>();
        for (int i = 0; i < targetVertices.size(); i++) {
            targets.put(targetVertices.get(i), targetValues.get(i));
        }
        this.sourceTargetRps = targets;
        this.enabled = true;

        subscribeRpsMetrics(healthMonitor);
    }

    /**
     * Resolves the configured ids (preferred) or names to job vertex ids, in configured order.
     *
     * @throws IllegalArgumentException if neither ids nor names are configured, or an entry
     *     cannot be resolved
     */
    private List<JobVertexID> resolveTargetVertices(HealthMonitor healthMonitor) {
        if (healthMonitor.getConfig().contains(TARGET_TPS_IDS)) {
            return Arrays.stream(healthMonitor.getConfig().getString(TARGET_TPS_IDS).split(","))
                    .map(rawId -> Integer.valueOf(rawId.trim()))
                    .map(operatorId -> findVertexByOperatorId(healthMonitor, operatorId))
                    .collect(Collectors.toList());
        }
        if (healthMonitor.getConfig().contains(TARGET_TPS_NAMES)) {
            return Arrays.stream(healthMonitor.getConfig().getString(TARGET_TPS_NAMES).split(","))
                    .map(rawName -> getVertexIdBySourceName(rawName.trim()))
                    .collect(Collectors.toList());
        }
        throw new IllegalArgumentException("Config of target source names or ids missing");
    }

    /** Returns the first vertex whose operator id list contains {@code operatorId}. */
    private static JobVertexID findVertexByOperatorId(HealthMonitor healthMonitor, Integer operatorId) {
        return healthMonitor.getJobConfig().getVertexConfigs().entrySet().stream()
                .filter(entry -> entry.getValue().getOperatorIds().contains(operatorId))
                .findFirst()
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalArgumentException("Can not find operator with id:" + operatorId));
    }

    /**
     * Subscribes to the RPS metric of every vertex in each sub-DAG whose root has a target.
     * Sub-DAG roots without a configured target are ignored.
     */
    private void subscribeRpsMetrics(HealthMonitor healthMonitor) {
        Set<JobVertexID> subDagRoots = healthMonitor.getJobTopologyAnalyzer().getAllSubDagRoots();
        this.allRpsSubscriptions = new HashMap<>();
        for (JobVertexID root : subDagRoots) {
            if (!this.sourceTargetRps.containsKey(root)) {
                continue;
            }
            List<TaskMetricSubscription> subscriptions = new LinkedList<>();
            for (JobVertexID vertex : healthMonitor.getJobTopologyAnalyzer().getSubDagVertices(root)) {
                subscriptions.add(healthMonitor.getMetricProvider().subscribeTaskMetric(
                        healthMonitor.getJobID(), vertex, RPS_METRIC_NAME, MetricAggType.SUM, this.interval, TimelineAggType.AVG));
            }
            this.allRpsSubscriptions.put(root, subscriptions);
        }
    }

    /** Unsubscribes every RPS metric subscription created by {@link #open(HealthMonitor)}. */
    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void close() {
        if (this.allRpsSubscriptions == null) {
            return;
        }
        for (List<TaskMetricSubscription> subscriptions : this.allRpsSubscriptions.values()) {
            for (TaskMetricSubscription subscription : subscriptions) {
                this.monitor.getMetricProvider().unsubscribe(subscription);
            }
        }
    }

    /**
     * Compares the current RPS of every monitored sub-DAG with its target.
     *
     * @return a symptom listing every vertex whose target is strictly greater than its current
     *     RPS, or {@code null} if the detector is disabled, metrics are missing, or every
     *     target is met
     */
    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public JobVertexRpsUnsatisfied detect() throws Exception {
        LOGGER.debug("Start detecting.");
        if (!this.enabled) {
            LOGGER.debug("Skip because detector disabled.");
            return null;
        }
        Map<JobVertexID, Double> currentRps = getCurrentRPS();
        if (currentRps == null) {
            LOGGER.debug("Skip because source rps missing.");
            return null;
        }
        if (this.subscriber.getTaskMetrics() == null) {
            LOGGER.debug("Skip because task metrics missing.");
            return null;
        }
        LOGGER.debug("current rps:{}", currentRps);
        LOGGER.debug("target rps:{}", this.sourceTargetRps);

        JobVertexRpsUnsatisfied symptom = new JobVertexRpsUnsatisfied();
        for (Map.Entry<JobVertexID, Double> current : currentRps.entrySet()) {
            // Every key of currentRps is a subscribed root, and roots are only subscribed
            // when they have an entry in sourceTargetRps, so this lookup is never null.
            double target = this.sourceTargetRps.get(current.getKey());
            double actual = current.getValue();
            if (target > actual) {
                symptom.addVertex(current.getKey(), target, actual);
            }
        }
        return symptom.getCurrentRps().isEmpty() ? null : symptom;
    }

    /**
     * Collects the current RPS for every monitored sub-DAG root.
     *
     * <p>For each root, the value of the first subscription that passes
     * {@link MetricUtils#validateTaskMetric} (with twice the detector interval) and is
     * strictly positive is used.
     *
     * @return a map from sub-DAG root to its current RPS, or {@code null} if the detector has
     *     no subscriptions or any root has no valid positive value
     */
    public Map<JobVertexID, Double> getCurrentRPS() {
        if (this.allRpsSubscriptions == null) {
            return null;
        }
        Map<JobVertexID, Double> currentRps = new HashMap<>();
        for (Map.Entry<JobVertexID, List<TaskMetricSubscription>> entry : this.allRpsSubscriptions.entrySet()) {
            Double rps = null;
            for (TaskMetricSubscription subscription : entry.getValue()) {
                if (!MetricUtils.validateTaskMetric(this.monitor, this.interval * 2, subscription)) {
                    continue;
                }
                // Read the value once so the check and the stored value cannot diverge.
                Tuple2<Long, Double> value = subscription.getValue();
                if (value.f1 > 0.0d) {
                    rps = value.f1;
                    break;
                }
            }
            if (rps == null) {
                LOGGER.debug("{} rps metrics missing", entry.getKey());
                return null;
            }
            currentRps.put(entry.getKey(), rps);
        }
        return currentRps;
    }

    /**
     * Finds the source vertex whose name contains {@code -<str>-Stream}.
     *
     * @param str the source name to look for
     * @return the id of the first matching source vertex
     * @throws IllegalArgumentException if no source vertex matches
     */
    public JobVertexID getVertexIdBySourceName(String str) {
        // The original also tested the same pattern with a trailing space; any name containing
        // that longer pattern necessarily contains this one, so a single check is equivalent.
        String pattern = String.format("-%s-Stream", str);
        return this.monitor.getJobConfig().getVertexConfigs().entrySet().stream()
                .filter(entry -> this.monitor.getJobTopologyAnalyzer().isSource(entry.getKey()))
                .filter(entry -> entry.getValue().getName().contains(pattern))
                .findFirst()
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalArgumentException(
                        String.format("Cannot find corresponding vertex of source '%s'. Please enter a valid source name", str)));
    }

    /**
     * Tells whether the configuration contains everything this detector needs to run.
     *
     * @param healthMonitor the monitor whose configuration is inspected
     * @return {@code true} if target values and either target ids or target names are configured
     */
    public static boolean isEnabled(HealthMonitor healthMonitor) {
        if (!healthMonitor.getConfig().contains(TARGET_TPS_VALUES)) {
            return false;
        }
        return healthMonitor.getConfig().contains(TARGET_TPS_IDS)
                || healthMonitor.getConfig().contains(TARGET_TPS_NAMES);
    }
}
