/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.schedule;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.schedule.SchedulingConfig;
import org.apache.flink.runtime.schedule.VertexScheduler;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks, for every consumer execution vertex of a {@link JobGraph}, which intermediate results
 * it reads and whether enough of their partitions are consumable for the vertex to be
 * considered ready.
 *
 * <p>Readiness is decided by two settings held in a {@link VertexInputTrackerConfig}: an
 * {@link InputDependencyConstraint} (must all inputs be consumable, or is one enough) and a
 * per-input consumable threshold (the ratio of consumable partitions that an input must reach).
 */
public class VertexInputTracker {
    private static final Logger LOG = LoggerFactory.getLogger(VertexInputTracker.class);

    /** Queried for partition status / consumable ratios whenever readiness is evaluated. */
    private final VertexScheduler scheduler;

    /** Constraint and threshold settings, either deserialized from the job config or built from options. */
    private final VertexInputTrackerConfig config;

    /** Consumer execution vertex -> (consumed result -> tracked input). */
    private final Map<ExecutionVertexID, Map<IntermediateDataSetID, VertexInput>> vertexInputsMap =
            new HashMap<ExecutionVertexID, Map<IntermediateDataSetID, VertexInput>>();

    /**
     * Builds the tracker and registers every (consumer vertex, result) pair found in the job graph.
     *
     * @param jobGraph the job graph whose results and consumers are tracked; must not be null
     * @param scheduler the scheduler asked for partition status; must not be null
     * @param schedulingConfig provides the configuration and user class loader used to load the
     *     tracker config; must not be null
     * @throws NullPointerException if any argument is null
     * @throws IllegalStateException if the tracker config can neither be loaded nor created
     */
    public VertexInputTracker(JobGraph jobGraph, VertexScheduler scheduler, SchedulingConfig schedulingConfig) {
        Preconditions.checkNotNull(jobGraph);
        Preconditions.checkNotNull(schedulingConfig);
        this.scheduler = Preconditions.checkNotNull(scheduler);
        this.config = loadConfig(schedulingConfig);
        registerInputs(jobGraph);
    }

    /**
     * Reads a serialized {@link VertexInputTrackerConfig} from the scheduling configuration, or,
     * when none is stored, creates one from the global {@link VertexInputTrackerOptions}.
     */
    private static VertexInputTrackerConfig loadConfig(SchedulingConfig schedulingConfig) {
        try {
            Configuration configuration = schedulingConfig.getConfiguration();
            VertexInputTrackerConfig loaded = (VertexInputTrackerConfig) InstantiationUtil.readObjectFromConfig(
                    configuration,
                    VertexInputTrackerOptions.VERTEX_INPUT_TRACKER_CONFIG,
                    schedulingConfig.getUserClassLoader());
            if (loaded == null) {
                loaded = new VertexInputTrackerConfig(
                        InputDependencyConstraint.valueOf(
                                configuration.getValue(VertexInputTrackerOptions.INPUT_DEPENDENCY_CONSTRAINT)),
                        configuration.getDouble(VertexInputTrackerOptions.INPUT_CONSUMABLE_THRESHOLD_PIPELINED),
                        configuration.getDouble(VertexInputTrackerOptions.INPUT_CONSUMABLE_THRESHOLD_BLOCKING));
            }
            LOG.info("VertexInputTracker config: {}", loaded);
            return loaded;
        } catch (Exception e) {
            // Deliberately broad: deserialization failures and invalid option values
            // (e.g. an unknown constraint name) are all reported the same way.
            throw new IllegalStateException("Could not load or create InputTracker config.", e);
        }
    }

    /**
     * Walks every result partition of the job graph and records it on the inputs of the
     * execution vertices that consume it.
     */
    private void registerInputs(JobGraph jobGraph) {
        for (IntermediateDataSetID resultID : jobGraph.getResultIDs()) {
            JobVertex producerJobVertex = jobGraph.findVertexByID(jobGraph.getResultProducerID(resultID));
            int producerParallelism = producerJobVertex.getParallelism();
            for (int partition = 0; partition < producerParallelism; ++partition) {
                for (JobEdge edge : jobGraph.getResult(resultID).getConsumers()) {
                    for (ExecutionVertexID vertexID : edge.getConsumerExecutionVertices(partition)) {
                        Map<IntermediateDataSetID, VertexInput> inputs = this.vertexInputsMap.get(vertexID);
                        if (inputs == null) {
                            inputs = new HashMap<IntermediateDataSetID, VertexInput>();
                            this.vertexInputsMap.put(vertexID, inputs);
                        }
                        VertexInput input = inputs.get(resultID);
                        if (input == null) {
                            // The first edge seen for this (vertex, result) pair decides the input flavour.
                            input = createInput(jobGraph, edge, vertexID, resultID);
                            inputs.put(resultID, input);
                        }
                        input.addPartition(partition);
                    }
                }
            }
        }
    }

