/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobStableDetector
implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobStableDetector.class);
    public static final ConfigOption<Long> INTERVAL = ConfigOptions.key((String)"healthmonitor.job-stable-detector.interval.ms").defaultValue((Object)30000L);
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private long interval;
    private long lastStableTime = 0L;
    private Map<JobVertexID, TaskMetricSubscription> initTimeSubs;

    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.metricProvider = monitor.getMetricProvider();
        this.initTimeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        RestServerClient.JobConfig jobConfig = monitor.getJobConfig();
        this.interval = monitor.getConfig().getLong(INTERVAL);
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            this.initTimeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(monitor.getJobID(), vertexId, "taskInitTimeMs", MetricAggType.MIN, this.interval, TimelineAggType.LATEST));
        }
    }

    @Override
    public void close() {
        if (this.metricProvider != null && this.initTimeSubs != null) {
            for (TaskMetricSubscription sub : this.initTimeSubs.values()) {
                if (sub == null) continue;
                this.metricProvider.unsubscribe(sub);
            }
        }
    }

    @Override
    public Symptom detect() throws Exception {
        RestServerClient.JobStatus status = this.monitor.getRestServerClient().getJobStatus(this.monitor.getJobID());
        long allTaskRunningTime = Long.MIN_VALUE;
        for (Tuple2<Long, ExecutionState> state : status.getTaskStatus().values()) {
            if (!((ExecutionState)((Object)state.f1)).equals((Object)ExecutionState.RUNNING)) {
                LOGGER.debug("Some task not running yet!");
                return JobStable.UNSTABLE;
            }
            if (allTaskRunningTime >= (Long)state.f0) continue;
            allTaskRunningTime = (Long)state.f0;
        }
        if (this.lastStableTime < allTaskRunningTime) {
            long allTaskInitializedTime = Long.MIN_VALUE;
            for (JobVertexID vertexID : this.initTimeSubs.keySet()) {
                TaskMetricSubscription sub = this.initTimeSubs.get((Object)vertexID);
                if (!MetricUtils.validateTaskMetric(this.monitor, this.interval * 2L, sub) || (Long)sub.getValue().f0 < allTaskRunningTime || (Double)sub.getValue().f1 < 0.0) {
                    LOGGER.debug("Some task {} has not initialized yet! metric value {}, all running time {}", new Object[]{vertexID, sub.getValue(), allTaskRunningTime});
                    return JobStable.UNSTABLE;
                }
                if (allTaskInitializedTime >= (Long)sub.getValue().f0) continue;
                allTaskInitializedTime = (Long)sub.getValue().f0;
            }
            this.lastStableTime = allTaskInitializedTime;
        }
        long stableTime = System.currentTimeMillis() - this.lastStableTime;
        LOGGER.debug("JobStable detected, stable time {} ms.", (Object)stableTime);
        return new JobStable(stableTime);
    }
}

