package org.apache.flink.runtime.healthmanager.plugins.detectors;

import java.util.LinkedList;
import java.util.Map;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.plugins.Detector;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHighStateSize;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/detectors/HighStateSizeDetector.class */
/**
 * Detector that flags job vertices whose checkpointed state size, divided by the vertex
 * parallelism, exceeds a configured threshold.
 *
 * <p>The threshold is read from
 * {@link HealthMonitorOptions#PARALLELISM_SCALE_STATE_SIZE_THRESHOLD} when the detector is opened.
 */
public class HighStateSizeDetector implements Detector {
    private static final Logger LOGGER = LoggerFactory.getLogger(HighStateSizeDetector.class);

    /** Monitor giving access to the job id, job config and REST client; set in {@link #open}. */
    private HealthMonitor healthMonitor;

    /** Upper bound for (state size / parallelism) of a vertex; set in {@link #open}. */
    private long threshold;

    /**
     * Stores the monitor and loads the state size threshold from its configuration.
     *
     * @param healthMonitor the monitor this detector is attached to
     */
    @Override
    public void open(HealthMonitor healthMonitor) {
        this.healthMonitor = healthMonitor;
        this.threshold = healthMonitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_STATE_SIZE_THRESHOLD);
    }

    /** Nothing to release: the detector holds no resources of its own. */
    @Override
    public void close() {
    }

    /**
     * Compares, for every vertex reported in the latest checkpoint statistics, the state size
     * divided by the vertex parallelism against the threshold.
     *
     * <p>Vertices that have no entry in the job config, or whose configured parallelism is not
     * positive, are skipped because no meaningful ratio can be computed for them.
     *
     * @return a {@link JobVertexHighStateSize} listing the offending vertices, or {@code null}
     *     when no vertex exceeds the threshold
     * @throws Exception if the job config or the checkpoint statistics cannot be retrieved
     */
    @Override
    public Symptom detect() throws Exception {
        LinkedList<JobVertexID> highStateSizeVertices = new LinkedList<>();
        RestServerClient.JobConfig jobConfig = this.healthMonitor.getJobConfig();
        for (Map.Entry<JobVertexID, TaskCheckpointStatistics> entry : this.healthMonitor.getRestServerClient().getLatestCheckPointStates(this.healthMonitor.getJobID()).getCheckpointStatisticsPerTask().entrySet()) {
            JobVertexID vertexId = entry.getKey();
            long stateSize = entry.getValue().getStateSize();

            // The checkpoint statistics and the job config are fetched separately, so a vertex
            // may be missing from the config; skip it instead of failing the whole detection.
            if (jobConfig.getVertexConfigs().get(vertexId) == null) {
                LOGGER.debug("vertex {} has no vertex config, skip state size check.", vertexId);
                continue;
            }

            double parallelism = jobConfig.getVertexConfigs().get(vertexId).getParallelism();
            // Dividing by a non-positive parallelism would yield Infinity/NaN or a negative ratio.
            if (parallelism <= 0) {
                LOGGER.debug("vertex {} has invalid parallelism [{}], skip state size check.", vertexId, parallelism);
                continue;
            }

            if (stateSize / parallelism > this.threshold) {
                LOGGER.debug("vertex {} state size [{}] reach threshold.", vertexId, stateSize);
                highStateSizeVertices.add(vertexId);
            }
        }
        if (highStateSizeVertices.isEmpty()) {
            return null;
        }
        return new JobVertexHighStateSize(this.healthMonitor.getJobID(), highStateSizeVertices);
    }
}
