package org.apache.flink.runtime.schedule;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/schedule/VertexInputTracker.class */
public class VertexInputTracker {
    private static final Logger LOG = LoggerFactory.getLogger(VertexInputTracker.class);

    /** Scheduler that is queried for partition status and consumable ratios; assigned once in the constructor. */
    private final VertexScheduler scheduler;

    /** Effective tracker configuration, resolved in the constructor. */
    private VertexInputTrackerConfig config;

    /** Tracked inputs of every consuming execution vertex, keyed by the intermediate result they read. */
    private final Map<ExecutionVertexID, Map<IntermediateDataSetID, VertexInput>> vertexInputsMap = new HashMap<>();

    /**
     * Input variant created by the tracker for edges whose distribution pattern is
     * {@code ALL_TO_ALL}.
     *
     * <p>Individual partition indices are not recorded; consumability is decided from the
     * result-wide consumable partition ratio reported by the scheduler.
     */
    private static class AllToAllVertexInput extends VertexInput {

        /**
         * @param resultId            the intermediate result this input reads
         * @param consumableThreshold minimum consumable ratio for the input to count as consumable
         */
        public AllToAllVertexInput(IntermediateDataSetID resultId, double consumableThreshold) {
            super(resultId, consumableThreshold);
        }

        /** Intentionally a no-op: this variant never looks at individual partition indices. */
        @Override
        public void addPartition(int partitionIndex) {
            // Nothing to record.
        }

        /**
         * @param vertexScheduler scheduler asked for the consumable partition ratio of the result
         * @return {@code true} when the reported ratio is at least the configured threshold
         */
        @Override
        public boolean isConsumable(VertexScheduler vertexScheduler) {
            double consumableRatio = vertexScheduler.getResultConsumablePartitionRatio(this.resultID);
            return consumableRatio >= this.consumableThreshold;
        }
    }

    /**
     * How the tracked inputs of a vertex are combined when {@code areInputsReady} decides whether
     * the vertex has enough input.
     */
    public enum InputDependencyConstraint {
        /** One consumable input is sufficient. */
        ANY,
        /** Every tracked input must be consumable. */
        ALL
    }

    /**
     * One input of an execution vertex: the intermediate result it reads, the producer
     * partitions it depends on, and the ratio of those partitions that must be consumable.
     */
    private static class VertexInput {
        /** Minimum ratio of consumable partitions for this input to count as consumable. */
        protected double consumableThreshold;

        /** The intermediate result this input reads. */
        protected IntermediateDataSetID resultID;

        /** Indices of the producer partitions this input depends on. */
        private Collection<Integer> partitions = new ArrayList<>();

        /**
         * @param resultId  the intermediate result this input reads
         * @param threshold minimum consumable ratio for the input to count as consumable
         */
        public VertexInput(IntermediateDataSetID resultId, double threshold) {
            this.resultID = resultId;
            this.consumableThreshold = threshold;
        }

        /**
         * Registers a producer partition this input depends on.
         *
         * @param partitionIndex index of the producer partition
         */
        public void addPartition(int partitionIndex) {
            this.partitions.add(Integer.valueOf(partitionIndex));
        }

        /**
         * Asks the scheduler for the status of every registered partition and compares the
         * consumable fraction against the threshold.
         *
         * @param vertexScheduler scheduler used to look up partition status
         * @return {@code true} when the consumable fraction is at least the threshold
         */
        public boolean isConsumable(VertexScheduler vertexScheduler) {
            int consumableCount = 0;
            for (Integer partition : this.partitions) {
                if (vertexScheduler.getResultPartitionStatus(this.resultID, partition.intValue()).isConsumable()) {
                    consumableCount++;
                }
            }
            // With no registered partitions this is 0.0 / 0.0 = NaN, and any comparison
            // against NaN is false, so such an input is reported as not consumable.
            double consumableRatio = ((double) consumableCount) / ((double) this.partitions.size());
            return consumableRatio >= this.consumableThreshold;
        }
    }

