/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.actions;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdjustJobConfig
implements Action {
    private static final Logger LOGGER = LoggerFactory.getLogger(AdjustJobConfig.class);
    protected JobID jobID;
    protected Map<JobVertexID, Integer> currentParallelism;
    protected Map<JobVertexID, Integer> targetParallelism;
    protected Map<JobVertexID, ResourceSpec> currentResource;
    protected Map<JobVertexID, ResourceSpec> targetResource;
    protected long timeoutMs;
    protected Action.ActionMode actionMode;
    private Exception rescaleException = null;

    public AdjustJobConfig(JobID jobID, long timeoutMs) {
        this(jobID, timeoutMs, new HashMap<JobVertexID, Integer>(), new HashMap<JobVertexID, Integer>(), new HashMap<JobVertexID, ResourceSpec>(), new HashMap<JobVertexID, ResourceSpec>(), Action.ActionMode.IMMEDIATE);
    }

    public AdjustJobConfig(JobID jobID, long timeoutMs, Map<JobVertexID, Integer> currentParallelism, Map<JobVertexID, Integer> targetParallelism, Map<JobVertexID, ResourceSpec> currentResource, Map<JobVertexID, ResourceSpec> targetResource) {
        this(jobID, timeoutMs, currentParallelism, targetParallelism, currentResource, targetResource, Action.ActionMode.IMMEDIATE);
    }

    public AdjustJobConfig(JobID jobID, long timeoutMs, Map<JobVertexID, Integer> currentParallelism, Map<JobVertexID, Integer> targetParallelism, Map<JobVertexID, ResourceSpec> currentResource, Map<JobVertexID, ResourceSpec> targetResource, Action.ActionMode actionMode) {
        this.jobID = jobID;
        this.timeoutMs = timeoutMs;
        this.currentParallelism = currentParallelism;
        this.currentResource = currentResource;
        this.targetParallelism = targetParallelism;
        this.targetResource = targetResource;
        this.actionMode = actionMode;
    }

    public void addVertex(JobVertexID jobVertexId, int currentParallelism, int targetParallelism, ResourceSpec currentResource, ResourceSpec targetResource) {
        this.currentParallelism.put(jobVertexId, currentParallelism);
        this.targetParallelism.put(jobVertexId, targetParallelism);
        this.currentResource.put(jobVertexId, currentResource);
        this.targetResource.put(jobVertexId, targetResource);
    }

    public boolean isEmpty() {
        return this.currentParallelism.isEmpty();
    }

    public void exculdeMinorDiffVertices(Configuration conf) {
        double minDiffParallelismRatio = conf.getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MIN_DIFF_RATIO);
        double minDiffResourceRatio = conf.getDouble(HealthMonitorOptions.RESOURCE_SCALE_MIN_DIFF_RATIO);
        double minDiffCpuCore = conf.getDouble(HealthMonitorOptions.RESOURCE_SCALE_MIN_DIFF_CPU);
        int minDiffNativeMemMB = conf.getInteger(HealthMonitorOptions.RESOURCE_SCALE_MIN_DIFF_NATIVE_MEM);
        HashSet<JobVertexID> vertexToRemove = new HashSet<JobVertexID>();
        for (JobVertexID vertexID : this.currentParallelism.keySet()) {
            int curPara = this.currentParallelism.get((Object)vertexID);
            int tarPara = this.targetParallelism.get((Object)vertexID);
            ResourceSpec curRes = this.currentResource.get((Object)vertexID);
            ResourceSpec tarRes = this.targetResource.get((Object)vertexID);
            if (tarPara > curPara || (double)(curPara - tarPara) > minDiffParallelismRatio * (double)curPara || Math.abs(curRes.getCpuCores() - tarRes.getCpuCores()) > minDiffResourceRatio * curRes.getCpuCores() && Math.abs(curRes.getCpuCores() - tarRes.getCpuCores()) > minDiffCpuCore || (double)Math.abs(curRes.getHeapMemory() - tarRes.getHeapMemory()) > minDiffResourceRatio * (double)curRes.getHeapMemory() || (double)Math.abs(curRes.getDirectMemory() - tarRes.getDirectMemory()) > minDiffResourceRatio * (double)curRes.getDirectMemory() || (double)Math.abs(curRes.getNativeMemory() - tarRes.getNativeMemory()) > minDiffResourceRatio * (double)curRes.getNativeMemory() && Math.abs(curRes.getNativeMemory() - tarRes.getNativeMemory()) > minDiffNativeMemMB) continue;
            vertexToRemove.add(vertexID);
        }
        for (JobVertexID vertexID : vertexToRemove) {
            LOGGER.debug("Removing vertex with minor difference, vertex id: {}", (Object)vertexID);
            this.currentParallelism.remove((Object)vertexID);
            this.targetParallelism.remove((Object)vertexID);
            this.currentResource.remove((Object)vertexID);
            this.targetResource.remove((Object)vertexID);
        }
    }

    @Override
    public void execute(RestServerClient restServerClient) throws Exception {
        HashMap<JobVertexID, Tuple2<Integer, ResourceSpec>> vertexParallelismResource = new HashMap<JobVertexID, Tuple2<Integer, ResourceSpec>>();
        for (JobVertexID jvId : this.currentParallelism.keySet()) {
            vertexParallelismResource.put(jvId, (Tuple2<Integer, ResourceSpec>)new Tuple2((Object)this.targetParallelism.get((Object)jvId), (Object)this.targetResource.get((Object)jvId)));
        }
        if (!vertexParallelismResource.isEmpty()) {
            CompletableFuture<Acknowledge> rescaleFuture = restServerClient.rescale(this.jobID, vertexParallelismResource);
            rescaleFuture.whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    this.rescaleException = new Exception("Execute action failed.", (Throwable)throwable);
                }
            });
        }
    }

    @Override
    public boolean validate(MetricProvider provider, RestServerClient restServerClient) throws Exception {
        block3: {
            long start2 = System.currentTimeMillis();
            do {
                Thread.sleep(Math.min(this.timeoutMs / 10L, 60000L));
                if (this.rescaleException != null) {
                    LOGGER.error("Action {} execute failed because: {}", (Object)this.toString(), (Object)this.rescaleException);
                    return false;
                }
                RestServerClient.JobStatus jobStatus = restServerClient.getJobStatus(this.jobID);
                int i = 0;
                for (Tuple2<Long, ExecutionState> time2state : jobStatus.getTaskStatus().values()) {
                    if (!((ExecutionState)((Object)time2state.f1)).equals((Object)ExecutionState.RUNNING)) break;
                    ++i;
                }
                if (i == jobStatus.getTaskStatus().size()) break block3;
            } while (System.currentTimeMillis() - start2 <= this.timeoutMs);
            return false;
        }
        return true;
    }

    @Override
    public Action rollback() {
        return new AdjustJobConfig(this.jobID, this.timeoutMs, this.targetParallelism, this.currentParallelism, this.targetResource, this.currentResource);
    }

    public void setActionMode(Action.ActionMode actionMode) {
        this.actionMode = actionMode;
    }

    @Override
    public Action.ActionMode getActionMode() {
        return this.actionMode;
    }

    public String toString() {
        String adjustments = this.currentParallelism.keySet().stream().map(vertexId -> "{JobVertexID:" + (Object)vertexId + ", parallelism: " + this.currentParallelism.get(vertexId) + " -> " + this.targetParallelism.get(vertexId) + ", resource: " + this.currentResource.get(vertexId) + " -> " + this.targetResource.get(vertexId) + "}").collect(Collectors.joining(", "));
        return "AdjustJobConfig{actionMode: " + (Object)((Object)this.actionMode) + ", adjustments: " + adjustments + "}";
    }

    public RestServerClient.JobConfig getAppliedJobConfig(RestServerClient.JobConfig originJobConfig) {
        RestServerClient.JobConfig appliedJobConfig = new RestServerClient.JobConfig(originJobConfig);
        for (JobVertexID vertexId : this.targetResource.keySet()) {
            RestServerClient.VertexConfig originVertexConfig = originJobConfig.getVertexConfigs().get((Object)vertexId);
            RestServerClient.VertexConfig appliedVertexConfig = new RestServerClient.VertexConfig(originVertexConfig.getName(), this.targetParallelism.get((Object)vertexId), originVertexConfig.getMaxParallelism(), this.targetResource.get((Object)vertexId), originVertexConfig.getOperatorIds(), originVertexConfig.getColocationGroupId());
            appliedJobConfig.getVertexConfigs().put(vertexId, appliedVertexConfig);
        }
        return appliedJobConfig;
    }
}

