/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexRpsUnsatisfied;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetrics;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetricsSubscriber;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Detector that reports vertices whose measured RPS is lower than a configured target.
 *
 * <p>Targets are read from the health monitor configuration:
 * <ul>
 *   <li>{@link #TARGET_TPS_VALUES}: comma separated target values, parsed as doubles;</li>
 *   <li>{@link #TARGET_TPS_IDS}: comma separated integer operator ids, each resolved to the
 *       vertex whose config lists that operator id; or</li>
 *   <li>{@link #TARGET_TPS_NAMES}: comma separated source names, each resolved through
 *       {@link #getVertexIdBySourceName(String)}. Only consulted when the ids option is absent.</li>
 * </ul>
 * The n-th value is the target of the n-th id/name. Without {@link #TARGET_TPS_VALUES} the
 * detector is disabled and {@link #detect()} always returns {@code null}.
 */
public class RpsUnsatisfiedDetector
implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpsUnsatisfiedDetector.class);

    /** Comma separated operator ids identifying the vertices that have a target RPS. */
    public static final ConfigOption<String> TARGET_TPS_IDS = ConfigOptions.key((String)"healthmonitor.target-rps.ids").noDefaultValue();

    /** Comma separated source names identifying the vertices that have a target RPS. */
    public static final ConfigOption<String> TARGET_TPS_NAMES = ConfigOptions.key((String)"healthmonitor.target-rps.names").noDefaultValue();

    /** Comma separated target RPS values, positionally matched with the ids or names. */
    public static final ConfigOption<String> TARGET_TPS_VALUES = ConfigOptions.key((String)"healthmonitor.target-rps.values").noDefaultValue();

    private TaskMetricsSubscriber subscriber;
    private HealthMonitor monitor;
    private boolean enabled;
    private long interval;

    /** Target RPS per configured vertex; only assigned when the detector is enabled. */
    private Map<JobVertexID, Double> sourceTargetRps;

    /** "parserTps" subscriptions of every sub-DAG vertex, keyed by the sub-DAG root that has a target. */
    private Map<JobVertexID, List<TaskMetricSubscription>> allRpsSubscriptions;

    /**
     * Reads the target configuration and subscribes the metrics needed by {@link #detect()}.
     *
     * @param monitor the health monitor providing configuration, topology and metrics
     * @throws IllegalArgumentException if values are configured without ids or names, if an id
     *     or name cannot be resolved to a vertex, if a value or id is not a valid number, or if
     *     the number of values differs from the number of ids/names
     */
    @Override
    public void open(HealthMonitor monitor) {
        this.interval = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.subscriber = monitor.subscribeTaskMetrics(this.interval);
        // Must be set before resolving names: getVertexIdBySourceName reads this.monitor.
        this.monitor = monitor;
        // Stay disabled until the configuration has been parsed completely, so that a failure
        // below never leaves a half-initialised detector flagged as enabled.
        this.enabled = false;
        if (!monitor.getConfig().contains(TARGET_TPS_VALUES)) {
            return;
        }

        List<Double> values = RpsUnsatisfiedDetector.parseTargetValues(monitor);
        List<JobVertexID> ids = this.resolveTargetVertices(monitor);
        Preconditions.checkArgument(
                values.size() == ids.size(),
                "Number of target rps values (" + values.size()
                        + ") does not match number of target vertices (" + ids.size() + ")");

        Map<JobVertexID, Double> targets = new HashMap<>();
        for (int i = 0; i < ids.size(); i++) {
            targets.put(ids.get(i), values.get(i));
        }
        this.sourceTargetRps = targets;
        this.enabled = true;
        this.subscribeRpsMetrics(monitor);
    }

    /** Parses the comma separated target values of {@link #TARGET_TPS_VALUES}. */
    private static List<Double> parseTargetValues(HealthMonitor monitor) {
        return Arrays.stream(monitor.getConfig().getString(TARGET_TPS_VALUES).split(","))
                .map(e -> Double.valueOf(e.trim()))
                .collect(Collectors.toList());
    }

    /** Resolves the configured operator ids (preferred) or source names to vertex ids, in order. */
    private List<JobVertexID> resolveTargetVertices(HealthMonitor monitor) {
        if (monitor.getConfig().contains(TARGET_TPS_IDS)) {
            return Arrays.stream(monitor.getConfig().getString(TARGET_TPS_IDS).split(","))
                    .map(e -> Integer.valueOf(e.trim()))
                    .map(operatorId -> RpsUnsatisfiedDetector.findVertexByOperatorId(monitor, operatorId))
                    .collect(Collectors.toList());
        }
        if (monitor.getConfig().contains(TARGET_TPS_NAMES)) {
            return Arrays.stream(monitor.getConfig().getString(TARGET_TPS_NAMES).split(","))
                    .map(name -> this.getVertexIdBySourceName(name.trim()))
                    .collect(Collectors.toList());
        }
        throw new IllegalArgumentException("Config of target source names or ids missing");
    }

    /** Returns the first vertex whose config lists the given operator id. */
    private static JobVertexID findVertexByOperatorId(HealthMonitor monitor, Integer operatorId) {
        return (JobVertexID) monitor.getJobConfig().getVertexConfigs().entrySet().stream()
                .filter(config -> ((RestServerClient.VertexConfig) config.getValue()).getOperatorIds().contains(operatorId))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Can not find operator with id:" + operatorId))
                .getKey();
    }

    /**
     * Subscribes the "parserTps" metric of every vertex in the sub-DAG of each sub-DAG root that
     * has a configured target. Configured vertices that are not sub-DAG roots get no subscription.
     */
    private void subscribeRpsMetrics(HealthMonitor monitor) {
        Map<JobVertexID, List<TaskMetricSubscription>> subscriptions = new HashMap<>();
        // Publish the map before subscribing so close() can release whatever was subscribed
        // even if a later subscription fails.
        this.allRpsSubscriptions = subscriptions;
        for (JobVertexID rootId : monitor.getJobTopologyAnalyzer().getAllSubDagRoots()) {
            if (!this.sourceTargetRps.containsKey(rootId)) {
                continue;
            }
            List<TaskMetricSubscription> rootSubscriptions = new LinkedList<>();
            subscriptions.put(rootId, rootSubscriptions);
            for (JobVertexID vertexID : monitor.getJobTopologyAnalyzer().getSubDagVertices(rootId)) {
                TaskMetricSubscription subscription = monitor.getMetricProvider().subscribeTaskMetric(
                        monitor.getJobID(), vertexID, "parserTps", MetricAggType.SUM, this.interval, TimelineAggType.AVG);
                rootSubscriptions.add(subscription);
            }
        }
    }

    /**
     * Releases all "parserTps" subscriptions made by {@link #open(HealthMonitor)}. Calling it
     * again without a new {@code open} is a no-op.
     */
    @Override
    public void close() {
        Map<JobVertexID, List<TaskMetricSubscription>> subscriptions = this.allRpsSubscriptions;
        if (subscriptions == null) {
            return;
        }
        for (List<TaskMetricSubscription> subscriptionList : subscriptions.values()) {
            for (TaskMetricSubscription subscription : subscriptionList) {
                this.monitor.getMetricProvider().unsubscribe(subscription);
            }
        }
        // Forget released subscriptions so they are neither unsubscribed twice nor read by detect().
        this.allRpsSubscriptions = null;
    }

    /**
     * Compares the current RPS of every subscribed vertex with its target.
     *
     * @return a symptom listing every vertex whose target is greater than its current RPS, or
     *     {@code null} if the detector is disabled, metrics are missing, or all targets are met
     */
    @Override
    public JobVertexRpsUnsatisfied detect() throws Exception {
        LOGGER.debug("Start detecting.");
        if (!this.enabled) {
            LOGGER.debug("Skip because detector disabled.");
            return null;
        }
        Map<JobVertexID, Double> sourceCurrentRps = this.getCurrentRPS();
        if (sourceCurrentRps == null) {
            LOGGER.debug("Skip because source rps missing.");
            return null;
        }
        // The task metrics are not used below; their absence only causes this round to be skipped.
        Map<JobVertexID, TaskMetrics> allTaskMetrics = this.subscriber.getTaskMetrics();
        if (allTaskMetrics == null) {
            LOGGER.debug("Skip because task metrics missing.");
            return null;
        }
        LOGGER.debug("current rps:{}", sourceCurrentRps);
        LOGGER.debug("target rps:{}", this.sourceTargetRps);

        JobVertexRpsUnsatisfied jobVertexRpsUnsatisfied = new JobVertexRpsUnsatisfied();
        for (Map.Entry<JobVertexID, Double> current : sourceCurrentRps.entrySet()) {
            JobVertexID vertexID = current.getKey();
            double targetRps = this.sourceTargetRps.get(vertexID);
            double currentRps = current.getValue();
            if (targetRps > currentRps) {
                jobVertexRpsUnsatisfied.addVertex(vertexID, targetRps, currentRps);
            }
        }
        if (jobVertexRpsUnsatisfied.getCurrentRps().isEmpty()) {
            return null;
        }
        return jobVertexRpsUnsatisfied;
    }

    /**
     * Collects the current RPS of every subscribed sub-DAG root. For each root, the first
     * subscription in its sub-DAG that passes {@code MetricUtils.validateTaskMetric} and holds a
     * positive value is used.
     *
     * @return current RPS per root, or {@code null} if nothing is subscribed or any root has no
     *     usable metric
     */
    public Map<JobVertexID, Double> getCurrentRPS() {
        if (this.allRpsSubscriptions == null) {
            return null;
        }
        HashMap<JobVertexID, Double> allRps = new HashMap<JobVertexID, Double>();
        for (Map.Entry<JobVertexID, List<TaskMetricSubscription>> entry : this.allRpsSubscriptions.entrySet()) {
            Tuple2<Long, Double> rps = null;
            for (TaskMetricSubscription subscription : entry.getValue()) {
                // NOTE(review): interval * 2 is presumably the accepted staleness of the metric - confirm.
                if (!MetricUtils.validateTaskMetric(this.monitor, this.interval * 2L, subscription)) {
                    continue;
                }
                // Read the value once so the value that is checked is the value that is reported.
                Tuple2<Long, Double> value = subscription.getValue();
                if (value != null && value.f1 != null && value.f1 > 0.0) {
                    rps = value;
                    break;
                }
            }
            if (rps == null) {
                LOGGER.debug("{} rps metrics missing", (Object)entry.getKey());
                return null;
            }
            allRps.put(entry.getKey(), rps.f1);
        }
        return allRps;
    }

    /**
     * Finds the source vertex whose name contains {@code "-<tableName>-Stream"}.
     *
     * @param tableName the source name to look for
     * @return the id of the first matching source vertex
     * @throws IllegalArgumentException if no source vertex matches
     */
    public JobVertexID getVertexIdBySourceName(String tableName) {
        // A name containing "-<tableName>-Stream " necessarily contains "-<tableName>-Stream",
        // so a single containment test covers both forms.
        String marker = String.format("-%s-Stream", tableName);
        return this.monitor.getJobConfig().getVertexConfigs().entrySet().stream()
                .filter(e -> this.monitor.getJobTopologyAnalyzer().isSource((JobVertexID) e.getKey()))
                .filter(e -> ((RestServerClient.VertexConfig) e.getValue()).getName().contains(marker))
                .findFirst()
                .map(e -> (JobVertexID) e.getKey())
                .orElseThrow(() -> new IllegalArgumentException(String.format("Cannot find corresponding vertex of source '%s'. Please enter a valid source name", tableName)));
    }

    /**
     * Tells whether the configuration contains target values together with target ids or names.
     *
     * @param healthMonitor the health monitor whose configuration is inspected
     * @return {@code true} if the detector would be enabled for this configuration
     */
    public static boolean isEnabled(HealthMonitor healthMonitor) {
        return healthMonitor.getConfig().contains(TARGET_TPS_VALUES) && (healthMonitor.getConfig().contains(TARGET_TPS_IDS) || healthMonitor.getConfig().contains(TARGET_TPS_NAMES));
    }
}