    /**
     * Serializable settings for the tracker: job-wide defaults plus optional overrides per job
     * vertex (dependency constraint) and per job vertex / input result (consumable threshold).
     */
    public static class VertexInputTrackerConfig implements Serializable {
        /** Constraint used for vertices without their own entry. */
        private InputDependencyConstraint globalInputDependencyConstraint;

        /** Default threshold for inputs flagged as pipelined. */
        private double globalPipelinedInputConsumableThreshold;

        /** Default threshold for inputs not flagged as pipelined. */
        private double globalBlockingInputConsumableThreshold;

        /** Per-vertex constraint overrides. */
        private Map<JobVertexID, InputDependencyConstraint> vertexInputDependencyConstraintMap = new HashMap<>();

        /** Per-vertex, per-input threshold overrides. */
        private Map<JobVertexID, Map<IntermediateDataSetID, Double>> inputConsumableThresholdMap = new HashMap<>();

        /**
         * @param inputDependencyConstraint job-wide default constraint
         * @param d                         job-wide default threshold for pipelined inputs
         * @param d2                        job-wide default threshold for blocking inputs
         */
        public VertexInputTrackerConfig(InputDependencyConstraint inputDependencyConstraint, double d, double d2) {
            this.globalInputDependencyConstraint = inputDependencyConstraint;
            this.globalPipelinedInputConsumableThreshold = d;
            this.globalBlockingInputConsumableThreshold = d2;
        }

        /**
         * Overrides the dependency constraint for one job vertex.
         *
         * @param jobVertexID               vertex to override
         * @param inputDependencyConstraint constraint to use for that vertex
         */
        public void setInputDependencyConstraint(JobVertexID jobVertexID, InputDependencyConstraint inputDependencyConstraint) {
            this.vertexInputDependencyConstraintMap.put(jobVertexID, inputDependencyConstraint);
        }

        /**
         * @param jobVertexID vertex to look up
         * @return the value stored for the vertex if an entry exists, otherwise the job-wide default
         */
        public InputDependencyConstraint getInputDependencyConstraint(JobVertexID jobVertexID) {
            // Presence is tested with containsKey (not a null check) so that an entry
            // explicitly stored as null is returned as-is instead of the default.
            if (this.vertexInputDependencyConstraintMap.containsKey(jobVertexID)) {
                return this.vertexInputDependencyConstraintMap.get(jobVertexID);
            }
            return this.globalInputDependencyConstraint;
        }

        /**
         * Overrides the consumable threshold for one input of one job vertex.
         *
         * @param jobVertexID           consuming vertex
         * @param intermediateDataSetID input result of that vertex
         * @param d                     threshold to use for that input
         */
        public void setInputConsumableThreshold(JobVertexID jobVertexID, IntermediateDataSetID intermediateDataSetID, double d) {
            Map<IntermediateDataSetID, Double> thresholdsOfVertex = this.inputConsumableThresholdMap.get(jobVertexID);
            if (thresholdsOfVertex == null) {
                thresholdsOfVertex = new HashMap<>();
                this.inputConsumableThresholdMap.put(jobVertexID, thresholdsOfVertex);
            }
            thresholdsOfVertex.put(intermediateDataSetID, Double.valueOf(d));
        }

        /**
         * @param jobVertexID           consuming vertex
         * @param intermediateDataSetID input result of that vertex
         * @param z                     {@code true} to fall back to the pipelined default,
         *                              {@code false} to fall back to the blocking default
         * @return the override for this vertex/input if one was set, otherwise the selected default
         */
        public double getInputConsumableThreshold(JobVertexID jobVertexID, IntermediateDataSetID intermediateDataSetID, boolean z) {
            Map<IntermediateDataSetID, Double> thresholdsOfVertex = this.inputConsumableThresholdMap.get(jobVertexID);
            if (thresholdsOfVertex != null) {
                // Stored values are always boxed from a primitive, so null means "no entry".
                Double override = thresholdsOfVertex.get(intermediateDataSetID);
                if (override != null) {
                    return override.doubleValue();
                }
            }
            if (z) {
                return this.globalPipelinedInputConsumableThreshold;
            }
            return this.globalBlockingInputConsumableThreshold;
        }