    /** Creates the tracked input for one (consumer vertex, result) pair using the configured threshold. */
    private VertexInput createInput(JobGraph jobGraph, JobEdge edge, ExecutionVertexID vertexID, IntermediateDataSetID resultID) {
        double consumableThreshold = this.config.getInputConsumableThreshold(
                vertexID.getJobVertexID(),
                resultID,
                jobGraph.getResult(resultID).getResultType().isPipelined());
        if (edge.getDistributionPattern() == DistributionPattern.ALL_TO_ALL) {
            return new AllToAllVertexInput(resultID, consumableThreshold);
        }
        return new VertexInput(resultID, consumableThreshold);
    }

    /**
     * Tells whether the given vertex satisfies its input dependency constraint.
     *
     * @param vertexID the execution vertex to check
     * @return {@code true} if the vertex has no tracked inputs; otherwise, under
     *     {@link InputDependencyConstraint#ALL}, {@code true} only if every input is consumable,
     *     and under any other constraint, {@code true} if at least one input is consumable
     */
    public boolean areInputsReady(ExecutionVertexID vertexID) {
        Map<IntermediateDataSetID, VertexInput> inputs = this.vertexInputsMap.get(vertexID);
        if (inputs == null) {
            return true;
        }
        if (this.config.getInputDependencyConstraint(vertexID.getJobVertexID()) == InputDependencyConstraint.ALL) {
            for (VertexInput input : inputs.values()) {
                if (!input.isConsumable(this.scheduler)) {
                    return false;
                }
            }
            return true;
        }
        for (VertexInput input : inputs.values()) {
            if (input.isConsumable(this.scheduler)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether one specific input of a vertex has reached its consumable threshold.
     *
     * @param vertexID the consuming execution vertex
     * @param dataSetID the result consumed by that vertex
     * @return {@code true} if the input is consumable
     * @throws IllegalStateException if the vertex has no tracked inputs or does not consume
     *     the given result
     */
    public boolean isInputReady(ExecutionVertexID vertexID, IntermediateDataSetID dataSetID) {
        Map<IntermediateDataSetID, VertexInput> inputs = this.vertexInputsMap.get(vertexID);
        Preconditions.checkState(inputs != null, "No inputs are tracked for vertex %s.", vertexID);
        VertexInput input = inputs.get(dataSetID);
        // Fail with a descriptive error instead of a bare NullPointerException.
        Preconditions.checkState(input != null, "Vertex %s does not consume result %s.", vertexID, dataSetID);
        return input.isConsumable(this.scheduler);
    }

    /**
     * Input connected through an ALL_TO_ALL edge. Individual partitions are not recorded;
     * consumability is derived from the scheduler's consumable-partition ratio for the whole result.
     */
    private static class AllToAllVertexInput extends VertexInput {
        public AllToAllVertexInput(IntermediateDataSetID resultID, double consumableThreshold) {
            super(resultID, consumableThreshold);
        }

        /** Intentionally a no-op: the per-result ratio is obtained from the scheduler instead. */
        @Override
        public void addPartition(int partitionNumber) {
        }

        @Override
        public boolean isConsumable(VertexScheduler scheduler) {
            return scheduler.getResultConsumablePartitionRatio(this.resultID) >= this.consumableThreshold;
        }
    }

    /** One consumed result of a vertex together with the partition numbers the vertex reads. */
    private static class VertexInput {
        protected final double consumableThreshold;
        protected final IntermediateDataSetID resultID;
        private final Collection<Integer> partitions = new ArrayList<Integer>();

        public VertexInput(IntermediateDataSetID resultID, double consumableThreshold) {
            this.resultID = resultID;
            this.consumableThreshold = consumableThreshold;
        }

        /** Records that the vertex reads the given partition of the result. */
        public void addPartition(int partitionNumber) {
            this.partitions.add(partitionNumber);
        }

        /**
         * Compares the fraction of recorded partitions that the scheduler reports as consumable
         * against the threshold.
         *
         * <p>If no partition has been recorded the fraction is {@code 0.0 / 0} = NaN, and the
         * comparison yields {@code false}.
         */
        public boolean isConsumable(VertexScheduler scheduler) {
            int consumableCount = 0;
            for (int partitionNumber : this.partitions) {
                if (scheduler.getResultPartitionStatus(this.resultID, partitionNumber).isConsumable()) {
                    ++consumableCount;
                }
            }
            return (double) consumableCount / this.partitions.size() >= this.consumableThreshold;
        }
    }

    /**
     * Serializable settings of the tracker: global defaults plus optional per-vertex constraint
     * overrides and per-(vertex, result) threshold overrides.
     *
     * <p>No explicit {@code serialVersionUID} is declared, so the default one derived from the
     * class shape applies; field modifiers and non-private signatures are therefore left
     * untouched to keep it stable.
     */
    public static class VertexInputTrackerConfig implements Serializable {
        private InputDependencyConstraint globalInputDependencyConstraint;
        private double globalPipelinedInputConsumableThreshold;
        private double globalBlockingInputConsumableThreshold;
        private Map<JobVertexID, InputDependencyConstraint> vertexInputDependencyConstraintMap =
                new HashMap<JobVertexID, InputDependencyConstraint>();
        private Map<JobVertexID, Map<IntermediateDataSetID, Double>> inputConsumableThresholdMap =
                new HashMap<JobVertexID, Map<IntermediateDataSetID, Double>>();

        /**
         * @param inputDependencyConstraint constraint used for vertices without an override
         * @param pipelinedInputConsumableThreshold threshold used for pipelined inputs without an override
         * @param blockingInputConsumableThreshold threshold used for non-pipelined inputs without an override
         */
        public VertexInputTrackerConfig(InputDependencyConstraint inputDependencyConstraint, double pipelinedInputConsumableThreshold, double blockingInputConsumableThreshold) {
            this.globalInputDependencyConstraint = inputDependencyConstraint;
            this.globalPipelinedInputConsumableThreshold = pipelinedInputConsumableThreshold;
            this.globalBlockingInputConsumableThreshold = blockingInputConsumableThreshold;
        }

        /** Overrides the input dependency constraint for a single job vertex. */
        public void setInputDependencyConstraint(JobVertexID vertexID, InputDependencyConstraint inputDependencyConstraint) {
            this.vertexInputDependencyConstraintMap.put(vertexID, inputDependencyConstraint);
        }

        /**
         * @return the override stored for the vertex (whatever value was stored, if an entry
         *     exists), otherwise the global constraint
         */
        public InputDependencyConstraint getInputDependencyConstraint(JobVertexID vertexID) {
            // containsKey (rather than a null check) so that an explicitly stored value is always honoured.
            if (this.vertexInputDependencyConstraintMap.containsKey(vertexID)) {
                return this.vertexInputDependencyConstraintMap.get(vertexID);
            }
            return this.globalInputDependencyConstraint;
        }

        /** Overrides the consumable threshold for one result consumed by one job vertex. */
        public void setInputConsumableThreshold(JobVertexID vertexID, IntermediateDataSetID resultID, double threshold) {
            Map<IntermediateDataSetID, Double> perResult = this.inputConsumableThresholdMap.get(vertexID);
            if (perResult == null) {
                perResult = new HashMap<IntermediateDataSetID, Double>();
                this.inputConsumableThresholdMap.put(vertexID, perResult);
            }
            perResult.put(resultID, threshold);
        }

        /**
         * @param vertexID the consuming job vertex
         * @param resultID the consumed result
         * @param isPipelined selects which global default applies when no override exists
         * @return the override for the (vertex, result) pair if present, otherwise the global
         *     pipelined or blocking threshold
         */
        public double getInputConsumableThreshold(JobVertexID vertexID, IntermediateDataSetID resultID, boolean isPipelined) {
            Map<IntermediateDataSetID, Double> perResult = this.inputConsumableThresholdMap.get(vertexID);
            if (perResult != null) {
                Double override = perResult.get(resultID);
                if (override != null) {
                    return override;
                }
            }
            return isPipelined ? this.globalPipelinedInputConsumableThreshold : this.globalBlockingInputConsumableThreshold;
        }

        @Override
        public String toString() {
            return "VertexInputTrackerConfig(globalInputDependencyConstraint: " + this.globalInputDependencyConstraint
                    + ", globalPipelinedInputConsumableThreshold: " + this.globalPipelinedInputConsumableThreshold
                    + ", globalBlockingInputConsumableThreshold: " + this.globalBlockingInputConsumableThreshold
                    + ", vertexInputDependencyConstraintMap: " + this.vertexInputDependencyConstraintMap
                    + ", vertexInputConsumableThresholdMap: " + this.inputConsumableThresholdMap
                    + ", )";
        }
    }

    /** Configuration keys and defaults used when no serialized tracker config is present. */
    public static class VertexInputTrackerOptions {
        /** Key of the serialized config object; also the prefix of the option keys below. */
        public static final String VERTEX_INPUT_TRACKER_CONFIG = "vertex-input-tracker.config";

        /** Name of the global {@link InputDependencyConstraint}; defaults to {@code ANY}. */
        public static final ConfigOption<String> INPUT_DEPENDENCY_CONSTRAINT =
                ConfigOptions.key(VERTEX_INPUT_TRACKER_CONFIG + ".input-dependency-constraint")
                        .defaultValue(InputDependencyConstraint.ANY.toString());

        /**
         * Global threshold for pipelined inputs. The default is {@link Double#MIN_VALUE}, the
         * smallest positive double, so any non-zero consumable ratio meets it.
         */
        public static final ConfigOption<Double> INPUT_CONSUMABLE_THRESHOLD_PIPELINED =
                ConfigOptions.key(VERTEX_INPUT_TRACKER_CONFIG + ".input-consumable-threshold.pipelined")
                        .defaultValue(Double.MIN_VALUE);

        /** Global threshold for non-pipelined inputs; the default of 1.0 requires a full ratio. */
        public static final ConfigOption<Double> INPUT_CONSUMABLE_THRESHOLD_BLOCKING =
                ConfigOptions.key(VERTEX_INPUT_TRACKER_CONFIG + ".input-consumable-threshold.blocking")
                        .defaultValue(1.0);
    }

    /** How many of a vertex's inputs must be consumable for the vertex to count as ready. */
    public enum InputDependencyConstraint {
        /** At least one input is consumable. */
        ANY,
        /** Every input is consumable. */
        ALL;
    }
}

