package org.apache.flink.runtime.healthmanager.plugins.actions;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/healthmanager/plugins/actions/AdjustJobConfig.class */
public class AdjustJobConfig implements Action {
    private static final Logger LOGGER = LoggerFactory.getLogger(AdjustJobConfig.class);
    protected JobID jobID;
    protected Map<JobVertexID, Integer> currentParallelism;
    protected Map<JobVertexID, Integer> targetParallelism;
    protected Map<JobVertexID, ResourceSpec> currentResource;
    protected Map<JobVertexID, ResourceSpec> targetResource;
    protected long timeoutMs;
    protected Action.ActionMode actionMode;
    private Exception rescaleException;

    public AdjustJobConfig(JobID jobID, long j) {
        this(jobID, j, new HashMap(), new HashMap(), new HashMap(), new HashMap(), Action.ActionMode.IMMEDIATE);
    }

    public AdjustJobConfig(JobID jobID, long j, Map<JobVertexID, Integer> map, Map<JobVertexID, Integer> map2, Map<JobVertexID, ResourceSpec> map3, Map<JobVertexID, ResourceSpec> map4) {
        this(jobID, j, map, map2, map3, map4, Action.ActionMode.IMMEDIATE);
    }

    public AdjustJobConfig(JobID jobID, long j, Map<JobVertexID, Integer> map, Map<JobVertexID, Integer> map2, Map<JobVertexID, ResourceSpec> map3, Map<JobVertexID, ResourceSpec> map4, Action.ActionMode actionMode) {
        this.rescaleException = null;
        this.jobID = jobID;
        this.timeoutMs = j;
        this.currentParallelism = map;
        this.currentResource = map3;
        this.targetParallelism = map2;
        this.targetResource = map4;
        this.actionMode = actionMode;
    }

    public void addVertex(JobVertexID jobVertexID, int i, int i2, ResourceSpec resourceSpec, ResourceSpec resourceSpec2) {
        this.currentParallelism.put(jobVertexID, Integer.valueOf(i));
        this.targetParallelism.put(jobVertexID, Integer.valueOf(i2));
        this.currentResource.put(jobVertexID, resourceSpec);
        this.targetResource.put(jobVertexID, resourceSpec2);
    }

    public boolean isEmpty() {
        return this.currentParallelism.isEmpty();
    }