        /** Renders all defaults and override maps; used when the tracker logs its configuration. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("VertexInputTrackerConfig(globalInputDependencyConstraint: ").append(this.globalInputDependencyConstraint);
            text.append(", globalPipelinedInputConsumableThreshold: ").append(this.globalPipelinedInputConsumableThreshold);
            text.append(", globalBlockingInputConsumableThreshold: ").append(this.globalBlockingInputConsumableThreshold);
            text.append(", vertexInputDependencyConstraintMap: ").append(this.vertexInputDependencyConstraintMap);
            text.append(", vertexInputConsumableThresholdMap: ").append(this.inputConsumableThresholdMap);
            text.append(", )");
            return text.toString();
        }
    }

    /** Configuration keys and defaults read by the tracker's constructor. */
    public static class VertexInputTrackerOptions {
        /** Key under which a serialized {@code VertexInputTrackerConfig} is looked up. */
        public static final String VERTEX_INPUT_TRACKER_CONFIG = "vertex-input-tracker.config";

        /**
         * Job-wide dependency constraint, default {@code ANY}.
         * Full key: {@code vertex-input-tracker.config.input-dependency-constraint}.
         */
        public static final ConfigOption<String> INPUT_DEPENDENCY_CONSTRAINT =
                ConfigOptions.key(VERTEX_INPUT_TRACKER_CONFIG + ".input-dependency-constraint")
                        .defaultValue(InputDependencyConstraint.ANY.toString());

        /**
         * Job-wide threshold for pipelined inputs.
         * Full key: {@code vertex-input-tracker.config.input-consumable-threshold.pipelined}.
         *
         * <p>The default is {@link Double#MIN_VALUE}, the smallest positive {@code double}, so any
         * consumable ratio strictly greater than zero satisfies it.
         */
        public static final ConfigOption<Double> INPUT_CONSUMABLE_THRESHOLD_PIPELINED =
                ConfigOptions.key(VERTEX_INPUT_TRACKER_CONFIG + ".input-consumable-threshold.pipelined")
                        .defaultValue(Double.valueOf(Double.MIN_VALUE));

        /**
         * Job-wide threshold for blocking inputs, default {@code 1.0} (all partitions consumable).
         * Full key: {@code vertex-input-tracker.config.input-consumable-threshold.blocking}.
         */
        public static final ConfigOption<Double> INPUT_CONSUMABLE_THRESHOLD_BLOCKING =
                ConfigOptions.key(VERTEX_INPUT_TRACKER_CONFIG + ".input-consumable-threshold.blocking")
                        .defaultValue(Double.valueOf(1.0d));
    }

