/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.LinkedList;
import java.util.Map;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHighStateSize;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HighStateSizeDetector
implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(HighStateSizeDetector.class);
    private HealthMonitor healthMonitor;
    private long threshold;

    @Override
    public void open(HealthMonitor monitor) {
        this.healthMonitor = monitor;
        this.threshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_STATE_SIZE_THRESHOLD);
    }

    @Override
    public void close() {
    }

    @Override
    public Symptom detect() throws Exception {
        LinkedList<JobVertexID> highStateSizeVertices = new LinkedList<JobVertexID>();
        RestServerClient.JobConfig jobConfig = this.healthMonitor.getJobConfig();
        Map<JobVertexID, TaskCheckpointStatistics> checkpointInfo = this.healthMonitor.getRestServerClient().getLatestCheckPointStates(this.healthMonitor.getJobID()).getCheckpointStatisticsPerTask();
        for (Map.Entry<JobVertexID, TaskCheckpointStatistics> entry : checkpointInfo.entrySet()) {
            if (!((double)entry.getValue().getStateSize() * 1.0 / (double)jobConfig.getVertexConfigs().get((Object)entry.getKey()).getParallelism() > (double)this.threshold)) continue;
            LOGGER.debug("vertex {} state size [{}] reach threshold.", (Object)entry.getKey(), (Object)entry.getValue().getStateSize());
            highStateSizeVertices.add(entry.getKey());
        }
        if (highStateSizeVertices.isEmpty()) {
            return null;
        }
        return new JobVertexHighStateSize(this.healthMonitor.getJobID(), highStateSizeVertices);
    }
}

