package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricNames;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/detectors/JobStableDetector.class */
public class JobStableDetector implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobStableDetector.class);
    public static final ConfigOption<Long> INTERVAL = ConfigOptions.key("healthmonitor.job-stable-detector.interval.ms").defaultValue(30000L);
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private long interval;
    private long lastStableTime = 0;
    private Map<JobVertexID, TaskMetricSubscription> initTimeSubs;

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void open(HealthMonitor healthMonitor) {
        this.monitor = healthMonitor;
        this.metricProvider = healthMonitor.getMetricProvider();
        this.initTimeSubs = new HashMap();
        RestServerClient.JobConfig jobConfig = healthMonitor.getJobConfig();
        this.interval = healthMonitor.getConfig().getLong(INTERVAL);
        for (JobVertexID jobVertexID : jobConfig.getVertexConfigs().keySet()) {
            this.initTimeSubs.put(jobVertexID, this.metricProvider.subscribeTaskMetric(healthMonitor.getJobID(), jobVertexID, MetricNames.TASK_INIT_TIME, MetricAggType.MIN, this.interval, TimelineAggType.LATEST));
        }
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public void close() {
        if (this.metricProvider == null || this.initTimeSubs == null) {
            return;
        }
        for (TaskMetricSubscription taskMetricSubscription : this.initTimeSubs.values()) {
            if (taskMetricSubscription != null) {
                this.metricProvider.unsubscribe(taskMetricSubscription);
            }
        }
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Detector
    public Symptom detect() throws Exception {
        long j = Long.MIN_VALUE;
        for (Tuple2<Long, ExecutionState> tuple2 : this.monitor.getRestServerClient().getJobStatus(this.monitor.getJobID()).getTaskStatus().values()) {
            if (!((ExecutionState) tuple2.f1).equals(ExecutionState.RUNNING)) {
                LOGGER.debug("Some task not running yet!");
                return JobStable.UNSTABLE;
            }
            if (j < ((Long) tuple2.f0).longValue()) {
                j = ((Long) tuple2.f0).longValue();
            }
        }
        if (this.lastStableTime < j) {
            long j2 = Long.MIN_VALUE;
            Iterator<JobVertexID> it = this.initTimeSubs.keySet().iterator();
            while (it.hasNext()) {
                TaskMetricSubscription taskMetricSubscription = this.initTimeSubs.get(it.next());
                if (!MetricUtils.validateTaskMetric(this.monitor, this.interval * 2, taskMetricSubscription) || ((Long) taskMetricSubscription.getValue().f0).longValue() < j || ((Double) taskMetricSubscription.getValue().f1).doubleValue() < 0.0d) {
                    LOGGER.debug("Some task has not initialized yet!");
                    return JobStable.UNSTABLE;
                }
                if (j2 < ((Long) taskMetricSubscription.getValue().f0).longValue()) {
                    j2 = ((Long) taskMetricSubscription.getValue().f0).longValue();
                }
            }
            this.lastStableTime = j2;
        }
        long currentTimeMillis = System.currentTimeMillis() - this.lastStableTime;
        LOGGER.debug("JobStable detected, stable time {} ms.", Long.valueOf(currentTimeMillis));
        return new JobStable(currentTimeMillis);
    }
}