    public VertexInputTracker(JobGraph jobGraph, VertexScheduler vertexScheduler, SchedulingConfig schedulingConfig) {
        Preconditions.checkNotNull(jobGraph);
        Preconditions.checkNotNull(schedulingConfig);
        this.scheduler = (VertexScheduler) Preconditions.checkNotNull(vertexScheduler);
        try {
            Configuration configuration = schedulingConfig.getConfiguration();
            this.config = (VertexInputTrackerConfig) InstantiationUtil.readObjectFromConfig(configuration, VertexInputTrackerOptions.VERTEX_INPUT_TRACKER_CONFIG, schedulingConfig.getUserClassLoader());
            if (this.config == null) {
                this.config = new VertexInputTrackerConfig(InputDependencyConstraint.valueOf(configuration.getValue(VertexInputTrackerOptions.INPUT_DEPENDENCY_CONSTRAINT)), configuration.getDouble(VertexInputTrackerOptions.INPUT_CONSUMABLE_THRESHOLD_PIPELINED), configuration.getDouble(VertexInputTrackerOptions.INPUT_CONSUMABLE_THRESHOLD_BLOCKING));
            }
            LOG.info("VertexInputTracker config: {}", this.config);
            for (IntermediateDataSetID intermediateDataSetID : jobGraph.getResultIDs()) {
                JobVertex findVertexByID = jobGraph.findVertexByID(jobGraph.getResultProducerID(intermediateDataSetID));
                for (int i = 0; i < findVertexByID.getParallelism(); i++) {
                    for (JobEdge jobEdge : jobGraph.getResult(intermediateDataSetID).getConsumers()) {
                        for (ExecutionVertexID executionVertexID : jobEdge.getConsumerExecutionVertices(i)) {
                            if (!this.vertexInputsMap.containsKey(executionVertexID)) {
                                this.vertexInputsMap.put(executionVertexID, new HashMap());
                            }
                            if (!this.vertexInputsMap.get(executionVertexID).containsKey(intermediateDataSetID)) {
                                double inputConsumableThreshold = this.config.getInputConsumableThreshold(executionVertexID.getJobVertexID(), intermediateDataSetID, jobGraph.getResult(intermediateDataSetID).getResultType().isPipelined());
                                if (jobEdge.getDistributionPattern() == DistributionPattern.ALL_TO_ALL) {
                                    this.vertexInputsMap.get(executionVertexID).put(intermediateDataSetID, new AllToAllVertexInput(intermediateDataSetID, inputConsumableThreshold));
                                } else {
                                    this.vertexInputsMap.get(executionVertexID).put(intermediateDataSetID, new VertexInput(intermediateDataSetID, inputConsumableThreshold));
                                }
                            }
                            this.vertexInputsMap.get(executionVertexID).get(intermediateDataSetID).addPartition(i);
                        }
                    }
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("Could not load or create InputTracker config.", e);
        }
    }

    /**
     * Decides whether an execution vertex has enough consumable input, according to the
     * dependency constraint configured for its job vertex.
     *
     * @param executionVertexID the vertex to check
     * @return {@code true} if the vertex has no tracked inputs; under {@code ALL}, {@code true}
     *         only if every tracked input is consumable; otherwise {@code true} as soon as one
     *         tracked input is consumable
     */
    public boolean areInputsReady(ExecutionVertexID executionVertexID) {
        Map<IntermediateDataSetID, VertexInput> trackedInputs = this.vertexInputsMap.get(executionVertexID);
        if (trackedInputs == null) {
            // Nothing is tracked for this vertex, so there is nothing to wait for.
            return true;
        }

        boolean requireAll =
                this.config.getInputDependencyConstraint(executionVertexID.getJobVertexID()) == InputDependencyConstraint.ALL;
        for (VertexInput input : trackedInputs.values()) {
            boolean consumable = input.isConsumable(this.scheduler);
            if (requireAll && !consumable) {
                // ALL: the first non-consumable input settles it.
                return false;
            }
            if (!requireAll && consumable) {
                // Not ALL: the first consumable input settles it.
                return true;
            }
        }
        // Loop ran to completion: ALL saw no failure, otherwise no input was consumable.
        return requireAll;
    }

    /**
     * Checks whether one specific input of an execution vertex is consumable.
     *
     * @param executionVertexID     the consuming vertex; must have tracked inputs
     * @param intermediateDataSetID the input result to check; must be a tracked input of the vertex
     * @return {@code true} if that input is currently consumable
     * @throws IllegalStateException if the vertex has no tracked inputs, or the given result is
     *                               not one of them (previously the latter surfaced as a bare
     *                               {@code NullPointerException})
     */
    public boolean isInputReady(ExecutionVertexID executionVertexID, IntermediateDataSetID intermediateDataSetID) {
        Map<IntermediateDataSetID, VertexInput> trackedInputs = this.vertexInputsMap.get(executionVertexID);
        Preconditions.checkState(trackedInputs != null,
                "Execution vertex %s has no tracked inputs.", executionVertexID);

        VertexInput input = trackedInputs.get(intermediateDataSetID);
        // Fail with a descriptive message instead of dereferencing a missing entry.
        Preconditions.checkState(input != null,
                "Result %s is not a tracked input of execution vertex %s.", intermediateDataSetID, executionVertexID);

        return input.isConsumable(this.scheduler);
    }
}
