/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.math.BigInteger;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.RescaleJobParallelism;
import org.apache.flink.runtime.healthmanager.plugins.detectors.LargeTimerCountDetector;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStuck;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexBackPressure;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexDelayIncreasing;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexFailover;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexFrequentFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHighDelay;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLargeTimerCount;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLongTimeFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexOverParallelized;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.JobTopologyAnalyzer;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetrics;
import org.apache.flink.runtime.healthmanager.plugins.utils.TaskMetricsSubscriber;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatistics;
import org.apache.flink.util.AbstractID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelismScaler
implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelismScaler.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private double scaleTpsRatio;
    private long timeout;
    private long checkInterval;
    private int maxPartitionPerTask;
    private long stableTime;
    private long stateSizeThreshold;
    private boolean checkRescale;
    private boolean checkAllInScaleDown;
    private double multiOutputRatio;
    private double timerCountThreshold;
    private double maxCpuLimit;
    private int maxMemoryLimit;
    private long checkpointIntervalThreshold;
    private double minDiffParallelismRatio;
    private JobVertexHighDelay highDelaySymptom;
    private JobVertexDelayIncreasing delayIncreasingSymptom;
    private JobVertexBackPressure backPressureSymptom;
    private JobVertexOverParallelized overParallelizedSymptom;
    private JobStable jobStableSymptom;
    private JobVertexFrequentFullGC frequentFullGCSymptom;
    private JobVertexLongTimeFullGC longTimeFullGCSymptom;
    private JobVertexLargeTimerCount largeTimerCountSymptom;
    private JobVertexFailover failoverSymptom;
    private JobStuck jobStuckSymptom;
    private boolean needScaleUpForDelay;
    private boolean needScaleUpForBackpressure;
    private boolean needScaleUpForMassiveTimer;
    private boolean needScaleDown;
    private TaskMetricsSubscriber taskMetricsSubscriber;
    private JobTopologyAnalyzer jobTopologyAnalyzer;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> timerCountSubs;

    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.metricProvider = monitor.getMetricProvider();
        this.scaleTpsRatio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_MIN_RATIO);
        this.timeout = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_TIME_OUT);
        this.checkInterval = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.maxPartitionPerTask = monitor.getConfig().getInteger(HealthMonitorOptions.MAX_PARTITION_PER_TASK);
        this.stableTime = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_STABLE_TIME);
        this.stateSizeThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_STATE_SIZE_THRESHOLD);
        this.checkpointIntervalThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_CHECKPOINT_THRESHOLD);
        this.checkRescale = monitor.getConfig().getBoolean(HealthMonitorOptions.PARALLELSIM_SCALE_CHECK_RESCALE);
        this.checkAllInScaleDown = monitor.getConfig().getBoolean(HealthMonitorOptions.PARALLELISM_SCALE_CHECK_ALL_IN_SCALE_DOWN);
        this.checkRescale = monitor.getConfig().getBoolean(HealthMonitorOptions.PARALLELSIM_SCALE_CHECK_RESCALE);
        this.multiOutputRatio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MULTI_OUTPUT_RATIO);
        this.timerCountThreshold = (double)monitor.getConfig().getLong(LargeTimerCountDetector.LARGE_TIMER_COUNT_THRESHOLD) / monitor.getConfig().getDouble(HealthMonitorOptions.TIMER_SCALE_RATIO);
        this.maxCpuLimit = MaxResourceLimitUtil.getMaxCpu(monitor.getConfig());
        this.maxMemoryLimit = MaxResourceLimitUtil.getMaxMem(monitor.getConfig());
        this.minDiffParallelismRatio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_SCALE_MIN_DIFF_RATIO);
        this.taskMetricsSubscriber = monitor.subscribeTaskMetrics(this.checkInterval);
        RestServerClient.JobConfig jobConfig = monitor.getJobConfig();
        this.jobTopologyAnalyzer = monitor.getJobTopologyAnalyzer();
        this.sourcePartitionCountSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.timerCountSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            TaskMetricSubscription timerCount = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "TimerCount.Total", MetricAggType.SUM, 1L, TimelineAggType.LATEST);
            this.timerCountSubs.put(vertexId, timerCount);
            if (!this.jobTopologyAnalyzer.isSource(vertexId)) continue;
            this.sourcePartitionCountSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "partitionCount", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
        }
    }

    @Override
    public void close() {
        if (this.metricProvider == null) {
            return;
        }
        if (this.sourcePartitionCountSubs != null) {
            for (TaskMetricSubscription sub : this.sourcePartitionCountSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.timerCountSubs != null) {
            for (TaskMetricSubscription sub : this.timerCountSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
    }

    @VisibleForTesting
    public void setMonitor(HealthMonitor monitor) {
        this.monitor = monitor;
    }

    @Override
    public Action resolve(List<Symptom> symptomList) {
        Map<Object, Object> targetParallelisms;
        LOGGER.debug("Start resolving.");
        this.parseSymptoms(symptomList);
        if (!this.diagnose()) {
            return null;
        }
        Map<JobVertexID, TaskMetrics> taskMetrics = this.taskMetricsSubscriber.getTaskMetrics();
        if (taskMetrics == null && !this.needScaleUpForMassiveTimer) {
            LOGGER.debug("Can not rescale, metrics are not completed.");
            return null;
        }
        if (taskMetrics != null) {
            for (TaskMetrics metric : taskMetrics.values()) {
                LOGGER.debug("Task Metrics: {}", (Object)metric);
            }
        }
        Map<JobVertexID, TaskCheckpointStatistics> taskCheckpointInfo = null;
        CheckpointStatistics completedCheckpointStats = null;
        long lastCheckpointTime = 0L;
        try {
            completedCheckpointStats = this.monitor.getRestServerClient().getLatestCheckPointStates(this.monitor.getJobID());
            if (completedCheckpointStats != null) {
                taskCheckpointInfo = completedCheckpointStats.getCheckpointStatisticsPerTask();
                lastCheckpointTime = completedCheckpointStats.getLatestAckTimestamp();
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        Map<JobVertexID, Integer> minParallelisms = this.getVertexMinParallelisms(this.monitor.getJobConfig(), taskCheckpointInfo, taskMetrics);
        LOGGER.debug("Min parallelism for vertices: {}", minParallelisms);
        if (taskMetrics != null) {
            Map<JobVertexID, Double> subDagScaleUpRatio = this.getSubDagScaleUpRatio(taskMetrics);
            Set<JobVertexID> vertexToDownScale = new HashSet<JobVertexID>();
            if (System.currentTimeMillis() - lastCheckpointTime < this.checkpointIntervalThreshold || !subDagScaleUpRatio.isEmpty()) {
                vertexToDownScale = this.getVertexToScaleDown(this.monitor.getJobConfig(), minParallelisms);
            }
            targetParallelisms = this.getVertexTargetParallelisms(subDagScaleUpRatio, vertexToDownScale, taskMetrics);
        } else {
            HashSet<JobVertexID> massiveTimerVertices = new HashSet<JobVertexID>();
            massiveTimerVertices.addAll(this.largeTimerCountSymptom.getJobVertexIDs());
            targetParallelisms = new HashMap();
            RestServerClient.JobConfig originJobConfig = this.monitor.getJobConfig();
            for (JobVertexID vertexID : massiveTimerVertices) {
                Tuple2<Long, Double> timerCount;
                if (!this.timerCountSubs.containsKey((Object)vertexID) || (timerCount = this.timerCountSubs.get((Object)vertexID).getValue()) == null) continue;
                LOGGER.debug("Current timer count {} for vertex {}.", timerCount.f1, (Object)vertexID);
                int targetParallelism = (int)Math.ceil((Double)timerCount.f1 / (double)this.monitor.getConfig().getLong(LargeTimerCountDetector.LARGE_TIMER_COUNT_THRESHOLD) * this.monitor.getConfig().getDouble(HealthMonitorOptions.TIMER_SCALE_RATIO));
                targetParallelism = targetParallelism > 1 ? targetParallelism : 1;
                RestServerClient.VertexConfig originVertexConfig = originJobConfig.getVertexConfigs().get((Object)vertexID);
                if (targetParallelism <= originVertexConfig.getParallelism()) continue;
                targetParallelisms.put((Object)vertexID, targetParallelism);
            }
        }
        LOGGER.debug("Target parallelism for vertices before applying constraints: {}.", targetParallelisms);
        if (targetParallelisms.isEmpty()) {
            return null;
        }
        this.updateTargetParallelismsSubjectToConstraints(targetParallelisms, minParallelisms, this.monitor.getJobConfig());
        LOGGER.debug("Target parallelism for vertices after applying constraints: {}.", targetParallelisms);
        RescaleJobParallelism rescaleJobParallelism = this.generateRescaleParallelismAction(targetParallelisms, minParallelisms, this.monitor.getJobConfig());
        if (rescaleJobParallelism != null && !rescaleJobParallelism.isEmpty()) {
            LOGGER.info("RescaleJobParallelism action generated: {}.", (Object)rescaleJobParallelism);
            return rescaleJobParallelism;
        }
        return null;
    }

    @VisibleForTesting
    public Set<JobVertexID> getVertexToScaleDown(RestServerClient.JobConfig jobConfig, Map<JobVertexID, Integer> minParallelisms) {
        HashSet<JobVertexID> vertexToDownScale = new HashSet<JobVertexID>();
        vertexToDownScale.clear();
        if (this.needScaleDown) {
            HashSet<JobVertexID> verticesToDownScale = new HashSet<JobVertexID>(this.overParallelizedSymptom.getJobVertexIDs());
            for (JobVertexID vertexId : verticesToDownScale) {
                if (minParallelisms != null && minParallelisms.get((Object)vertexId) != null && minParallelisms.get((Object)vertexId) >= jobConfig.getVertexConfigs().get((Object)vertexId).getParallelism()) continue;
                vertexToDownScale.add(vertexId);
            }
            if (this.checkAllInScaleDown && !vertexToDownScale.isEmpty()) {
                vertexToDownScale.addAll(jobConfig.getVertexConfigs().keySet());
            }
        }
        LOGGER.debug("Roots of sub-dags need to scale down: {}.", vertexToDownScale);
        return vertexToDownScale;
    }

    @VisibleForTesting
    public void parseSymptoms(List<Symptom> symptomList) {
        this.jobStableSymptom = null;
        this.frequentFullGCSymptom = null;
        this.longTimeFullGCSymptom = null;
        this.largeTimerCountSymptom = null;
        this.failoverSymptom = null;
        this.jobStuckSymptom = null;
        this.highDelaySymptom = null;
        this.delayIncreasingSymptom = null;
        this.backPressureSymptom = null;
        this.overParallelizedSymptom = null;
        for (Symptom symptom : symptomList) {
            if (symptom instanceof JobStable) {
                this.jobStableSymptom = (JobStable)symptom;
            }
            if (symptom instanceof JobVertexFrequentFullGC) {
                this.frequentFullGCSymptom = (JobVertexFrequentFullGC)symptom;
                LOGGER.debug("Frequent full gc detected for vertices {}.", this.frequentFullGCSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexLongTimeFullGC) {
                this.longTimeFullGCSymptom = (JobVertexLongTimeFullGC)symptom;
                LOGGER.debug("Long time full gc detected for vertices {}.", this.longTimeFullGCSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexLargeTimerCount) {
                this.largeTimerCountSymptom = (JobVertexLargeTimerCount)symptom;
                LOGGER.debug("Large timer count detected for vertices {}.", this.largeTimerCountSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexFailover) {
                this.failoverSymptom = (JobVertexFailover)symptom;
                LOGGER.debug("Failover detected for vertices {}.", this.failoverSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobStuck) {
                this.jobStuckSymptom = (JobStuck)symptom;
                LOGGER.debug("Stuck detected for vertices {}.", this.jobStuckSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexHighDelay) {
                this.highDelaySymptom = (JobVertexHighDelay)symptom;
                LOGGER.debug("High delay detected for vertices {}.", this.highDelaySymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexDelayIncreasing) {
                this.delayIncreasingSymptom = (JobVertexDelayIncreasing)symptom;
                LOGGER.debug("Delay increasing detected for vertices {}.", this.delayIncreasingSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexBackPressure) {
                this.backPressureSymptom = (JobVertexBackPressure)symptom;
                LOGGER.debug("Back pressure detected for vertices {}.", this.backPressureSymptom.getJobVertexIDs());
                continue;
            }
            if (!(symptom instanceof JobVertexOverParallelized)) continue;
            this.overParallelizedSymptom = (JobVertexOverParallelized)symptom;
            LOGGER.debug("Over parallelized detected for vertices {}.", this.overParallelizedSymptom.getJobVertexIDs());
        }
    }

    private boolean diagnose() {
        boolean bl = this.needScaleUpForMassiveTimer = this.largeTimerCountSymptom != null;
        if (this.needScaleUpForMassiveTimer) {
            LOGGER.debug("Job exists vertices require large number of timer, need rescale parallelism.");
            return true;
        }
        if (this.jobStableSymptom == null || this.jobStableSymptom.getStableTime() < this.stableTime || this.frequentFullGCSymptom != null && this.frequentFullGCSymptom.isSevere() || this.longTimeFullGCSymptom != null && this.longTimeFullGCSymptom.isSevere() || this.failoverSymptom != null) {
            LOGGER.debug("Job is not stable, should not rescale parallelism.");
            return false;
        }
        this.needScaleUpForDelay = this.highDelaySymptom != null || this.delayIncreasingSymptom != null;
        this.needScaleUpForBackpressure = this.backPressureSymptom != null;
        boolean bl2 = this.needScaleDown = this.overParallelizedSymptom != null && this.backPressureSymptom == null;
        if (!(this.needScaleUpForDelay || this.needScaleUpForBackpressure || this.needScaleDown)) {
            LOGGER.debug("No need to rescale parallelism.");
            return false;
        }
        return true;
    }

    @VisibleForTesting
    public Map<JobVertexID, Double> getSubDagScaleUpRatio(Map<JobVertexID, TaskMetrics> taskMetrics) {
        HashMap<JobVertexID, Double> subDagTargetTpsRatio = new HashMap<JobVertexID, Double>();
        HashSet<JobVertexID> subDagRootsToUpScale = new HashSet<JobVertexID>();
        if (this.needScaleUpForBackpressure) {
            HashSet<JobVertexID> backPressureVertices = new HashSet<JobVertexID>();
            backPressureVertices.addAll(this.backPressureSymptom.getJobVertexIDs());
            for (JobVertexID vertexID : backPressureVertices) {
                subDagRootsToUpScale.add(this.jobTopologyAnalyzer.getSubDagRoot(vertexID));
                subDagTargetTpsRatio.put(this.jobTopologyAnalyzer.getSubDagRoot(vertexID), this.scaleTpsRatio);
            }
        }
        if (this.needScaleUpForDelay) {
            HashSet<JobVertexID> verticesToUpScale = new HashSet<JobVertexID>();
            if (this.highDelaySymptom != null) {
                verticesToUpScale.addAll(this.highDelaySymptom.getJobVertexIDs());
            }
            if (this.delayIncreasingSymptom != null) {
                verticesToUpScale.addAll(this.delayIncreasingSymptom.getJobVertexIDs());
            }
            for (JobVertexID vertexId : verticesToUpScale) {
                subDagRootsToUpScale.add(this.jobTopologyAnalyzer.getSubDagRoot(vertexId));
                TaskMetrics metric = taskMetrics.get((Object)vertexId);
                double ratio = 1.0 / (1.0 - metric.getDelayIncreasingRate()) * this.scaleTpsRatio;
                if (metric.isParallelSource() && metric.getWorkload() > 0.0) {
                    double maxTps = 1.0 / Math.max(metric.getPartitionLatency(), metric.getTaskLatencyPerRecord() - metric.getWaitOutputPerRecord()) * metric.getPartitionCount();
                    if (this.highDelaySymptom != null && this.highDelaySymptom.getSevereJobVertexIDs().contains((Object)vertexId)) {
                        ratio = maxTps / metric.getInputTps();
                    } else if (maxTps / metric.getInputTps() * this.scaleTpsRatio < ratio) {
                        ratio = maxTps / metric.getInputTps() * this.scaleTpsRatio;
                    }
                }
                if (!(ratio > 1.0 + this.minDiffParallelismRatio)) continue;
                subDagTargetTpsRatio.put(this.jobTopologyAnalyzer.getSubDagRoot(vertexId), ratio);
            }
        }
        LOGGER.debug("Roots of sub-dags need to scale up: {}.", subDagRootsToUpScale);
        LOGGER.debug("Target scale up tps ratio for sub-dags before adjusting: {}", subDagTargetTpsRatio);
        boolean hasDagScaleUp = true;
        while (hasDagScaleUp) {
            hasDagScaleUp = false;
            for (JobVertexID root : this.jobTopologyAnalyzer.getAllSubDagRoots()) {
                for (JobVertexID upStream : this.jobTopologyAnalyzer.getInputs(root)) {
                    JobVertexID upStreamSubDagRoot = this.jobTopologyAnalyzer.getSubDagRoot(upStream);
                    if (!subDagTargetTpsRatio.containsKey((Object)upStreamSubDagRoot) || subDagTargetTpsRatio.containsKey((Object)root) && !((Double)subDagTargetTpsRatio.get((Object)root) < (Double)subDagTargetTpsRatio.get((Object)upStreamSubDagRoot))) continue;
                    subDagTargetTpsRatio.put(root, (Double)subDagTargetTpsRatio.get((Object)upStreamSubDagRoot));
                    hasDagScaleUp = true;
                }
            }
        }
        if (this.needScaleUpForMassiveTimer) {
            for (JobVertexID vertexID : this.largeTimerCountSymptom.getJobVertexIDs()) {
                JobVertexID rootID;
                double tartParallelism = taskMetrics.get((Object)vertexID).getTimerCount() / this.timerCountThreshold;
                double ratio = tartParallelism / taskMetrics.get((Object)vertexID).getWorkload();
                if (!(ratio > subDagTargetTpsRatio.getOrDefault((Object)(rootID = this.jobTopologyAnalyzer.getSubDagRoot(vertexID)), 1.0))) continue;
                subDagTargetTpsRatio.put(rootID, ratio);
            }
        }
        LOGGER.debug("Target scale up tps ratio for sub-dags after adjusting: {}.", subDagTargetTpsRatio);
        return subDagTargetTpsRatio;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> getVertexTargetParallelisms(Map<JobVertexID, Double> subDagTargetTpsRatio, Set<JobVertexID> vertexToDownScale, Map<JobVertexID, TaskMetrics> taskMetrics) {
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        HashMap<JobVertexID, Integer> targetParallelisms = new HashMap<JobVertexID, Integer>();
        for (JobVertexID subDagRoot : subDagTargetTpsRatio.keySet()) {
            double ratio = subDagTargetTpsRatio.get((Object)subDagRoot);
            for (JobVertexID vertexId : this.jobTopologyAnalyzer.getSubDagVertices(subDagRoot)) {
                TaskMetrics metric = taskMetrics.get((Object)vertexId);
                if (!(metric.getWorkload() > 0.0) || !((double)jobConfig.getVertexConfigs().get((Object)vertexId).getParallelism() / metric.getWorkload() < ratio)) continue;
                targetParallelisms.put(vertexId, (int)Math.floor(metric.getWorkload() * ratio));
            }
        }
        for (JobVertexID vertexID : vertexToDownScale) {
            if (targetParallelisms.containsKey((Object)vertexID) || !(taskMetrics.get((Object)vertexID).getWorkload() > 0.0)) continue;
            targetParallelisms.put(vertexID, (int)Math.ceil(taskMetrics.get((Object)vertexID).getWorkload() * this.scaleTpsRatio));
        }
        HashSet<JobVertexID> adjustedVertex = new HashSet<JobVertexID>();
        for (JobVertexID vertexID : targetParallelisms.keySet()) {
            for (JobVertexID upStream : this.jobTopologyAnalyzer.getInputs(vertexID)) {
                if (adjustedVertex.contains((Object)vertexID) || this.jobTopologyAnalyzer.getOutputs(upStream).size() <= 1) continue;
                LOGGER.debug("adjusting vertex {} target parallelism: {} -> {}", new Object[]{vertexID, targetParallelisms.get((Object)vertexID), (int)Math.floor((double)((Integer)targetParallelisms.get((Object)vertexID)).intValue() * this.multiOutputRatio)});
                targetParallelisms.put(vertexID, (int)Math.floor((double)((Integer)targetParallelisms.get((Object)vertexID)).intValue() * this.multiOutputRatio));
                adjustedVertex.add(vertexID);
            }
        }
        return targetParallelisms;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> getVertexMinParallelisms(RestServerClient.JobConfig jobConfig, Map<JobVertexID, TaskCheckpointStatistics> checkpointInfo, Map<JobVertexID, TaskMetrics> taskMetrics) {
        HashMap<JobVertexID, Integer> minParallelisms = new HashMap<JobVertexID, Integer>();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            double partitionCount;
            int minParallelism;
            minParallelisms.put(vertexId, 1);
            if (this.jobTopologyAnalyzer.isSource(vertexId) && this.sourcePartitionCountSubs.get((Object)vertexId).getValue() != null && (minParallelism = (int)Math.ceil((partitionCount = ((Double)this.sourcePartitionCountSubs.get((Object)((Object)vertexId)).getValue().f1).doubleValue()) / (double)this.maxPartitionPerTask)) > (Integer)minParallelisms.get((Object)vertexId)) {
                minParallelisms.put(vertexId, minParallelism);
            }
            if (checkpointInfo != null && checkpointInfo.containsKey((Object)vertexId)) {
                TaskCheckpointStatistics taskCheckpointStatistics = checkpointInfo.get((Object)vertexId);
                minParallelisms.put(vertexId, Math.max((Integer)minParallelisms.get((Object)vertexId), (int)Math.ceil(1.0 * (double)taskCheckpointStatistics.getFullStateSize() / (double)this.stateSizeThreshold)));
            }
            if (taskMetrics == null || !taskMetrics.containsKey((Object)vertexId)) continue;
            TaskMetrics taskMetric = taskMetrics.get((Object)vertexId);
            double minParallelism2 = taskMetric.getTimerCount() / this.timerCountThreshold;
            LOGGER.debug("Timer count constraint works: constraints {}, origin constraints {}.", (Object)Math.ceil(minParallelism2), minParallelisms.get((Object)vertexId));
            minParallelisms.put(vertexId, Math.max((Integer)minParallelisms.get((Object)vertexId), (int)Math.ceil(minParallelism2)));
        }
        return minParallelisms;
    }

    public void updateTargetParallelismsSubjectToConstraints(Map<JobVertexID, Integer> targetParallelisms, Map<JobVertexID, Integer> minParallelisms, RestServerClient.JobConfig jobConfig) {
        HashMap<JobVertexID, Set<JobVertexID>> epgLeader2Members = new HashMap<JobVertexID, Set<JobVertexID>>();
        HashMap<JobVertexID, JobVertexID> epgMember2Leader = new HashMap<JobVertexID, JobVertexID>();
        HashMap<JobVertexID, Integer> epgLeader2TargetParallelism = new HashMap<JobVertexID, Integer>();
        HashMap<JobVertexID, Integer> epgLeader2MaxParallelism = new HashMap<JobVertexID, Integer>();
        for (JobVertexID jobVertexID : jobConfig.getVertexConfigs().keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)jobVertexID);
            int targetParallelism = vertexConfig.getParallelism();
            int maxParallelism = vertexConfig.getMaxParallelism();
            if (targetParallelisms.containsKey((Object)jobVertexID)) {
                targetParallelism = targetParallelisms.get((Object)jobVertexID);
                if (this.jobTopologyAnalyzer.isSource(jobVertexID) && this.sourcePartitionCountSubs.get((Object)jobVertexID).getValue() != null) {
                    double partitionCount = (Double)this.sourcePartitionCountSubs.get((Object)((Object)jobVertexID)).getValue().f1;
                    if (partitionCount / (double)targetParallelism > (double)this.maxPartitionPerTask) {
                        targetParallelism = (int)Math.ceil(partitionCount / (double)this.maxPartitionPerTask);
                    }
                    if (partitionCount > 0.0) {
                        targetParallelism = (int)Math.ceil(partitionCount / Math.max(1.0, Math.floor(partitionCount / (double)targetParallelism)));
                    }
                    if (partitionCount > 0.0 && (double)maxParallelism > partitionCount) {
                        maxParallelism = (int)partitionCount;
                    }
                }
            }
            if (minParallelisms != null && minParallelisms.containsKey((Object)jobVertexID) && targetParallelism < minParallelisms.get((Object)jobVertexID)) {
                targetParallelism = minParallelisms.get((Object)jobVertexID);
            }
            if (targetParallelism < 1) {
                targetParallelism = 1;
            }
            if (targetParallelism > maxParallelism && maxParallelism > 0) {
                targetParallelism = maxParallelism;
            }
            HashSet<JobVertexID> members = new HashSet<JobVertexID>();
            members.add(jobVertexID);
            epgLeader2Members.put(jobVertexID, members);
            epgMember2Leader.put(jobVertexID, jobVertexID);
            epgLeader2TargetParallelism.put(jobVertexID, targetParallelism);
            epgLeader2MaxParallelism.put(jobVertexID, maxParallelism);
        }
        HashMap colocationGroupId2Leader = new HashMap();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexId);
            AbstractID colocationGroupId = vertexConfig.getColocationGroupId();
            if (colocationGroupId == null) continue;
            if (colocationGroupId2Leader.containsKey(colocationGroupId)) {
                JobVertexID currentGroupLeader = (JobVertexID)((Object)epgMember2Leader.get((Object)vertexId));
                JobVertexID targetGroupLeader = (JobVertexID)((Object)colocationGroupId2Leader.get(colocationGroupId));
                this.mergeEqualParallelismGroups(currentGroupLeader, targetGroupLeader, epgLeader2Members, epgMember2Leader, epgLeader2TargetParallelism, epgLeader2MaxParallelism);
                continue;
            }
            colocationGroupId2Leader.put(colocationGroupId, epgMember2Leader.get((Object)vertexId));
        }
        for (JobVertexID downStreamVertex : jobConfig.getInputNodes().keySet()) {
            for (Tuple2<JobVertexID, String> edge : jobConfig.getInputNodes().get((Object)downStreamVertex)) {
                JobVertexID upStreamVertex = (JobVertexID)((Object)edge.f0);
                String shipStrategy = (String)edge.f1;
                if (!shipStrategy.equals("FORWARD")) continue;
                JobVertexID currentGroupLeader = (JobVertexID)((Object)epgMember2Leader.get((Object)upStreamVertex));
                JobVertexID targetGroupLeader = (JobVertexID)((Object)epgMember2Leader.get((Object)downStreamVertex));
                this.mergeEqualParallelismGroups(currentGroupLeader, targetGroupLeader, epgLeader2Members, epgMember2Leader, epgLeader2TargetParallelism, epgLeader2MaxParallelism);
            }
        }
        if (this.checkRescale) {
            HashMap<JobVertexID, Set<JobVertexID>> hashMap = new HashMap<JobVertexID, Set<JobVertexID>>();
            HashMap<JobVertexID, JobVertexID> ppgMember2Leader = new HashMap<JobVertexID, JobVertexID>();
            HashMap<JobVertexID, Integer> ppgLeader2Base = new HashMap<JobVertexID, Integer>();
            HashMap<JobVertexID, Integer> ppgMember2Factor = new HashMap<JobVertexID, Integer>();
            for (JobVertexID downStreamVertex : jobConfig.getInputNodes().keySet()) {
                for (Tuple2<JobVertexID, String> edge : jobConfig.getInputNodes().get((Object)downStreamVertex)) {
                    HashSet<JobVertexID> members;
                    JobVertexID upStreamVertex = (JobVertexID)((Object)edge.f0);
                    String shipStrategy = (String)edge.f1;
                    if (!shipStrategy.equals("RESCALE")) continue;
                    JobVertexID upStreamEpg = (JobVertexID)((Object)epgMember2Leader.get((Object)upStreamVertex));
                    JobVertexID downStreamEpg = (JobVertexID)((Object)epgMember2Leader.get((Object)downStreamVertex));
                    if (ppgMember2Leader.containsKey((Object)upStreamEpg) && ppgMember2Leader.containsKey((Object)downStreamEpg) && ((JobVertexID)((Object)ppgMember2Leader.get((Object)upStreamEpg))).equals(ppgMember2Leader.get((Object)downStreamEpg))) continue;
                    if (!ppgMember2Leader.containsKey((Object)upStreamEpg)) {
                        members = new HashSet<JobVertexID>();
                        members.add(upStreamEpg);
                        hashMap.put(upStreamEpg, members);
                        ppgMember2Leader.put(upStreamEpg, upStreamEpg);
                        ppgLeader2Base.put(upStreamEpg, (Integer)epgLeader2TargetParallelism.get((Object)upStreamEpg));
                        ppgMember2Factor.put(upStreamEpg, 1);
                    }
                    if (!ppgMember2Leader.containsKey((Object)downStreamEpg)) {
                        members = new HashSet();
                        members.add(downStreamEpg);
                        hashMap.put(downStreamEpg, members);
                        ppgMember2Leader.put(downStreamEpg, downStreamEpg);
                        ppgLeader2Base.put(downStreamEpg, (Integer)epgLeader2TargetParallelism.get((Object)downStreamEpg));
                        ppgMember2Factor.put(downStreamEpg, 1);
                    }
                    this.mergeProportionalParallelismGroups(upStreamEpg, downStreamEpg, epgLeader2Members, epgLeader2TargetParallelism, epgLeader2MaxParallelism, hashMap, ppgMember2Leader, ppgLeader2Base, ppgMember2Factor, jobConfig);
                }
            }
        }
        targetParallelisms.clear();
        for (JobVertexID epgLeader : epgLeader2Members.keySet()) {
            int targetParallelism = (Integer)epgLeader2TargetParallelism.get((Object)epgLeader);
            for (JobVertexID vertexId : (Set)epgLeader2Members.get((Object)epgLeader)) {
                targetParallelisms.put(vertexId, targetParallelism);
            }
        }
    }

    private void mergeEqualParallelismGroups(JobVertexID currentGroupLeader, JobVertexID targetGroupLeader, Map<JobVertexID, Set<JobVertexID>> leader2Members, Map<JobVertexID, JobVertexID> member2Leader, Map<JobVertexID, Integer> leader2TargetParallelism, Map<JobVertexID, Integer> leader2MaxParallelism) {
        int maxParallelism;
        if (currentGroupLeader.equals((Object)targetGroupLeader)) {
            return;
        }
        int currentGroupTargetParallelism = leader2TargetParallelism.get((Object)currentGroupLeader);
        int currentGroupMaxParallelism = leader2MaxParallelism.get((Object)currentGroupLeader);
        int targetGroupTargetParallelism = leader2TargetParallelism.get((Object)targetGroupLeader);
        int targetGroupMaxParallelism = leader2MaxParallelism.get((Object)targetGroupLeader);
        int targetParallelism = Math.max(currentGroupTargetParallelism, targetGroupTargetParallelism);
        if (targetParallelism > (maxParallelism = Math.min(currentGroupMaxParallelism, targetGroupMaxParallelism))) {
            targetParallelism = maxParallelism;
        }
        leader2Members.get((Object)targetGroupLeader).addAll((Collection<JobVertexID>)leader2Members.get((Object)currentGroupLeader));
        leader2Members.get((Object)currentGroupLeader).forEach(member -> member2Leader.put((JobVertexID)((Object)member), targetGroupLeader));
        leader2TargetParallelism.put(targetGroupLeader, targetParallelism);
        leader2MaxParallelism.put(targetGroupLeader, maxParallelism);
        leader2Members.remove((Object)currentGroupLeader);
        leader2TargetParallelism.remove((Object)currentGroupLeader);
        leader2MaxParallelism.remove((Object)currentGroupLeader);
    }

    /*
     * WARNING - void declaration
     */
    @VisibleForTesting
    void mergeProportionalParallelismGroups(JobVertexID upStreamEpg, JobVertexID downStreamEpg, Map<JobVertexID, Set<JobVertexID>> epgLeader2Members, Map<JobVertexID, Integer> epgLeader2TargetParallelism, Map<JobVertexID, Integer> epgLeader2MaxParallelism, Map<JobVertexID, Set<JobVertexID>> ppgLeader2Members, Map<JobVertexID, JobVertexID> ppgMember2Leader, Map<JobVertexID, Integer> ppgLeader2Base, Map<JobVertexID, Integer> ppgMember2Factor, RestServerClient.JobConfig jobConfig) {
        int downStreamFactorRatio;
        BigInteger upStreamNewBase;
        boolean useUpStreamGreaterPlan;
        int downStreamGreaterK;
        int downStreamParallelism;
        int upStreamGreaterK;
        int upStreamParallelism;
        JobVertexID upStreamPpgLeader = ppgMember2Leader.get((Object)upStreamEpg);
        JobVertexID downStreamPpgLeader = ppgMember2Leader.get((Object)downStreamEpg);
        int upStreamBase = ppgLeader2Base.get((Object)upStreamPpgLeader);
        int upStreamFactor = ppgMember2Factor.get((Object)upStreamEpg);
        int downStreamBase = ppgLeader2Base.get((Object)downStreamPpgLeader);
        int downStreamFactor = ppgMember2Factor.get((Object)downStreamEpg);
        int upStreamGreaterUpStreamBaseIncrease = 0;
        block0: while (true) {
            upStreamParallelism = (upStreamBase + upStreamGreaterUpStreamBaseIncrease) * upStreamFactor;
            upStreamGreaterK = (int)Math.ceil(1.0 * (double)upStreamParallelism / (double)downStreamFactor / (double)downStreamBase);
            while (upStreamGreaterK > 0) {
                if (upStreamParallelism / downStreamFactor / upStreamGreaterK < downStreamBase) {
                    --upStreamGreaterK;
                    continue;
                }
                if (upStreamParallelism / downStreamFactor / upStreamGreaterK >= downStreamBase * 2) break;
                if (upStreamParallelism / downStreamFactor % upStreamGreaterK == 0) break block0;
                --upStreamGreaterK;
            }
            ++upStreamGreaterUpStreamBaseIncrease;
        }
        int upStreamGreaterDownStreamBaseIncrease = upStreamParallelism / downStreamFactor / upStreamGreaterK - downStreamBase;
        boolean upStreamGreaterFeasible = true;
        for (JobVertexID upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
            int upStreamPpgMemberParallelism = (upStreamBase + upStreamGreaterUpStreamBaseIncrease) * ppgMember2Factor.get((Object)upStreamPpgMemberEpg);
            if (upStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)upStreamPpgMemberEpg)) continue;
            upStreamGreaterFeasible = false;
            break;
        }
        for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
            int downStreamPpgMemberParallelism = (downStreamBase + upStreamGreaterDownStreamBaseIncrease) * ppgMember2Factor.get((Object)downStreamPpgMemberEpg);
            if (downStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)downStreamPpgMemberEpg)) continue;
            upStreamGreaterFeasible = false;
            break;
        }
        int downStreamGreaterDownStreamBaseIncrease = 0;
        block4: while (true) {
            downStreamParallelism = (downStreamBase + downStreamGreaterDownStreamBaseIncrease) * downStreamFactor;
            downStreamGreaterK = (int)Math.ceil(1.0 * (double)downStreamParallelism / (double)upStreamFactor / (double)upStreamBase);
            while (downStreamGreaterK > 0) {
                if (downStreamParallelism / upStreamFactor / downStreamGreaterK < upStreamBase) {
                    --downStreamGreaterK;
                    continue;
                }
                if (downStreamParallelism / upStreamFactor / downStreamGreaterK >= upStreamBase * 2) break;
                if (downStreamParallelism / upStreamFactor % downStreamGreaterK == 0) break block4;
                --downStreamGreaterK;
            }
            ++downStreamGreaterDownStreamBaseIncrease;
        }
        int downStreamGreaterUpStreamBaseIncrease = downStreamParallelism / upStreamFactor / downStreamGreaterK - upStreamBase;
        boolean downStreamGreaterFeasible = true;
        for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
            int downStreamPpgMemberParallelism = (downStreamBase + downStreamGreaterDownStreamBaseIncrease) * ppgMember2Factor.get((Object)downStreamPpgMemberEpg);
            if (downStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)downStreamPpgMemberEpg)) continue;
            downStreamGreaterFeasible = false;
            break;
        }
        for (JobVertexID upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
            int upStreamPpgMemberParallelism = (upStreamBase + downStreamGreaterUpStreamBaseIncrease) * ppgMember2Factor.get((Object)upStreamPpgMemberEpg);
            if (upStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)upStreamPpgMemberEpg)) continue;
            downStreamGreaterFeasible = false;
            break;
        }
        if (upStreamGreaterFeasible && downStreamGreaterFeasible) {
            int downStreamGreaterCostMem;
            int i;
            void var28_39;
            ResourceSpec upStreamBaseIncreaseCost = new ResourceSpec.Builder().build();
            for (JobVertexID jobVertexID : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
                ResourceSpec memberEpgIncreaseCost = new ResourceSpec.Builder().build();
                for (JobVertexID upStreamPpgMemberVertex : epgLeader2Members.get((Object)jobVertexID)) {
                    memberEpgIncreaseCost = memberEpgIncreaseCost.sum(jobConfig.getVertexConfigs().get((Object)upStreamPpgMemberVertex).getResourceSpec());
                }
                for (int i3 = 0; i3 < ppgMember2Factor.get((Object)jobVertexID); ++i3) {
                    upStreamBaseIncreaseCost = upStreamBaseIncreaseCost.sum(memberEpgIncreaseCost);
                }
            }
            ResourceSpec downStreamBaseIncreaseCost = new ResourceSpec.Builder().build();
            for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
                ResourceSpec memberEpgIncreaseCost = new ResourceSpec.Builder().build();
                for (JobVertexID downStreamPpgMemberVertex : epgLeader2Members.get((Object)downStreamPpgMemberEpg)) {
                    memberEpgIncreaseCost = memberEpgIncreaseCost.sum(jobConfig.getVertexConfigs().get((Object)downStreamPpgMemberVertex).getResourceSpec());
                }
                for (int i4 = 0; i4 < ppgMember2Factor.get((Object)downStreamPpgMemberEpg); ++i4) {
                    downStreamBaseIncreaseCost = downStreamBaseIncreaseCost.sum(memberEpgIncreaseCost);
                }
            }
            ResourceSpec resourceSpec = new ResourceSpec.Builder().build();
            for (int i2 = 0; i2 < upStreamGreaterUpStreamBaseIncrease; ++i2) {
                void var28_37;
                ResourceSpec resourceSpec2 = var28_37.sum(upStreamBaseIncreaseCost);
            }
            for (int i2 = 0; i2 < upStreamGreaterDownStreamBaseIncrease; ++i2) {
                ResourceSpec resourceSpec3 = var28_39.sum(downStreamBaseIncreaseCost);
            }
            ResourceSpec downStreamGreaterCost = new ResourceSpec.Builder().build();
            for (i = 0; i < downStreamGreaterDownStreamBaseIncrease; ++i) {
                downStreamGreaterCost = downStreamGreaterCost.sum(downStreamBaseIncreaseCost);
            }
            for (i = 0; i < downStreamGreaterUpStreamBaseIncrease; ++i) {
                downStreamGreaterCost = downStreamGreaterCost.sum(upStreamBaseIncreaseCost);
            }
            int upStreamGreaterCostMem = var28_39.getHeapMemory() + var28_39.getDirectMemory() + var28_39.getNativeMemory();
            useUpStreamGreaterPlan = upStreamGreaterCostMem > (downStreamGreaterCostMem = downStreamGreaterCost.getHeapMemory() + downStreamGreaterCost.getDirectMemory() + downStreamGreaterCost.getNativeMemory()) ? false : (upStreamGreaterCostMem < downStreamGreaterCostMem ? true : !(var28_39.getCpuCores() > downStreamGreaterCost.getCpuCores()));
        } else if (upStreamGreaterFeasible) {
            useUpStreamGreaterPlan = true;
        } else if (downStreamGreaterFeasible) {
            useUpStreamGreaterPlan = false;
        } else {
            LOGGER.debug("Could not merge ProportionalParallelism Groups {} and {}.", ppgLeader2Members.get((Object)upStreamPpgLeader), ppgLeader2Members.get((Object)downStreamPpgLeader));
            return;
        }
        if (useUpStreamGreaterPlan) {
            upStreamNewBase = BigInteger.valueOf(upStreamBase + upStreamGreaterUpStreamBaseIncrease);
            BigInteger downStreamNewBase = BigInteger.valueOf(downStreamBase + upStreamGreaterDownStreamBaseIncrease);
            int n = upStreamNewBase.gcd(downStreamNewBase).intValue();
            int upStreamFactorRatio = (upStreamBase + upStreamGreaterUpStreamBaseIncrease) / n;
            downStreamFactorRatio = (downStreamBase + upStreamGreaterDownStreamBaseIncrease) / n;
            for (JobVertexID upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
                ppgMember2Leader.put(upStreamPpgMemberEpg, downStreamPpgLeader);
                ppgMember2Factor.put(upStreamPpgMemberEpg, ppgMember2Factor.get((Object)upStreamPpgMemberEpg) * upStreamFactorRatio);
            }
            for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
                ppgMember2Factor.put(downStreamPpgMemberEpg, ppgMember2Factor.get((Object)downStreamPpgMemberEpg) * downStreamFactorRatio);
            }
            ppgLeader2Members.get((Object)downStreamPpgLeader).addAll((Collection<JobVertexID>)ppgLeader2Members.get((Object)upStreamPpgLeader));
            ppgLeader2Members.remove((Object)upStreamPpgLeader);
            ppgLeader2Base.put(downStreamPpgLeader, n);
            ppgLeader2Base.remove((Object)upStreamPpgLeader);
            for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
                epgLeader2TargetParallelism.put(downStreamPpgMemberEpg, n * ppgMember2Factor.get((Object)downStreamPpgMemberEpg));
            }
        } else {
            upStreamNewBase = BigInteger.valueOf(upStreamBase + downStreamGreaterUpStreamBaseIncrease);
            BigInteger downStreamNewBase = BigInteger.valueOf(downStreamBase + downStreamGreaterDownStreamBaseIncrease);
            int n = upStreamNewBase.gcd(downStreamNewBase).intValue();
            int upStreamFactorRatio = (upStreamBase + downStreamGreaterUpStreamBaseIncrease) / n;
            downStreamFactorRatio = (downStreamBase + downStreamGreaterDownStreamBaseIncrease) / n;
            for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
                ppgMember2Leader.put(downStreamPpgMemberEpg, upStreamPpgLeader);
                ppgMember2Factor.put(downStreamPpgMemberEpg, ppgMember2Factor.get((Object)downStreamPpgMemberEpg) * downStreamFactorRatio);
            }
            for (JobVertexID upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
                ppgMember2Factor.put(upStreamPpgMemberEpg, ppgMember2Factor.get((Object)upStreamPpgMemberEpg) * upStreamFactorRatio);
            }
            ppgLeader2Members.get((Object)upStreamPpgLeader).addAll((Collection<JobVertexID>)ppgLeader2Members.get((Object)downStreamPpgLeader));
            ppgLeader2Members.remove((Object)downStreamPpgLeader);
            ppgLeader2Base.put(upStreamPpgLeader, n);
            ppgLeader2Base.remove((Object)downStreamPpgLeader);
            for (JobVertexID upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
                epgLeader2TargetParallelism.put(upStreamPpgMemberEpg, n * ppgMember2Factor.get((Object)upStreamPpgMemberEpg));
            }
        }
    }

    private RescaleJobParallelism generateRescaleParallelismAction(Map<JobVertexID, Integer> targetParallelisms, Map<JobVertexID, Integer> minParallelisms, RestServerClient.JobConfig jobConfig) {
        if (targetParallelisms.isEmpty()) {
            return null;
        }
        RescaleJobParallelism rescaleJobParallelism = new RescaleJobParallelism(this.jobID, this.timeout);
        for (JobVertexID vertexId : targetParallelisms.keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexId);
            rescaleJobParallelism.addVertex(vertexId, vertexConfig.getParallelism(), targetParallelisms.get((Object)vertexId), vertexConfig.getResourceSpec(), vertexConfig.getResourceSpec());
        }
        if (this.maxCpuLimit != Double.MAX_VALUE || this.maxMemoryLimit != Integer.MAX_VALUE) {
            RestServerClient.JobConfig targetJobConfig = rescaleJobParallelism.getAppliedJobConfig(jobConfig);
            double targetTotalCpu = targetJobConfig.getJobTotalCpuCores();
            int targetTotalMem = targetJobConfig.getJobTotalMemoryMb();
            if (targetTotalCpu > this.maxCpuLimit || targetTotalMem > this.maxMemoryLimit) {
                LOGGER.debug("Try to scale down parallelism: total resource of target job config <cpu, mem>=<{}, {}> exceed max limit <cpu, mem>=<{}, {}>.", new Object[]{targetTotalCpu, targetTotalMem, this.maxCpuLimit, this.maxMemoryLimit});
                RestServerClient.JobConfig adjustedJobConfig = MaxResourceLimitUtil.scaleDownJobConfigToMaxResourceLimit(targetJobConfig, minParallelisms, this.maxCpuLimit, this.maxMemoryLimit);
                if (adjustedJobConfig == null) {
                    LOGGER.debug("Give up adjusting.");
                    return null;
                }
                rescaleJobParallelism = new RescaleJobParallelism(this.jobID, this.timeout);
                for (JobVertexID vertexId : adjustedJobConfig.getVertexConfigs().keySet()) {
                    RestServerClient.VertexConfig originVertexConfig = jobConfig.getVertexConfigs().get((Object)vertexId);
                    RestServerClient.VertexConfig adjustedVertexConfig = adjustedJobConfig.getVertexConfigs().get((Object)vertexId);
                    rescaleJobParallelism.addVertex(vertexId, originVertexConfig.getParallelism(), adjustedVertexConfig.getParallelism(), originVertexConfig.getResourceSpec(), adjustedVertexConfig.getResourceSpec());
                }
            }
        }
        RestServerClient.JobConfig appliedJobConfig = rescaleJobParallelism.getAppliedJobConfig(jobConfig);
        LOGGER.debug("Resource applying generated action: <cpu, memory>=<{}, {}>.", (Object)appliedJobConfig.getJobTotalCpuCores(), (Object)appliedJobConfig.getJobTotalMemoryMb());
        rescaleJobParallelism.exculdeMinorDiffVertices(this.monitor.getConfig());
        return rescaleJobParallelism;
    }
}