    public void exculdeMinorDiffVertices(Configuration configuration) {
        double d = configuration.getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MIN_DIFF_RATIO);
        double d2 = configuration.getDouble(HealthMonitorOptions.RESOURCE_SCALE_MIN_DIFF_RATIO);
        double d3 = configuration.getDouble(HealthMonitorOptions.RESOURCE_SCALE_MIN_DIFF_CPU);
        int integer = configuration.getInteger(HealthMonitorOptions.RESOURCE_SCALE_MIN_DIFF_NATIVE_MEM);
        HashSet hashSet = new HashSet();
        for (JobVertexID jobVertexID : this.currentParallelism.keySet()) {
            int intValue = this.currentParallelism.get(jobVertexID).intValue();
            int intValue2 = this.targetParallelism.get(jobVertexID).intValue();
            ResourceSpec resourceSpec = this.currentResource.get(jobVertexID);
            ResourceSpec resourceSpec2 = this.targetResource.get(jobVertexID);
            if (intValue2 <= intValue && intValue - intValue2 <= d * intValue && (Math.abs(resourceSpec.getCpuCores() - resourceSpec2.getCpuCores()) <= d2 * resourceSpec.getCpuCores() || Math.abs(resourceSpec.getCpuCores() - resourceSpec2.getCpuCores()) <= d3)) {
                if (Math.abs(resourceSpec.getHeapMemory() - resourceSpec2.getHeapMemory()) <= d2 * resourceSpec.getHeapMemory() && Math.abs(resourceSpec.getDirectMemory() - resourceSpec2.getDirectMemory()) <= d2 * resourceSpec.getDirectMemory() && (Math.abs(resourceSpec.getNativeMemory() - resourceSpec2.getNativeMemory()) <= d2 * resourceSpec.getNativeMemory() || Math.abs(resourceSpec.getNativeMemory() - resourceSpec2.getNativeMemory()) <= integer)) {
                    hashSet.add(jobVertexID);
                }
            }
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            JobVertexID jobVertexID2 = (JobVertexID) it.next();
            LOGGER.debug("Removing vertex with minor difference, vertex id: {}", jobVertexID2);
            this.currentParallelism.remove(jobVertexID2);
            this.targetParallelism.remove(jobVertexID2);
            this.currentResource.remove(jobVertexID2);
            this.targetResource.remove(jobVertexID2);
        }
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Action
    public void execute(RestServerClient restServerClient) throws Exception {
        HashMap hashMap = new HashMap();
        for (JobVertexID jobVertexID : this.currentParallelism.keySet()) {
            hashMap.put(jobVertexID, new Tuple2<>(this.targetParallelism.get(jobVertexID), this.targetResource.get(jobVertexID)));
        }
        if (hashMap.isEmpty()) {
            return;
        }
        restServerClient.rescale(this.jobID, hashMap).whenComplete((acknowledge, th) -> {
            if (th != null) {
                this.rescaleException = new Exception("Execute action failed.", th);
            }
        });
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Action
    public boolean validate(MetricProvider metricProvider, RestServerClient restServerClient) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        do {
            Thread.sleep(Math.min(this.timeoutMs / 10, 60000L));
            if (this.rescaleException != null) {
                LOGGER.error("Action {} execute failed because: {}", toString(), this.rescaleException);
                return false;
            }
            RestServerClient.JobStatus jobStatus = restServerClient.getJobStatus(this.jobID);
            int i = 0;
            Iterator<Tuple2<Long, ExecutionState>> it = jobStatus.getTaskStatus().values().iterator();
            while (it.hasNext() && ((ExecutionState) it.next().f1).equals(ExecutionState.RUNNING)) {
                i++;
            }
            if (i == jobStatus.getTaskStatus().size()) {
                return true;
            }
        } while (System.currentTimeMillis() - currentTimeMillis <= this.timeoutMs);
        return false;
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Action
    public Action rollback() {
        return new AdjustJobConfig(this.jobID, this.timeoutMs, this.targetParallelism, this.currentParallelism, this.targetResource, this.currentResource);
    }

    public void setActionMode(Action.ActionMode actionMode) {
        this.actionMode = actionMode;
    }

    @Override // org.apache.flink.runtime.healthmanager.plugins.Action
    public Action.ActionMode getActionMode() {
        return this.actionMode;
    }

    public String toString() {
        return "AdjustJobConfig{actionMode: " + this.actionMode + ", adjustments: " + ((String) this.currentParallelism.keySet().stream().map(jobVertexID -> {
            return "{JobVertexID:" + jobVertexID + ", parallelism: " + this.currentParallelism.get(jobVertexID) + " -> " + this.targetParallelism.get(jobVertexID) + ", resource: " + this.currentResource.get(jobVertexID) + " -> " + this.targetResource.get(jobVertexID) + "}";
        }).collect(Collectors.joining(", "))) + "}";
    }

    public RestServerClient.JobConfig getAppliedJobConfig(RestServerClient.JobConfig jobConfig) {
        RestServerClient.JobConfig jobConfig2 = new RestServerClient.JobConfig(jobConfig);
        for (JobVertexID jobVertexID : this.targetResource.keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get(jobVertexID);
            jobConfig2.getVertexConfigs().put(jobVertexID, new RestServerClient.VertexConfig(vertexConfig.getName(), this.targetParallelism.get(jobVertexID).intValue(), vertexConfig.getMaxParallelism(), this.targetResource.get(jobVertexID), vertexConfig.getOperatorIds(), vertexConfig.getColocationGroupId()));
        }
        return jobConfig2;
    }
}
