/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.healthmanager.plugins.resolvers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.healthmanager.HealthMonitor;
import org.apache.flink.runtime.healthmanager.RestServerClient;
import org.apache.flink.runtime.healthmanager.metrics.MetricAggType;
import org.apache.flink.runtime.healthmanager.metrics.MetricProvider;
import org.apache.flink.runtime.healthmanager.metrics.TaskMetricSubscription;
import org.apache.flink.runtime.healthmanager.metrics.timeline.TimelineAggType;
import org.apache.flink.runtime.healthmanager.plugins.Action;
import org.apache.flink.runtime.healthmanager.plugins.Resolver;
import org.apache.flink.runtime.healthmanager.plugins.Symptom;
import org.apache.flink.runtime.healthmanager.plugins.actions.RescaleJobParallelism;
import org.apache.flink.runtime.healthmanager.plugins.detectors.LargeTimerCountDetector;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStable;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobStuck;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexBackPressure;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexDelayIncreasing;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexFailover;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexFrequentFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexHighDelay;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLargeTimerCount;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexLongTimeFullGC;
import org.apache.flink.runtime.healthmanager.plugins.symptoms.JobVertexOverParallelized;
import org.apache.flink.runtime.healthmanager.plugins.utils.HealthMonitorOptions;
import org.apache.flink.runtime.healthmanager.plugins.utils.MaxResourceLimitUtil;
import org.apache.flink.runtime.healthmanager.plugins.utils.MetricUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatistics;
import org.apache.flink.util.AbstractID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelismScaler
implements Resolver {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelismScaler.class);
    private JobID jobID;
    private HealthMonitor monitor;
    private MetricProvider metricProvider;
    private double scaleTpsRatio;
    private long timeout;
    private long checkInterval;
    private int maxPartitionPerTask;
    private long stableTime;
    private long stateSizeThreshold;
    private double maxCpuLimit;
    private int maxMemoryLimit;
    private long checkpointIntervalThreshold;
    private Map<JobVertexID, TaskMetricSubscription> inputTpsSubs;
    private Map<JobVertexID, TaskMetricSubscription> outputTpsSubs;
    private Map<JobVertexID, TaskMetricSubscription> timerCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> taskLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> taskLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceProcessLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> waitOutputCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> waitOutputSumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionCountSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencyCountRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourcePartitionLatencySumRangeSubs;
    private Map<JobVertexID, TaskMetricSubscription> sourceDelayRateSubs;
    private JobVertexHighDelay highDelaySymptom;
    private JobVertexDelayIncreasing delayIncreasingSymptom;
    private JobVertexBackPressure backPressureSymptom;
    private JobVertexOverParallelized overParallelizedSymptom;
    private JobStable jobStableSymptom;
    private JobVertexFrequentFullGC frequentFullGCSymptom;
    private JobVertexLongTimeFullGC longTimeFullGCSymptom;
    private JobVertexLargeTimerCount largeTimerCountSymptom;
    private JobVertexFailover failoverSymptom;
    private JobStuck jobStuckSymptom;
    private boolean needScaleUpForDelay;
    private boolean needScaleUpForBackpressure;
    private boolean needScaleUpForMassiveTimer;
    private boolean needScaleDown;
    private Map<JobVertexID, List<JobVertexID>> subDagRoot2SubDagVertex;
    private Map<JobVertexID, JobVertexID> vertex2SubDagRoot;
    private Map<JobVertexID, List<JobVertexID>> subDagRoot2UpstreamVertices;
    private Map<JobVertexID, Boolean> isSink;
    private Map<JobVertexID, Boolean> isSource;

    @Override
    public void open(HealthMonitor monitor) {
        this.monitor = monitor;
        this.jobID = monitor.getJobID();
        this.metricProvider = monitor.getMetricProvider();
        this.scaleTpsRatio = monitor.getConfig().getDouble(HealthMonitorOptions.PARALLELISM_MIN_RATIO);
        this.timeout = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_TIME_OUT);
        this.checkInterval = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_INTERVAL);
        this.maxPartitionPerTask = monitor.getConfig().getInteger(HealthMonitorOptions.MAX_PARTITION_PER_TASK);
        this.stableTime = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_STABLE_TIME);
        this.stateSizeThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_STATE_SIZE_THRESHOLD);
        this.checkpointIntervalThreshold = monitor.getConfig().getLong(HealthMonitorOptions.PARALLELISM_SCALE_CHECKPOINT_THRESHOLD);
        this.maxCpuLimit = MaxResourceLimitUtil.getMaxCpu(monitor.getConfig());
        this.maxMemoryLimit = MaxResourceLimitUtil.getMaxMem(monitor.getConfig());
        this.inputTpsSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.outputTpsSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.timerCountSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.taskLatencyCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.taskLatencySumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourceLatencyCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourceLatencySumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourceProcessLatencyCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourceProcessLatencySumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.waitOutputCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.waitOutputSumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourcePartitionCountSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourcePartitionLatencyCountRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourcePartitionLatencySumRangeSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        this.sourceDelayRateSubs = new HashMap<JobVertexID, TaskMetricSubscription>();
        RestServerClient.JobConfig jobConfig = monitor.getJobConfig();
        this.analyzeJobGraph(jobConfig);
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            TaskMetricSubscription inputTpsSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "numRecordsReceived", MetricAggType.SUM, this.checkInterval, TimelineAggType.RATE);
            this.inputTpsSubs.put(vertexId, inputTpsSub);
            TaskMetricSubscription outputTps = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "numRecordsSent", MetricAggType.SUM, this.checkInterval, TimelineAggType.RATE);
            this.outputTpsSubs.put(vertexId, outputTps);
            TaskMetricSubscription timerCount = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "TimerCount.Total", MetricAggType.SUM, 1L, TimelineAggType.LATEST);
            this.timerCountSubs.put(vertexId, timerCount);
            if (this.isSource.get((Object)vertexId).booleanValue()) {
                this.sourceLatencyCountRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "sourceLatency.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
                this.sourceLatencySumRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "sourceLatency.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
                this.sourcePartitionCountSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "partitionCount", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
                this.sourcePartitionLatencyCountRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "partitionLatency.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.RANGE));
                this.sourcePartitionLatencySumRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "partitionLatency.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.RANGE));
                this.sourceProcessLatencyCountRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "sourceProcessLatency.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
                this.sourceProcessLatencySumRangeSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "sourceProcessLatency.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST));
                this.sourceDelayRateSubs.put(vertexId, this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "fetched_delay", MetricAggType.AVG, this.checkInterval, TimelineAggType.RATE));
            }
            TaskMetricSubscription latencyCountRangeSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "taskLatency.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST);
            this.taskLatencyCountRangeSubs.put(vertexId, latencyCountRangeSub);
            TaskMetricSubscription latencySumRangeSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "taskLatency.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST);
            this.taskLatencySumRangeSubs.put(vertexId, latencySumRangeSub);
            TaskMetricSubscription waitOutputCountRangeSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "waitOutput.count", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST);
            this.waitOutputCountRangeSubs.put(vertexId, waitOutputCountRangeSub);
            TaskMetricSubscription waitOutputSumRangeSub = this.metricProvider.subscribeTaskMetric(this.jobID, vertexId, "waitOutput.sum", MetricAggType.SUM, this.checkInterval, TimelineAggType.LATEST);
            this.waitOutputSumRangeSubs.put(vertexId, waitOutputSumRangeSub);
        }
    }

    @Override
    public void close() {
        if (this.metricProvider == null) {
            return;
        }
        if (this.inputTpsSubs != null) {
            for (TaskMetricSubscription sub : this.inputTpsSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.outputTpsSubs != null) {
            for (TaskMetricSubscription sub : this.outputTpsSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.timerCountSubs != null) {
            for (TaskMetricSubscription sub : this.timerCountSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourceLatencyCountRangeSubs != null) {
            for (TaskMetricSubscription sub : this.sourceLatencyCountRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourceLatencySumRangeSubs != null) {
            for (TaskMetricSubscription sub : this.sourceLatencySumRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourcePartitionCountSubs != null) {
            for (TaskMetricSubscription sub : this.sourcePartitionCountSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourcePartitionLatencyCountRangeSubs != null) {
            for (TaskMetricSubscription sub : this.sourcePartitionLatencyCountRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.sourcePartitionLatencySumRangeSubs != null) {
            for (TaskMetricSubscription sub : this.sourcePartitionLatencySumRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.taskLatencyCountRangeSubs != null) {
            for (TaskMetricSubscription sub : this.taskLatencyCountRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.taskLatencySumRangeSubs != null) {
            for (TaskMetricSubscription sub : this.taskLatencySumRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.waitOutputCountRangeSubs != null) {
            for (TaskMetricSubscription sub : this.waitOutputCountRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
        if (this.waitOutputSumRangeSubs != null) {
            for (TaskMetricSubscription sub : this.waitOutputSumRangeSubs.values()) {
                this.metricProvider.unsubscribe(sub);
            }
        }
    }

    @VisibleForTesting
    public void setMonitor(HealthMonitor monitor) {
        this.monitor = monitor;
    }

    @Override
    public Action resolve(List<Symptom> symptomList) {
        HashMap<JobVertexID, Integer> targetParallelisms;
        LOGGER.debug("Start resolving.");
        this.parseSymptoms(symptomList);
        if (!this.diagnose()) {
            return null;
        }
        this.analyzeJobGraph(this.monitor.getJobConfig());
        Map<JobVertexID, TaskMetrics> taskMetrics = this.prepareTaskMetrics();
        if (taskMetrics == null && !this.needScaleUpForMassiveTimer) {
            LOGGER.debug("Can not rescale, metrics are not completed.");
            return null;
        }
        Map<JobVertexID, TaskCheckpointStatistics> taskCheckpointInfo = null;
        CheckpointStatistics completedCheckpointStats = null;
        long lastCheckpointTime = 0L;
        try {
            completedCheckpointStats = this.monitor.getRestServerClient().getLatestCheckPointStates(this.monitor.getJobID());
            if (completedCheckpointStats != null) {
                taskCheckpointInfo = completedCheckpointStats.getCheckpointStatisticsPerTask();
                lastCheckpointTime = completedCheckpointStats.getLatestAckTimestamp();
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        if (taskMetrics != null) {
            Map<JobVertexID, Double> subDagScaleUpRatio = this.getSubDagScaleUpRatio(taskMetrics);
            HashSet<JobVertexID> vertexToDownScale = new HashSet();
            if (System.currentTimeMillis() - lastCheckpointTime < this.checkpointIntervalThreshold) {
                vertexToDownScale = this.getVertexToScaleDown();
            }
            targetParallelisms = this.getVertexTargetParallelisms(subDagScaleUpRatio, vertexToDownScale, taskMetrics);
        } else {
            HashSet<JobVertexID> massiveTimerVertices = new HashSet<JobVertexID>();
            massiveTimerVertices.addAll(this.largeTimerCountSymptom.getJobVertexIDs());
            targetParallelisms = new HashMap();
            RestServerClient.JobConfig originJobConfig = this.monitor.getJobConfig();
            for (JobVertexID vertexID : massiveTimerVertices) {
                Tuple2<Long, Double> timerCount;
                if (!this.timerCountSubs.containsKey((Object)vertexID) || (timerCount = this.timerCountSubs.get((Object)vertexID).getValue()) == null) continue;
                LOGGER.debug("Current timer count {} for vertex {}.", timerCount.f1, (Object)vertexID);
                int targetParallelism = (int)Math.ceil((Double)timerCount.f1 / (double)this.monitor.getConfig().getLong(LargeTimerCountDetector.LARGE_TIMER_COUNT_THRESHOLD) * this.monitor.getConfig().getDouble(HealthMonitorOptions.TIMER_SCALE_RATIO));
                targetParallelism = targetParallelism > 1 ? targetParallelism : 1;
                RestServerClient.VertexConfig originVertexConfig = originJobConfig.getVertexConfigs().get((Object)vertexID);
                if (targetParallelism <= originVertexConfig.getParallelism()) continue;
                targetParallelisms.put(vertexID, targetParallelism);
            }
        }
        LOGGER.debug("Target parallelism for vertices before applying constraints: {}.", targetParallelisms);
        Map<JobVertexID, Integer> minParallelisms = this.getVertexMinParallelisms(this.monitor.getJobConfig(), taskCheckpointInfo, taskMetrics);
        LOGGER.debug("Min parallelism for vertices: {}", minParallelisms);
        this.updateTargetParallelismsSubjectToConstraints(targetParallelisms, minParallelisms, this.monitor.getJobConfig());
        LOGGER.debug("Target parallelism for vertices after applying constraints: {}.", targetParallelisms);
        RescaleJobParallelism rescaleJobParallelism = this.generateRescaleParallelismAction(targetParallelisms, minParallelisms, this.monitor.getJobConfig());
        if (rescaleJobParallelism != null && !rescaleJobParallelism.isEmpty()) {
            LOGGER.info("RescaleJobParallelism action generated: {}.", (Object)rescaleJobParallelism);
            return rescaleJobParallelism;
        }
        return null;
    }

    private Set<JobVertexID> getVertexToScaleDown() {
        HashSet<JobVertexID> vertexToDownScale = new HashSet<JobVertexID>();
        vertexToDownScale.clear();
        if (this.needScaleDown) {
            HashSet<JobVertexID> verticesToDownScale = new HashSet<JobVertexID>(this.overParallelizedSymptom.getJobVertexIDs());
            for (JobVertexID vertexId : verticesToDownScale) {
                vertexToDownScale.add(vertexId);
            }
        }
        LOGGER.debug("Roots of sub-dags need to scale down: {}.", vertexToDownScale);
        return vertexToDownScale;
    }

    @VisibleForTesting
    public void parseSymptoms(List<Symptom> symptomList) {
        this.jobStableSymptom = null;
        this.frequentFullGCSymptom = null;
        this.longTimeFullGCSymptom = null;
        this.largeTimerCountSymptom = null;
        this.failoverSymptom = null;
        this.jobStuckSymptom = null;
        this.highDelaySymptom = null;
        this.delayIncreasingSymptom = null;
        this.backPressureSymptom = null;
        this.overParallelizedSymptom = null;
        for (Symptom symptom : symptomList) {
            if (symptom instanceof JobStable) {
                this.jobStableSymptom = (JobStable)symptom;
            }
            if (symptom instanceof JobVertexFrequentFullGC) {
                this.frequentFullGCSymptom = (JobVertexFrequentFullGC)symptom;
                LOGGER.debug("Frequent full gc detected for vertices {}.", this.frequentFullGCSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexLongTimeFullGC) {
                this.longTimeFullGCSymptom = (JobVertexLongTimeFullGC)symptom;
                LOGGER.debug("Long time full gc detected for vertices {}.", this.longTimeFullGCSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexLargeTimerCount) {
                this.largeTimerCountSymptom = (JobVertexLargeTimerCount)symptom;
                LOGGER.debug("Large timer count detected for vertices {}.", this.largeTimerCountSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexFailover) {
                this.failoverSymptom = (JobVertexFailover)symptom;
                LOGGER.debug("Failover detected for vertices {}.", this.failoverSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobStuck) {
                this.jobStuckSymptom = (JobStuck)symptom;
                LOGGER.debug("Stuck detected for vertices {}.", this.jobStuckSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexHighDelay) {
                this.highDelaySymptom = (JobVertexHighDelay)symptom;
                LOGGER.debug("High delay detected for vertices {}.", this.highDelaySymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexDelayIncreasing) {
                this.delayIncreasingSymptom = (JobVertexDelayIncreasing)symptom;
                LOGGER.debug("Delay increasing detected for vertices {}.", this.delayIncreasingSymptom.getJobVertexIDs());
                continue;
            }
            if (symptom instanceof JobVertexBackPressure) {
                this.backPressureSymptom = (JobVertexBackPressure)symptom;
                LOGGER.debug("Back pressure detected for vertices {}.", this.backPressureSymptom.getJobVertexIDs());
                continue;
            }
            if (!(symptom instanceof JobVertexOverParallelized)) continue;
            this.overParallelizedSymptom = (JobVertexOverParallelized)symptom;
            LOGGER.debug("Over parallelized detected for vertices {}.", this.overParallelizedSymptom.getJobVertexIDs());
        }
    }

    private boolean diagnose() {
        boolean bl = this.needScaleUpForMassiveTimer = this.largeTimerCountSymptom != null;
        if (this.needScaleUpForMassiveTimer) {
            LOGGER.debug("Job exists vertices require large number of timer, need rescale parallelism.");
            return true;
        }
        if (this.jobStableSymptom == null || this.jobStableSymptom.getStableTime() < this.stableTime || this.frequentFullGCSymptom != null && this.frequentFullGCSymptom.isSevere() || this.longTimeFullGCSymptom != null && this.longTimeFullGCSymptom.isSevere() || this.failoverSymptom != null) {
            LOGGER.debug("Job is not stable, should not rescale parallelism.");
            return false;
        }
        this.needScaleUpForDelay = this.highDelaySymptom != null || this.delayIncreasingSymptom != null;
        this.needScaleUpForBackpressure = this.backPressureSymptom != null;
        boolean bl2 = this.needScaleDown = this.overParallelizedSymptom != null && this.backPressureSymptom == null;
        if (!(this.needScaleUpForDelay || this.needScaleUpForBackpressure || this.needScaleDown)) {
            LOGGER.debug("No need to rescale parallelism.");
            return false;
        }
        return true;
    }

    public void analyzeJobGraph(RestServerClient.JobConfig jobConfig) {
        this.subDagRoot2SubDagVertex = new HashMap<JobVertexID, List<JobVertexID>>();
        this.vertex2SubDagRoot = new HashMap<JobVertexID, JobVertexID>();
        this.subDagRoot2UpstreamVertices = new HashMap<JobVertexID, List<JobVertexID>>();
        this.isSink = new HashMap<JobVertexID, Boolean>();
        this.isSource = new HashMap<JobVertexID, Boolean>();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            this.subDagRoot2SubDagVertex.put(vertexId, new ArrayList());
            this.subDagRoot2SubDagVertex.get((Object)vertexId).add(vertexId);
            this.vertex2SubDagRoot.put(vertexId, vertexId);
            this.subDagRoot2UpstreamVertices.put(vertexId, new ArrayList());
            this.isSink.put(vertexId, true);
            this.isSource.put(vertexId, false);
        }
        for (JobVertexID vertexId : jobConfig.getInputNodes().keySet()) {
            List<Tuple2<JobVertexID, String>> upstreamVertices = jobConfig.getInputNodes().get((Object)vertexId);
            if (upstreamVertices.isEmpty()) {
                this.isSource.put(vertexId, true);
                continue;
            }
            if (upstreamVertices.size() == 1) {
                JobVertexID upstreamVertex = (JobVertexID)((Object)upstreamVertices.get((int)0).f0);
                JobVertexID upstreamSubDagRoot = this.vertex2SubDagRoot.get((Object)upstreamVertex);
                for (JobVertexID subDagVertex : this.subDagRoot2SubDagVertex.get((Object)vertexId)) {
                    this.subDagRoot2SubDagVertex.get((Object)upstreamSubDagRoot).add(subDagVertex);
                    this.vertex2SubDagRoot.put(subDagVertex, upstreamSubDagRoot);
                }
                this.subDagRoot2SubDagVertex.remove((Object)vertexId);
                this.subDagRoot2UpstreamVertices.remove((Object)vertexId);
                this.isSink.put(upstreamVertex, false);
                continue;
            }
            for (Tuple2<JobVertexID, String> upstreamVertex : upstreamVertices) {
                this.subDagRoot2UpstreamVertices.get((Object)vertexId).add((JobVertexID)((Object)upstreamVertex.f0));
                this.isSink.put((JobVertexID)((Object)upstreamVertex.f0), false);
            }
        }
    }

    private Map<JobVertexID, TaskMetrics> prepareTaskMetrics() {
        long now = System.currentTimeMillis();
        RestServerClient.JobConfig jobConfig = this.monitor.getJobConfig();
        if (jobConfig == null) {
            return null;
        }
        HashMap<JobVertexID, TaskMetrics> metrics = new HashMap<JobVertexID, TaskMetrics>();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            TaskMetricSubscription inputTpsSub = this.inputTpsSubs.get((Object)vertexId);
            TaskMetricSubscription outputTpsSub = this.outputTpsSubs.get((Object)vertexId);
            TaskMetricSubscription timerCountSub = this.timerCountSubs.get((Object)vertexId);
            TaskMetricSubscription sourceLatencyCountRangeSub = this.sourceLatencyCountRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourceLatencySumRangeSub = this.sourceLatencySumRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourceProcessLatencyCountRangeSub = this.sourceProcessLatencyCountRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourceProcessLatencySumRangeSub = this.sourceProcessLatencySumRangeSubs.get((Object)vertexId);
            TaskMetricSubscription taskLatencyCountRangeSub = this.taskLatencyCountRangeSubs.get((Object)vertexId);
            TaskMetricSubscription taskLatencySumRangeSub = this.taskLatencySumRangeSubs.get((Object)vertexId);
            TaskMetricSubscription waitOutputCountRangeSub = this.waitOutputCountRangeSubs.get((Object)vertexId);
            TaskMetricSubscription waitOutputSumRangeSub = this.waitOutputSumRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourcePartitionCountSub = this.sourcePartitionCountSubs.get((Object)vertexId);
            TaskMetricSubscription sourcePartitionLatencyCountRangeSub = this.sourcePartitionLatencyCountRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourcePartitionLatencySumRangeSub = this.sourcePartitionLatencySumRangeSubs.get((Object)vertexId);
            TaskMetricSubscription sourceDelayIncreasingRateSub = this.sourceDelayRateSubs.get((Object)vertexId);
            if (inputTpsSub.getValue() == null || now - (Long)inputTpsSub.getValue().f0 > this.checkInterval * 2L || taskLatencyCountRangeSub.getValue() == null || now - (Long)taskLatencyCountRangeSub.getValue().f0 > this.checkInterval * 2L || taskLatencySumRangeSub.getValue() == null || now - (Long)taskLatencySumRangeSub.getValue().f0 > this.checkInterval * 2L || timerCountSub.getValue() == null || now - (Long)timerCountSub.getValue().f0 > this.checkInterval * 2L) {
                LOGGER.debug("input metric missing " + (Object)((Object)vertexId));
                LOGGER.debug("input tps " + inputTpsSub.getValue() + ", task latency count range " + taskLatencyCountRangeSub.getValue() + ", task latency sum range " + taskLatencySumRangeSub.getValue() + ", timer count " + timerCountSub.getValue());
                return null;
            }
            if (!this.isSink.get((Object)vertexId).booleanValue() && !MetricUtils.validateTaskMetric(this.monitor, this.checkInterval * 2L, outputTpsSub, waitOutputCountRangeSub, waitOutputSumRangeSub)) {
                LOGGER.debug("output metric missing " + (Object)((Object)vertexId));
                LOGGER.debug("output tps " + outputTpsSub.getValue() + "wait output count range " + waitOutputCountRangeSub.getValue() + "wait output sum range " + waitOutputSumRangeSub.getValue());
                return null;
            }
            if (this.isSource.get((Object)vertexId).booleanValue() && !MetricUtils.validateTaskMetric(this.monitor, this.checkInterval * 2L, sourceLatencyCountRangeSub, sourceLatencySumRangeSub)) {
                LOGGER.debug("input metric missing for source " + (Object)((Object)vertexId));
                LOGGER.debug("source latency count range " + sourceLatencyCountRangeSub.getValue() + "source latency sum range " + sourceLatencySumRangeSub.getValue());
                return null;
            }
            boolean isParallelReader = false;
            if (this.isSource.get((Object)vertexId).booleanValue()) {
                isParallelReader = MetricUtils.validateTaskMetric(this.monitor, this.checkInterval * 2L, sourcePartitionCountSub, sourcePartitionLatencyCountRangeSub, sourcePartitionLatencySumRangeSub) && (Double)sourcePartitionCountSub.getValue().f1 > 0.0;
                LOGGER.debug("Treat vertex {} as {} reader.", (Object)vertexId, (Object)(isParallelReader ? "parallel" : "non-parallel"));
                LOGGER.debug("source partition count " + sourcePartitionCountSub.getValue() + " source partition latency count range " + sourcePartitionLatencyCountRangeSub.getValue() + " source partition latency sum range " + sourcePartitionLatencySumRangeSub.getValue());
            }
            double inputTps = (Double)inputTpsSub.getValue().f1;
            double outputTps = (Double)outputTpsSub.getValue().f1;
            double timerCount = (Double)timerCountSub.getValue().f1;
            double taskLatencyCount = (Double)taskLatencyCountRangeSub.getValue().f1;
            double taskLatencySum = (Double)taskLatencySumRangeSub.getValue().f1;
            double taskLatency = taskLatencyCount <= 0.0 ? 0.0 : taskLatencySum / taskLatencyCount / 1.0E9;
            double sourceLatency = 0.0;
            if (this.isSource.get((Object)vertexId).booleanValue()) {
                double sourceLatencyCount = (Double)sourceLatencyCountRangeSub.getValue().f1;
                double sourceLatencySum = (Double)sourceLatencySumRangeSub.getValue().f1;
                sourceLatency = sourceLatencyCount <= 0.0 ? 0.0 : sourceLatencySum / sourceLatencyCount / 1.0E9;
            }
            double waitOutput = 0.0;
            if (!this.isSink.get((Object)vertexId).booleanValue()) {
                double waitOutputCount = (Double)waitOutputCountRangeSub.getValue().f1;
                double waitOutputSum = (Double)waitOutputSumRangeSub.getValue().f1;
                waitOutput = waitOutputCount <= 0.0 ? 0.0 : waitOutputSum / waitOutputCount / 1.0E9;
            }
            double waitOutputPerInputRecord = inputTps <= 0.0 ? 0.0 : waitOutput * outputTps / inputTps;
            double partitionLatency = 0.0;
            double partitionCount = 0.0;
            if (isParallelReader) {
                double processLatencyCount = (Double)sourceProcessLatencyCountRangeSub.getValue().f1;
                double processLatencySum = (Double)sourceProcessLatencySumRangeSub.getValue().f1;
                taskLatency = processLatencyCount <= 0.0 ? 0.0 : processLatencySum / processLatencyCount / 1.0E9;
                double partitionLatencyCount = (Double)sourcePartitionLatencyCountRangeSub.getValue().f1;
                double partitionLatencySum = (Double)sourcePartitionLatencySumRangeSub.getValue().f1;
                partitionLatency = partitionLatencyCount <= 0.0 ? 0.0 : partitionLatencySum / partitionLatencyCount / 1.0E9;
                partitionCount = (Double)sourcePartitionCountSub.getValue().f1;
            }
            double workload = (taskLatency - waitOutputPerInputRecord) * inputTps;
            double delayIncreasingRate = 0.0;
            if (sourceDelayIncreasingRateSub != null) {
                delayIncreasingRate = (Double)sourceDelayIncreasingRateSub.getValue().f1 / 1000.0;
            }
            TaskMetrics taskMetrics = new TaskMetrics(vertexId, isParallelReader, inputTps, outputTps, timerCount, taskLatency, sourceLatency, waitOutputPerInputRecord, workload, delayIncreasingRate, partitionLatency, partitionCount);
            LOGGER.debug("Metrics for vertex {}.", (Object)taskMetrics.toString());
            metrics.put(vertexId, taskMetrics);
        }
        return metrics;
    }

    @VisibleForTesting
    public Map<JobVertexID, Double> getSubDagScaleUpRatio(Map<JobVertexID, TaskMetrics> taskMetrics) {
        HashMap<JobVertexID, Double> subDagTargetTpsRatio = new HashMap<JobVertexID, Double>();
        HashMap<JobVertexID, Double> vertexTargetTpsRatio = new HashMap<JobVertexID, Double>();
        HashSet<JobVertexID> subDagRootsToUpScale = new HashSet<JobVertexID>();
        if (this.needScaleUpForBackpressure) {
            HashSet<JobVertexID> backPressureVertices = new HashSet<JobVertexID>();
            backPressureVertices.addAll(this.backPressureSymptom.getJobVertexIDs());
            for (JobVertexID vertexID : backPressureVertices) {
                subDagRootsToUpScale.add(this.vertex2SubDagRoot.get((Object)vertexID));
                subDagTargetTpsRatio.put(this.vertex2SubDagRoot.get((Object)vertexID), this.scaleTpsRatio);
            }
        }
        if (this.needScaleUpForMassiveTimer) {
            HashSet<JobVertexID> massiveTimerVertices = new HashSet<JobVertexID>();
            massiveTimerVertices.addAll(this.largeTimerCountSymptom.getJobVertexIDs());
            for (JobVertexID vertexID : massiveTimerVertices) {
                subDagRootsToUpScale.add(vertexID);
                vertexTargetTpsRatio.put(vertexID, this.scaleTpsRatio);
            }
        }
        if (this.needScaleUpForDelay) {
            HashSet<JobVertexID> verticesToUpScale = new HashSet<JobVertexID>();
            if (this.highDelaySymptom != null) {
                verticesToUpScale.addAll(this.highDelaySymptom.getJobVertexIDs());
            }
            if (this.delayIncreasingSymptom != null) {
                verticesToUpScale.addAll(this.delayIncreasingSymptom.getJobVertexIDs());
            }
            for (JobVertexID vertexId : verticesToUpScale) {
                subDagRootsToUpScale.add(this.vertex2SubDagRoot.get((Object)vertexId));
                TaskMetrics metric = taskMetrics.get((Object)vertexId);
                double ratio = 1.0 / (1.0 - metric.delayIncreasingRate) * this.scaleTpsRatio;
                if (metric.isParallelSource && metric.workload > 0.0) {
                    double maxTps = 1.0 / Math.max(metric.partitionLatency, metric.taskLatencyPerRecord - metric.waitOutputPerRecord) * metric.partitionCount;
                    if (this.highDelaySymptom != null && this.highDelaySymptom.getSevereJobVertexIDs().contains((Object)vertexId)) {
                        ratio = maxTps / metric.getInputTps();
                    } else if (maxTps / metric.getInputTps() * this.scaleTpsRatio < ratio) {
                        ratio = maxTps / metric.getInputTps() * this.scaleTpsRatio;
                    }
                }
                subDagTargetTpsRatio.put(this.vertex2SubDagRoot.get((Object)vertexId), ratio);
            }
        }
        LOGGER.debug("Roots of sub-dags need to scale up: {}.", subDagRootsToUpScale);
        LOGGER.debug("Target scale up tps ratio for sub-dags before adjusting: {}, {}.", subDagTargetTpsRatio, vertexTargetTpsRatio);
        boolean hasDagScaleUp = true;
        while (hasDagScaleUp) {
            hasDagScaleUp = false;
            for (JobVertexID downStreamSubDagRoot : this.subDagRoot2UpstreamVertices.keySet()) {
                for (JobVertexID upStreamVertex : this.subDagRoot2UpstreamVertices.get((Object)downStreamSubDagRoot)) {
                    JobVertexID upStreamSubDagRoot = this.vertex2SubDagRoot.get((Object)upStreamVertex);
                    if (!subDagTargetTpsRatio.containsKey((Object)upStreamSubDagRoot) || subDagTargetTpsRatio.containsKey((Object)downStreamSubDagRoot) && !((Double)subDagTargetTpsRatio.get((Object)downStreamSubDagRoot) < (Double)subDagTargetTpsRatio.get((Object)upStreamSubDagRoot))) continue;
                    subDagTargetTpsRatio.put(downStreamSubDagRoot, (Double)subDagTargetTpsRatio.get((Object)upStreamSubDagRoot));
                    hasDagScaleUp = true;
                }
            }
        }
        for (JobVertexID vertexID : vertexTargetTpsRatio.keySet()) {
            if (subDagTargetTpsRatio.containsKey((Object)vertexID)) continue;
            subDagTargetTpsRatio.put(vertexID, (Double)vertexTargetTpsRatio.get((Object)vertexID));
        }
        LOGGER.debug("Target scale up tps ratio for sub-dags after adjusting: {}.", subDagTargetTpsRatio);
        return subDagTargetTpsRatio;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> getVertexTargetParallelisms(Map<JobVertexID, Double> subDagTargetTpsRatio, Set<JobVertexID> vertexToDownScale, Map<JobVertexID, TaskMetrics> taskMetrics) {
        HashMap<JobVertexID, Integer> targetParallelisms = new HashMap<JobVertexID, Integer>();
        for (JobVertexID subDagRoot : subDagTargetTpsRatio.keySet()) {
            double ratio = subDagTargetTpsRatio.get((Object)subDagRoot);
            for (JobVertexID vertexId : this.subDagRoot2SubDagVertex.get((Object)subDagRoot)) {
                TaskMetrics metric = taskMetrics.get((Object)vertexId);
                if (!(metric.getWorkload() > 0.0)) continue;
                targetParallelisms.put(vertexId, (int)Math.floor(metric.getWorkload() * ratio));
                int newParallelism = (int)Math.ceil(taskMetrics.get((Object)vertexId).getTimerCount() / (double)this.monitor.getConfig().getLong(LargeTimerCountDetector.LARGE_TIMER_COUNT_THRESHOLD) * this.monitor.getConfig().getDouble(HealthMonitorOptions.TIMER_SCALE_RATIO));
                if ((Integer)targetParallelisms.get((Object)vertexId) >= newParallelism) continue;
                targetParallelisms.put(vertexId, newParallelism);
            }
        }
        for (JobVertexID vertexID : vertexToDownScale) {
            if (targetParallelisms.containsKey((Object)vertexID) || !(taskMetrics.get((Object)vertexID).getWorkload() > 0.0)) continue;
            targetParallelisms.put(vertexID, (int)Math.ceil(taskMetrics.get((Object)vertexID).getWorkload() * this.scaleTpsRatio));
        }
        return targetParallelisms;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> getVertexMinParallelisms(RestServerClient.JobConfig jobConfig, Map<JobVertexID, TaskCheckpointStatistics> checkpointInfo, Map<JobVertexID, TaskMetrics> taskMetrics) {
        HashMap<JobVertexID, Integer> minParallelisms = new HashMap<JobVertexID, Integer>();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            double partitionCount;
            int minParallelism;
            minParallelisms.put(vertexId, 1);
            if (this.isSource.get((Object)vertexId).booleanValue() && this.sourcePartitionCountSubs.get((Object)vertexId).getValue() != null && (minParallelism = (int)Math.ceil((partitionCount = ((Double)this.sourcePartitionCountSubs.get((Object)((Object)vertexId)).getValue().f1).doubleValue()) / (double)this.maxPartitionPerTask)) > (Integer)minParallelisms.get((Object)vertexId)) {
                minParallelisms.put(vertexId, minParallelism);
            }
            if (checkpointInfo != null && checkpointInfo.containsKey((Object)vertexId)) {
                TaskCheckpointStatistics taskCheckpointStatistics = checkpointInfo.get((Object)vertexId);
                minParallelisms.put(vertexId, Math.max((Integer)minParallelisms.get((Object)vertexId), (int)Math.ceil(1.0 * (double)taskCheckpointStatistics.getFullStateSize() / (double)this.stateSizeThreshold)));
            }
            if (taskMetrics == null || !taskMetrics.containsKey((Object)vertexId)) continue;
            TaskMetrics taskMetric = taskMetrics.get((Object)vertexId);
            double minParallelism2 = taskMetric.getTimerCount() / (double)jobConfig.getConfig().getLong(LargeTimerCountDetector.LARGE_TIMER_COUNT_THRESHOLD);
            LOGGER.debug("Timer count constraint works: constraints {}, origin constraints {}.", (Object)Math.ceil(minParallelism2), minParallelisms.get((Object)vertexId));
            minParallelisms.put(vertexId, Math.max((Integer)minParallelisms.get((Object)vertexId), (int)Math.ceil(minParallelism2)));
        }
        return minParallelisms;
    }

    public void updateTargetParallelismsSubjectToConstraints(Map<JobVertexID, Integer> targetParallelisms, Map<JobVertexID, Integer> minParallelisms, RestServerClient.JobConfig jobConfig) {
        HashMap<JobVertexID, Set<JobVertexID>> epgLeader2Members = new HashMap<JobVertexID, Set<JobVertexID>>();
        HashMap<JobVertexID, JobVertexID> epgMember2Leader = new HashMap<JobVertexID, JobVertexID>();
        HashMap<JobVertexID, Integer> epgLeader2TargetParallelism = new HashMap<JobVertexID, Integer>();
        HashMap<JobVertexID, Integer> epgLeader2MaxParallelism = new HashMap<JobVertexID, Integer>();
        for (JobVertexID jobVertexID : jobConfig.getVertexConfigs().keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)jobVertexID);
            int targetParallelism = vertexConfig.getParallelism();
            int maxParallelism = vertexConfig.getMaxParallelism();
            if (targetParallelisms.containsKey((Object)jobVertexID)) {
                targetParallelism = targetParallelisms.get((Object)jobVertexID);
                if (this.isSource.get((Object)jobVertexID).booleanValue() && this.sourcePartitionCountSubs.get((Object)jobVertexID).getValue() != null) {
                    double partitionCount = (Double)this.sourcePartitionCountSubs.get((Object)((Object)jobVertexID)).getValue().f1;
                    if (partitionCount / (double)targetParallelism > (double)this.maxPartitionPerTask) {
                        targetParallelism = (int)Math.ceil(partitionCount / (double)this.maxPartitionPerTask);
                    }
                    if (partitionCount > 0.0 && (double)maxParallelism > partitionCount) {
                        maxParallelism = (int)partitionCount;
                    }
                }
            }
            if (minParallelisms != null && minParallelisms.containsKey((Object)jobVertexID) && targetParallelism < minParallelisms.get((Object)jobVertexID)) {
                targetParallelism = minParallelisms.get((Object)jobVertexID);
            }
            if (targetParallelism < 1) {
                targetParallelism = 1;
            }
            if (targetParallelism > maxParallelism && maxParallelism > 0) {
                targetParallelism = maxParallelism;
            }
            HashSet<JobVertexID> members = new HashSet<JobVertexID>();
            members.add(jobVertexID);
            epgLeader2Members.put(jobVertexID, members);
            epgMember2Leader.put(jobVertexID, jobVertexID);
            epgLeader2TargetParallelism.put(jobVertexID, targetParallelism);
            epgLeader2MaxParallelism.put(jobVertexID, maxParallelism);
        }
        HashMap colocationGroupId2Leader = new HashMap();
        for (JobVertexID vertexId : jobConfig.getVertexConfigs().keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexId);
            AbstractID colocationGroupId = vertexConfig.getColocationGroupId();
            if (colocationGroupId == null) continue;
            if (colocationGroupId2Leader.containsKey(colocationGroupId)) {
                JobVertexID currentGroupLeader = (JobVertexID)((Object)epgMember2Leader.get((Object)vertexId));
                JobVertexID targetGroupLeader = (JobVertexID)((Object)colocationGroupId2Leader.get(colocationGroupId));
                this.mergeEqualParallelismGroups(currentGroupLeader, targetGroupLeader, epgLeader2Members, epgMember2Leader, epgLeader2TargetParallelism, epgLeader2MaxParallelism);
                continue;
            }
            colocationGroupId2Leader.put(colocationGroupId, epgMember2Leader.get((Object)vertexId));
        }
        for (JobVertexID downStreamVertex : jobConfig.getInputNodes().keySet()) {
            for (Tuple2<JobVertexID, String> edge : jobConfig.getInputNodes().get((Object)downStreamVertex)) {
                JobVertexID upStreamVertex = (JobVertexID)((Object)edge.f0);
                String shipStrategy = (String)edge.f1;
                if (!shipStrategy.equals("FORWARD")) continue;
                JobVertexID currentGroupLeader = (JobVertexID)((Object)epgMember2Leader.get((Object)upStreamVertex));
                JobVertexID jobVertexID = (JobVertexID)((Object)epgMember2Leader.get((Object)downStreamVertex));
                this.mergeEqualParallelismGroups(currentGroupLeader, jobVertexID, epgLeader2Members, epgMember2Leader, epgLeader2TargetParallelism, epgLeader2MaxParallelism);
            }
        }
        HashMap<JobVertexID, Set<JobVertexID>> hashMap = new HashMap<JobVertexID, Set<JobVertexID>>();
        HashMap<JobVertexID, JobVertexID> ppgMember2Leader = new HashMap<JobVertexID, JobVertexID>();
        HashMap<JobVertexID, Integer> ppgLeader2Base = new HashMap<JobVertexID, Integer>();
        HashMap<JobVertexID, Integer> ppgMember2Factor = new HashMap<JobVertexID, Integer>();
        for (JobVertexID downStreamVertex : jobConfig.getInputNodes().keySet()) {
            for (Tuple2 tuple2 : jobConfig.getInputNodes().get((Object)downStreamVertex)) {
                JobVertexID downStreamEpg;
                JobVertexID upStreamVertex = (JobVertexID)((Object)tuple2.f0);
                String shipStrategy = (String)tuple2.f1;
                if (!shipStrategy.equals("RESCALE")) continue;
                JobVertexID upStreamEpg = (JobVertexID)((Object)epgMember2Leader.get((Object)upStreamVertex));
                if (!ppgMember2Leader.containsKey((Object)upStreamEpg)) {
                    HashSet<JobVertexID> members = new HashSet<JobVertexID>();
                    members.add(upStreamEpg);
                    hashMap.put(upStreamEpg, members);
                    ppgMember2Leader.put(upStreamEpg, upStreamEpg);
                    ppgLeader2Base.put(upStreamEpg, (Integer)epgLeader2TargetParallelism.get((Object)upStreamEpg));
                    ppgMember2Factor.put(upStreamEpg, 1);
                }
                if (!ppgMember2Leader.containsKey((Object)(downStreamEpg = (JobVertexID)((Object)epgMember2Leader.get((Object)downStreamVertex))))) {
                    HashSet<JobVertexID> members = new HashSet<JobVertexID>();
                    members.add(downStreamEpg);
                    hashMap.put(downStreamEpg, members);
                    ppgMember2Leader.put(downStreamEpg, downStreamEpg);
                    ppgLeader2Base.put(downStreamEpg, (Integer)epgLeader2TargetParallelism.get((Object)downStreamEpg));
                    ppgMember2Factor.put(downStreamEpg, 1);
                }
                this.mergeProportionalParallelismGroups(upStreamEpg, downStreamEpg, epgLeader2Members, epgLeader2TargetParallelism, epgLeader2MaxParallelism, hashMap, ppgMember2Leader, ppgLeader2Base, ppgMember2Factor, jobConfig);
            }
        }
        targetParallelisms.clear();
        for (JobVertexID epgLeader : epgLeader2Members.keySet()) {
            int targetParallelism = (Integer)epgLeader2TargetParallelism.get((Object)epgLeader);
            for (JobVertexID vertexId : (Set)epgLeader2Members.get((Object)epgLeader)) {
                targetParallelisms.put(vertexId, targetParallelism);
            }
        }
    }

    private void mergeEqualParallelismGroups(JobVertexID currentGroupLeader, JobVertexID targetGroupLeader, Map<JobVertexID, Set<JobVertexID>> leader2Members, Map<JobVertexID, JobVertexID> member2Leader, Map<JobVertexID, Integer> leader2TargetParallelism, Map<JobVertexID, Integer> leader2MaxParallelism) {
        int maxParallelism;
        if (currentGroupLeader.equals((Object)targetGroupLeader)) {
            return;
        }
        int currentGroupTargetParallelism = leader2TargetParallelism.get((Object)currentGroupLeader);
        int currentGroupMaxParallelism = leader2MaxParallelism.get((Object)currentGroupLeader);
        int targetGroupTargetParallelism = leader2TargetParallelism.get((Object)targetGroupLeader);
        int targetGroupMaxParallelism = leader2MaxParallelism.get((Object)targetGroupLeader);
        int targetParallelism = Math.max(currentGroupTargetParallelism, targetGroupTargetParallelism);
        if (targetParallelism > (maxParallelism = Math.min(currentGroupMaxParallelism, targetGroupMaxParallelism))) {
            targetParallelism = maxParallelism;
        }
        leader2Members.get((Object)targetGroupLeader).addAll((Collection<JobVertexID>)leader2Members.get((Object)currentGroupLeader));
        leader2Members.get((Object)currentGroupLeader).forEach(member -> member2Leader.put((JobVertexID)((Object)member), targetGroupLeader));
        leader2TargetParallelism.put(targetGroupLeader, targetParallelism);
        leader2MaxParallelism.put(targetGroupLeader, maxParallelism);
        leader2Members.remove((Object)currentGroupLeader);
        leader2TargetParallelism.remove((Object)currentGroupLeader);
        leader2MaxParallelism.remove((Object)currentGroupLeader);
    }

    private void mergeProportionalParallelismGroups(JobVertexID upStreamEpg, JobVertexID downStreamEpg, Map<JobVertexID, Set<JobVertexID>> epgLeader2Members, Map<JobVertexID, Integer> epgLeader2TargetParallelism, Map<JobVertexID, Integer> epgLeader2MaxParallelism, Map<JobVertexID, Set<JobVertexID>> ppgLeader2Members, Map<JobVertexID, JobVertexID> ppgMember2Leader, Map<JobVertexID, Integer> ppgLeader2Base, Map<JobVertexID, Integer> ppgMember2Factor, RestServerClient.JobConfig jobConfig) {
        boolean useUpStreamGreaterPlan;
        int downStreamParallelism;
        int upStreamParallelism;
        JobVertexID upStreamPpgLeader = ppgMember2Leader.get((Object)upStreamEpg);
        JobVertexID downStreamPpgLeader = ppgMember2Leader.get((Object)downStreamEpg);
        int upStreamBase = ppgLeader2Base.get((Object)upStreamPpgLeader);
        int upStreamFactor = ppgMember2Factor.get((Object)upStreamEpg);
        int downStreamBase = ppgLeader2Base.get((Object)downStreamPpgLeader);
        int downStreamFactor = ppgMember2Factor.get((Object)downStreamEpg);
        int upStreamGreaterUpStreamBaseIncrease = 0;
        while (true) {
            if ((upStreamParallelism = (upStreamBase + upStreamGreaterUpStreamBaseIncrease) * upStreamFactor) % downStreamFactor == 0 && upStreamParallelism / downStreamFactor >= downStreamBase) break;
            ++upStreamGreaterUpStreamBaseIncrease;
        }
        int upStreamGreaterK = upStreamParallelism / downStreamFactor / downStreamBase;
        int upStreamGreaterDownStreamBaseIncrease = upStreamParallelism / downStreamFactor / upStreamGreaterK - downStreamBase;
        boolean upStreamGreaterFeasible = true;
        for (JobVertexID upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
            int upStreamPpgMemberParallelism = (upStreamBase + upStreamGreaterUpStreamBaseIncrease) * ppgMember2Factor.get((Object)upStreamPpgMemberEpg);
            if (upStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)upStreamPpgMemberEpg)) continue;
            upStreamGreaterFeasible = false;
            break;
        }
        for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
            int downStreamPpgMemberParallelism = (downStreamBase + upStreamGreaterDownStreamBaseIncrease) * ppgMember2Factor.get((Object)downStreamPpgMemberEpg);
            if (downStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)downStreamPpgMemberEpg)) continue;
            upStreamGreaterFeasible = false;
            break;
        }
        int downStreamGreaterDownStreamBaseIncrease = 0;
        while (true) {
            if ((downStreamParallelism = (downStreamBase + downStreamGreaterDownStreamBaseIncrease) * downStreamFactor) % upStreamFactor == 0 && downStreamParallelism / upStreamFactor >= upStreamBase) break;
            ++downStreamGreaterDownStreamBaseIncrease;
        }
        int downStreamGreaterK = downStreamParallelism / upStreamFactor / upStreamBase;
        int downStreamGreaterUpStreamBaseIncrease = downStreamParallelism / upStreamFactor / downStreamGreaterK - upStreamBase;
        boolean downStreamGreaterFeasible = true;
        for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
            int downStreamPpgMemberParallelism = (downStreamBase + downStreamGreaterDownStreamBaseIncrease) * ppgMember2Factor.get((Object)downStreamPpgMemberEpg);
            if (downStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)downStreamPpgMemberEpg)) continue;
            downStreamGreaterFeasible = false;
            break;
        }
        for (JobVertexID upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
            int upStreamPpgMemberParallelism = (upStreamBase + downStreamGreaterUpStreamBaseIncrease) * ppgMember2Factor.get((Object)upStreamPpgMemberEpg);
            if (upStreamPpgMemberParallelism <= epgLeader2MaxParallelism.get((Object)upStreamPpgMemberEpg)) continue;
            downStreamGreaterFeasible = false;
            break;
        }
        if (upStreamGreaterFeasible && downStreamGreaterFeasible) {
            int downStreamGreaterCostMem;
            int i;
            int i2;
            ResourceSpec upStreamBaseIncreaseCost = new ResourceSpec.Builder().build();
            for (Object upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
                ResourceSpec memberEpgIncreaseCost = new ResourceSpec.Builder().build();
                for (JobVertexID upStreamPpgMemberVertex : epgLeader2Members.get(upStreamPpgMemberEpg)) {
                    memberEpgIncreaseCost = memberEpgIncreaseCost.merge(jobConfig.getVertexConfigs().get((Object)upStreamPpgMemberVertex).getResourceSpec());
                }
                for (int i3 = 0; i3 < ppgMember2Factor.get(upStreamPpgMemberEpg); ++i3) {
                    upStreamBaseIncreaseCost = upStreamBaseIncreaseCost.merge(memberEpgIncreaseCost);
                }
            }
            ResourceSpec downStreamBaseIncreaseCost = new ResourceSpec.Builder().build();
            for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
                ResourceSpec memberEpgIncreaseCost = new ResourceSpec.Builder().build();
                for (JobVertexID downStreamPpgMemberVertex : epgLeader2Members.get((Object)downStreamPpgMemberEpg)) {
                    memberEpgIncreaseCost = memberEpgIncreaseCost.merge(jobConfig.getVertexConfigs().get((Object)downStreamPpgMemberVertex).getResourceSpec());
                }
                for (int i4 = 0; i4 < ppgMember2Factor.get((Object)downStreamPpgMemberEpg); ++i4) {
                    downStreamBaseIncreaseCost = downStreamBaseIncreaseCost.merge(memberEpgIncreaseCost);
                }
            }
            ResourceSpec upStreamGreaterCost = new ResourceSpec.Builder().build();
            for (i2 = 0; i2 < upStreamGreaterUpStreamBaseIncrease; ++i2) {
                upStreamGreaterCost = upStreamGreaterCost.merge(upStreamBaseIncreaseCost);
            }
            for (i2 = 0; i2 < upStreamGreaterDownStreamBaseIncrease; ++i2) {
                upStreamGreaterCost = upStreamGreaterCost.merge(downStreamBaseIncreaseCost);
            }
            ResourceSpec downStreamGreaterCost = new ResourceSpec.Builder().build();
            for (i = 0; i < downStreamGreaterDownStreamBaseIncrease; ++i) {
                downStreamGreaterCost = downStreamGreaterCost.merge(downStreamBaseIncreaseCost);
            }
            for (i = 0; i < downStreamGreaterUpStreamBaseIncrease; ++i) {
                downStreamGreaterCost = downStreamGreaterCost.merge(upStreamBaseIncreaseCost);
            }
            int upStreamGreaterCostMem = upStreamGreaterCost.getHeapMemory() + upStreamGreaterCost.getDirectMemory() + upStreamGreaterCost.getNativeMemory();
            useUpStreamGreaterPlan = upStreamGreaterCostMem > (downStreamGreaterCostMem = downStreamGreaterCost.getHeapMemory() + downStreamGreaterCost.getDirectMemory() + downStreamGreaterCost.getNativeMemory()) ? false : (upStreamGreaterCostMem < downStreamGreaterCostMem ? true : !(upStreamGreaterCost.getCpuCores() > downStreamGreaterCost.getCpuCores()));
        } else if (upStreamGreaterFeasible) {
            useUpStreamGreaterPlan = true;
        } else if (downStreamGreaterFeasible) {
            useUpStreamGreaterPlan = false;
        } else {
            LOGGER.debug("Could not merge ProportionalParallelism Groups {} and {}.", ppgLeader2Members.get((Object)upStreamPpgLeader), ppgLeader2Members.get((Object)downStreamPpgLeader));
            return;
        }
        if (useUpStreamGreaterPlan) {
            int downStreamNewBase = downStreamBase + upStreamGreaterDownStreamBaseIncrease;
            ppgLeader2Base.put(downStreamPpgLeader, downStreamNewBase);
            ppgLeader2Members.get((Object)downStreamPpgLeader).addAll((Collection<JobVertexID>)ppgLeader2Members.get((Object)upStreamPpgLeader));
            for (Object upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
                ppgMember2Leader.put((JobVertexID)((Object)upStreamPpgMemberEpg), downStreamPpgLeader);
                ppgMember2Factor.put((JobVertexID)((Object)upStreamPpgMemberEpg), ppgMember2Factor.get(upStreamPpgMemberEpg) * upStreamGreaterK);
            }
            ppgLeader2Members.remove((Object)upStreamPpgLeader);
            ppgLeader2Base.remove((Object)upStreamPpgLeader);
            for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
                epgLeader2TargetParallelism.put(downStreamPpgMemberEpg, downStreamNewBase * ppgMember2Factor.get((Object)downStreamPpgMemberEpg));
            }
        } else {
            int upStreamNewBase = upStreamBase + downStreamGreaterUpStreamBaseIncrease;
            ppgLeader2Base.put(upStreamPpgLeader, upStreamNewBase);
            ppgLeader2Members.get((Object)upStreamPpgLeader).addAll((Collection<JobVertexID>)ppgLeader2Members.get((Object)downStreamPpgLeader));
            for (JobVertexID downStreamPpgMemberEpg : ppgLeader2Members.get((Object)downStreamPpgLeader)) {
                ppgMember2Leader.put(downStreamPpgMemberEpg, upStreamPpgLeader);
                ppgMember2Factor.put(downStreamPpgMemberEpg, ppgMember2Factor.get((Object)downStreamPpgMemberEpg) * downStreamGreaterK);
            }
            ppgLeader2Members.remove((Object)downStreamPpgLeader);
            ppgLeader2Base.remove((Object)downStreamPpgLeader);
            for (Object upStreamPpgMemberEpg : ppgLeader2Members.get((Object)upStreamPpgLeader)) {
                epgLeader2TargetParallelism.put((JobVertexID)((Object)upStreamPpgMemberEpg), upStreamNewBase * ppgMember2Factor.get(upStreamPpgMemberEpg));
            }
        }
    }

    private RescaleJobParallelism generateRescaleParallelismAction(Map<JobVertexID, Integer> targetParallelisms, Map<JobVertexID, Integer> minParallelisms, RestServerClient.JobConfig jobConfig) {
        if (targetParallelisms.isEmpty()) {
            return null;
        }
        RescaleJobParallelism rescaleJobParallelism = new RescaleJobParallelism(this.jobID, this.timeout);
        for (JobVertexID vertexId : targetParallelisms.keySet()) {
            RestServerClient.VertexConfig vertexConfig = jobConfig.getVertexConfigs().get((Object)vertexId);
            int targetParallelism = targetParallelisms.get((Object)vertexId);
            rescaleJobParallelism.addVertex(vertexId, vertexConfig.getParallelism(), targetParallelisms.get((Object)vertexId), vertexConfig.getResourceSpec(), vertexConfig.getResourceSpec());
        }
        if (this.maxCpuLimit != Double.MAX_VALUE || this.maxMemoryLimit != Integer.MAX_VALUE) {
            RestServerClient.JobConfig targetJobConfig = rescaleJobParallelism.getAppliedJobConfig(jobConfig);
            double targetTotalCpu = targetJobConfig.getJobTotalCpuCores();
            int targetTotalMem = targetJobConfig.getJobTotalMemoryMb();
            if (targetTotalCpu > this.maxCpuLimit || targetTotalMem > this.maxMemoryLimit) {
                LOGGER.debug("Try to scale down parallelism: total resource of target job config <cpu, mem>=<{}, {}> exceed max limit <cpu, mem>=<{}, {}>.", new Object[]{targetTotalCpu, targetTotalMem, this.maxCpuLimit, this.maxMemoryLimit});
                RestServerClient.JobConfig adjustedJobConfig = MaxResourceLimitUtil.scaleDownJobConfigToMaxResourceLimit(targetJobConfig, minParallelisms, this.maxCpuLimit, this.maxMemoryLimit);
                if (adjustedJobConfig == null) {
                    LOGGER.debug("Give up adjusting.");
                    return null;
                }
                rescaleJobParallelism = new RescaleJobParallelism(this.jobID, this.timeout);
                for (JobVertexID vertexId : adjustedJobConfig.getVertexConfigs().keySet()) {
                    RestServerClient.VertexConfig originVertexConfig = jobConfig.getVertexConfigs().get((Object)vertexId);
                    RestServerClient.VertexConfig adjustedVertexConfig = adjustedJobConfig.getVertexConfigs().get((Object)vertexId);
                    rescaleJobParallelism.addVertex(vertexId, originVertexConfig.getParallelism(), adjustedVertexConfig.getParallelism(), originVertexConfig.getResourceSpec(), adjustedVertexConfig.getResourceSpec());
                }
            }
        }
        RestServerClient.JobConfig appliedJobConfig = rescaleJobParallelism.getAppliedJobConfig(jobConfig);
        LOGGER.debug("Resource applying generated action: <cpu, memory>=<{}, {}>.", (Object)appliedJobConfig.getJobTotalCpuCores(), (Object)appliedJobConfig.getJobTotalMemoryMb());
        rescaleJobParallelism.exculdeMinorDiffVertices(this.monitor.getConfig());
        return rescaleJobParallelism;
    }

    public static class TaskMetrics {
        private final JobVertexID jobVertexID;
        private final boolean isParallelSource;
        private final double inputTps;
        private final double outputTps;
        private final double timerCount;
        private final double taskLatencyPerRecord;
        private final double sourceLatencyPerRecord;
        private final double waitOutputPerRecord;
        private final double workload;
        private final double delayIncreasingRate;
        private final double partitionLatency;
        private final double partitionCount;

        public TaskMetrics(JobVertexID jobVertexId, boolean isParallelSource, double inputTps, double outputTps, double timerCount, double taskLatencyPerRecord, double sourceLatencyPerRecord, double waitOutputPerRecord, double workload, double delayIncreasingRate, double partitionLatency, double partitionCount) {
            this.jobVertexID = jobVertexId;
            this.isParallelSource = isParallelSource;
            this.inputTps = inputTps;
            this.outputTps = outputTps;
            this.timerCount = timerCount;
            this.taskLatencyPerRecord = taskLatencyPerRecord;
            this.sourceLatencyPerRecord = sourceLatencyPerRecord;
            this.waitOutputPerRecord = waitOutputPerRecord;
            this.workload = workload;
            this.delayIncreasingRate = delayIncreasingRate;
            this.partitionLatency = partitionLatency;
            this.partitionCount = partitionCount;
        }

        public JobVertexID getJobVertexID() {
            return this.jobVertexID;
        }

        public double getInputTps() {
            return this.inputTps;
        }

        public double getOutputTps() {
            return this.outputTps;
        }

        public double getTimerCount() {
            return this.timerCount;
        }

        public double getTaskLatencyPerRecord() {
            return this.taskLatencyPerRecord;
        }

        public double getSourceLatencyPerRecord() {
            return this.sourceLatencyPerRecord;
        }

        public double getWaitOutputPerRecord() {
            return this.waitOutputPerRecord;
        }

        public double getWorkload() {
            return this.workload;
        }

        public String toString() {
            return "TaskMetrics{JobVertexID:" + (Object)((Object)this.jobVertexID) + ", isParallelSource:" + this.isParallelSource + ", inputTps:" + this.inputTps + ", outputTps:" + this.outputTps + ", timerCount:" + this.timerCount + ", taskLatencyPerRecord:" + this.taskLatencyPerRecord + ", sourceLatencyPerRecord:" + this.sourceLatencyPerRecord + ", waitOutputPerRecord:" + this.waitOutputPerRecord + ", workload:" + this.workload + ", delayIncreasingRate:" + this.delayIncreasingRate + ", partitionLatency:" + this.partitionLatency + ", partitionCount:" + this.partitionCount + "}";
        }

        public double getDelayIncreasingRate() {
            return this.delayIncreasingRate;
        }
    }
}

