/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorDescriptor;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

/**
 * Groups the parallel {@link ExecutionVertex} subtasks that an {@link ExecutionGraph} holds for a
 * single {@link JobVertex}.
 *
 * <p>On construction the instance creates one {@link ExecutionVertex} per subtask and one
 * {@link IntermediateResult} per data set produced by the job vertex. It additionally keeps the
 * input splits, split assigners and per-task split limits per {@link OperatorID} when the job
 * vertex declares input split sources.
 *
 * <p>{@link #getTaskInformationOrBlobKey()} and {@link #resetForNewExecution(long, long)}
 * synchronize on an internal monitor; the remaining methods perform no synchronization of their
 * own.
 */
public class ExecutionJobVertex
        implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {

    /** Shares the logger of the {@link ExecutionGraph}. */
    private static final Logger LOG = ExecutionGraph.LOG;

    /** Marker returned by the job vertex when no max parallelism was configured. */
    public static final int VALUE_NOT_SET = -1;

    /** Largest max parallelism accepted by {@link #setMaxParallelismInternal(int)}. */
    private static final int UPPER_BOUND_MAX_PARALLELISM = 32768;

    /** Monitor guarding the lazy task-information creation and the reset for a new execution. */
    private final Object stateMonitor = new Object();

    private final ExecutionGraph graph;
    private final JobVertex jobVertex;
    private final List<OperatorID> operatorIDs;
    private final List<OperatorID> userDefinedOperatorIds;
    private final ExecutionVertex[] taskVertices;
    private final IntermediateResult[] producedDataSets;
    private final List<IntermediateResult> inputs;
    private final int parallelism;
    private final SlotSharingGroup slotSharingGroup;
    private final CoLocationGroup coLocationGroup;

    /** Input splits per operator; {@code null} when the job vertex has no input split sources. */
    private final Map<OperatorID, InputSplit[]> inputSplitsMap;

    /** Per-task split limit per operator; {@code null} when there are no input split sources. */
    private final Map<OperatorID, Integer> inputSplitsLimitMap;

    private final boolean maxParallelismConfigured;
    private int maxParallelism;
    private SerializedValue<TaskInformation> serializedTaskInformation;
    @Nullable
    private PermanentBlobKey taskInformationBlobKey = null;

    /** Lazily created by {@link #getTaskInformationOrBlobKey()} while holding the state monitor. */
    private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;

    /** Split assigner per operator; {@code null} when there are no input split sources. */
    private final Map<OperatorID, InputSplitAssigner> splitAssignerMap;

    /**
     * Convenience constructor that uses {@code 1} as the initial global mod version and the
     * current system time as the creation timestamp.
     *
     * @param graph the execution graph this vertex belongs to
     * @param jobVertex the job vertex to expand into parallel subtasks
     * @param defaultParallelism parallelism used when the job vertex does not define a positive one
     * @param timeout timeout handed to every created {@link ExecutionVertex}
     * @throws JobException see the full constructor
     */
    @VisibleForTesting
    ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex, int defaultParallelism, Time timeout) throws JobException {
        this(graph, jobVertex, defaultParallelism, timeout, 1L, System.currentTimeMillis());
    }

    /**
     * Creates the execution job vertex together with its subtasks and produced results.
     *
     * @param graph the execution graph this vertex belongs to
     * @param jobVertex the job vertex to expand into parallel subtasks
     * @param defaultParallelism parallelism used when the job vertex does not define a positive one
     * @param timeout timeout handed to every created {@link ExecutionVertex}
     * @param initialGlobalModVersion global mod version handed to every created {@link ExecutionVertex}
     * @param createTimestamp creation timestamp handed to every created {@link ExecutionVertex}
     * @throws JobException if the parallelism exceeds the max parallelism, or if a co-location
     *     group is set without a slot sharing group
     * @throws NullPointerException if {@code graph} or {@code jobVertex} is {@code null}
     */
    public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex, int defaultParallelism, Time timeout, long initialGlobalModVersion, long createTimestamp) throws JobException {
        if (graph == null) {
            throw new NullPointerException("graph must not be null");
        }
        if (jobVertex == null) {
            throw new NullPointerException("jobVertex must not be null");
        }
        this.graph = graph;
        this.jobVertex = jobVertex;

        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

        int configuredMaxParallelism = jobVertex.getMaxParallelism();
        this.maxParallelismConfigured = VALUE_NOT_SET != configuredMaxParallelism;
        // Without an explicit setting, derive the max parallelism from the actual parallelism.
        setMaxParallelismInternal(
                this.maxParallelismConfigured
                        ? configuredMaxParallelism
                        : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

        if (numTaskVertices > this.maxParallelism) {
            throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.", jobVertex.getName(), numTaskVertices, this.maxParallelism));
        }

        this.parallelism = numTaskVertices;
        this.serializedTaskInformation = null;
        this.taskVertices = new ExecutionVertex[numTaskVertices];
        this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
        this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
        this.inputs = new ArrayList<>(jobVertex.getInputs().size());

        this.slotSharingGroup = jobVertex.getSlotSharingGroup();
        this.coLocationGroup = jobVertex.getCoLocationGroup();
        if (this.coLocationGroup != null && this.slotSharingGroup == null) {
            throw new JobException("Vertex uses a co-location constraint without using slot sharing");
        }

        // One intermediate result per data set produced by the job vertex.
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
        for (int i = 0; i < jobVertex.getProducedDataSets().size(); ++i) {
            IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
            this.producedDataSets[i] = new IntermediateResult(result.getId(), this, numTaskVertices, result.getResultType());
        }

        Configuration jobConfiguration = graph.getJobConfiguration();
        int maxPriorAttemptsHistoryLength = jobConfiguration != null
                ? jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE)
                : JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();

        // One execution vertex per parallel subtask.
        for (int i = 0; i < numTaskVertices; ++i) {
            this.taskVertices[i] = new ExecutionVertex(this, i, this.producedDataSets, timeout, initialGlobalModVersion, createTimestamp, maxPriorAttemptsHistoryLength);
        }

        // Sanity check: every produced result must have received one partition per subtask.
        for (IntermediateResult ir : this.producedDataSets) {
            if (ir.getNumberOfAssignedPartitions() != this.parallelism) {
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
        }

        if (jobVertex.getInputSplitSources() != null) {
            this.inputSplitsMap = new HashMap<>();
            this.splitAssignerMap = new HashMap<>();
            this.inputSplitsLimitMap = new HashMap<>();
        } else {
            this.inputSplitsMap = null;
            this.splitAssignerMap = null;
            this.inputSplitsLimitMap = null;
        }
    }

    /**
     * Populates the input splits, split assigners and per-task split limits for every operator
     * that has an input split source, then notifies the graph manager.
     *
     * <p>Does nothing when splits were already set up or when the job vertex has no input split
     * sources. While the splits are created, the thread's context class loader is switched to the
     * graph's user class loader and restored afterwards.
     *
     * @param inputSplitsInLog splits to reuse per operator instead of creating new ones; may be
     *     {@code null}, and operators without an entry get freshly created splits
     * @throws JobException wrapping any {@link Throwable} raised while creating the splits
     */
    public void setUpInputSplits(Map<OperatorID, InputSplit[]> inputSplitsInLog) throws JobException {
        if (this.inputSplitsMap != null && !this.inputSplitsMap.isEmpty()) {
            return;
        }
        try {
            Map<OperatorID, InputSplitSource<?>> splitSourceMap = this.jobVertex.getInputSplitSources();
            if (splitSourceMap == null) {
                return;
            }

            Thread currentThread = Thread.currentThread();
            ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
            currentThread.setContextClassLoader(this.graph.getUserClassLoader());
            try {
                double limitMultiplier = this.graph.getPerTaskInputSplitsLimitAsAverageMultiplier();
                for (Map.Entry<OperatorID, InputSplitSource<?>> entry : splitSourceMap.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    InputSplitSource<?> splitSource = entry.getValue();

                    InputSplit[] inputSplits = inputSplitsInLog != null ? inputSplitsInLog.get(operatorID) : null;
                    if (inputSplits == null) {
                        inputSplits = splitSource.createInputSplits(this.parallelism);
                    }
                    if (inputSplits == null) {
                        continue;
                    }

                    this.inputSplitsMap.put(operatorID, inputSplits);
                    this.splitAssignerMap.put(operatorID, createSplitAssigner(splitSource, inputSplits));
                    // A limit is only recorded for multipliers of at least 1.0 (NaN is skipped too).
                    if (limitMultiplier >= 1.0) {
                        this.inputSplitsLimitMap.put(operatorID, (int) Math.ceil(1.0 * (double) inputSplits.length / (double) this.parallelism * limitMultiplier));
                    }
                }
                this.getGraph().getGraphManager().notifyInputSplitsCreated(this.getJobVertexId(), this.inputSplitsMap);
            } finally {
                currentThread.setContextClassLoader(oldContextClassLoader);
            }
        } catch (Throwable t) {
            throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
        }
    }

    /**
     * Asks the split source for an assigner over the given splits.
     *
     * <p>The sources are stored with a wildcard type, so the call needs an unchecked cast; it is
     * confined to this helper.
     */
    @SuppressWarnings("unchecked")
    private static InputSplitAssigner createSplitAssigner(InputSplitSource<?> splitSource, InputSplit[] inputSplits) {
        return ((InputSplitSource<InputSplit>) splitSource).getInputSplitAssigner(inputSplits);
    }

    /** Returns an unmodifiable view of the job vertex's operator IDs. */
    public List<OperatorID> getOperatorIDs() {
        return this.operatorIDs;
    }

    /** Returns an unmodifiable view of the job vertex's user-defined operator IDs. */
    public List<OperatorID> getUserDefinedOperatorIDs() {
        return this.userDefinedOperatorIds;
    }

    /**
     * Overrides the derived max parallelism.
     *
     * <p>Only permitted when the max parallelism was not explicitly configured on the job vertex;
     * otherwise the state check fails.
     *
     * @param maxParallelismDerived the new max parallelism
     */
    public void setMaxParallelism(int maxParallelismDerived) {
        Preconditions.checkState(
                !this.maxParallelismConfigured,
                "Attempt to override a configured max parallelism. Configured: " + this.maxParallelism + ", argument: " + maxParallelismDerived);
        this.setMaxParallelismInternal(maxParallelismDerived);
    }

    /**
     * Validates and stores the max parallelism. {@link Integer#MAX_VALUE} is mapped to the upper
     * bound before the range check.
     */
    private void setMaxParallelismInternal(int maxParallelism) {
        if (maxParallelism == Integer.MAX_VALUE) {
            maxParallelism = UPPER_BOUND_MAX_PARALLELISM;
        }
        Preconditions.checkArgument(
                maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
                "Overriding max parallelism is not in valid bounds (1..%s), found: %s",
                UPPER_BOUND_MAX_PARALLELISM,
                maxParallelism);
        this.maxParallelism = maxParallelism;
    }

    /** Returns the execution graph this vertex belongs to. */
    public ExecutionGraph getGraph() {
        return this.graph;
    }

    /** Returns the job vertex this execution job vertex was created from. */
    public JobVertex getJobVertex() {
        return this.jobVertex;
    }

    @Override
    public String getName() {
        return this.getJobVertex().getName();
    }

    @Override
    public int getParallelism() {
        return this.parallelism;
    }

    @Override
    public int getMaxParallelism() {
        return this.maxParallelism;
    }

    /** Returns whether the job vertex carried an explicit max parallelism at construction time. */
    public boolean isMaxParallelismConfigured() {
        return this.maxParallelismConfigured;
    }

    /** Returns the job ID of the owning execution graph. */
    public JobID getJobId() {
        return this.graph.getJobID();
    }

    @Override
    public JobVertexID getJobVertexId() {
        return this.jobVertex.getID();
    }

    /** Returns the internal subtask array (not a copy), indexed by subtask index. */
    public ExecutionVertex[] getTaskVertices() {
        return this.taskVertices;
    }

    /** Returns the internal array of produced intermediate results (not a copy). */
    public IntermediateResult[] getProducedDataSets() {
        return this.producedDataSets;
    }

    /**
     * Returns the split assigner registered for the operator, or {@code null} if there is none
     * (including when the job vertex has no input split sources).
     */
    public InputSplitAssigner getSplitAssigner(OperatorID operatorID) {
        return this.splitAssignerMap == null ? null : this.splitAssignerMap.get(operatorID);
    }

    /** Returns the slot sharing group of the job vertex; may be {@code null}. */
    public SlotSharingGroup getSlotSharingGroup() {
        return this.slotSharingGroup;
    }

    /** Returns the co-location group of the job vertex; may be {@code null}. */
    public CoLocationGroup getCoLocationGroup() {
        return this.coLocationGroup;
    }

    /** Returns the intermediate results registered as inputs by {@link #connectToPredecessors(Map)}. */
    public List<IntermediateResult> getInputs() {
        return this.inputs;
    }

    /**
     * Returns the per-task input split limit recorded for the operator, or {@code 0} when no
     * limit was recorded.
     */
    public int getInputSplitsLimit(OperatorID operatorID) {
        if (this.inputSplitsLimitMap == null) {
            return 0;
        }
        Integer limit = this.inputSplitsLimitMap.get(operatorID);
        return limit != null ? limit : 0;
    }

    /**
     * Returns the task information, either serialized or as a blob key, creating it on first use.
     *
     * <p>The value is created and read while holding the state monitor, so concurrent callers all
     * observe the same fully published instance.
     *
     * @throws IOException declared by the signature; presumably raised when serializing or
     *     offloading the task information fails (not visible from this class)
     */
    public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
        synchronized (this.stateMonitor) {
            if (this.taskInformationOrBlobKey == null) {
                BlobWriter blobWriter = this.graph.getBlobWriter();
                TaskInformation taskInformation = new TaskInformation(this.jobVertex.getID(), this.jobVertex.getName(), this.parallelism, this.maxParallelism, this.jobVertex.getInvokableClassName(), this.jobVertex.getConfiguration());
                this.taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload(taskInformation, this.getJobId(), blobWriter);
            }
            // Read under the same monitor as the write; the field is not volatile.
            return this.taskInformationOrBlobKey;
        }
    }

    @Override
    public ExecutionState getAggregateState() {
        // Histogram of subtask states, indexed by ExecutionState ordinal as expected by
        // getAggregateJobVertexState.
        int[] num = new int[ExecutionState.values().length];
        for (ExecutionVertex vertex : this.taskVertices) {
            num[vertex.getExecutionState().ordinal()]++;
        }
        return ExecutionJobVertex.getAggregateJobVertexState(num, this.parallelism);
    }

    /** Builds a descriptive string with name, ID and parallelism settings of this vertex. */
    private String generateDebugString() {
        return "ExecutionJobVertex(" + this.jobVertex.getName() + " | " + this.jobVertex.getID() + "){parallelism=" + this.parallelism + ", maxParallelism=" + this.getMaxParallelism() + ", maxParallelismConfigured=" + this.maxParallelismConfigured + '}';
    }

    /**
     * Connects every subtask of this vertex to the partitions of its predecessors' results.
     *
     * <p>For each input edge of the job vertex, the matching intermediate result is looked up,
     * registered as an input, and one {@link ExecutionEdge} is created per (partition, consumer
     * subtask) pair reported by the edge.
     *
     * @param intermediateDataSets the known intermediate results, keyed by data set ID
     * @throws JobException if an input edge refers to a data set ID missing from the map
     */
    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
        List<JobEdge> jobEdges = this.jobVertex.getInputs();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", this.jobVertex.getID(), this.jobVertex.getName(), jobEdges.size()));
        }

        for (int num = 0; num < jobEdges.size(); ++num) {
            JobEdge edge = jobEdges.get(num);
            if (LOG.isDebugEnabled()) {
                if (edge.getSource() == null) {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s, %s.", num, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSourceId(), edge.getDistributionPattern()));
                } else {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s), %s, %s, %s", num, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName(), edge.getDistributionPattern(), edge.getSource().getResultType(), edge.getSchedulingMode()));
                }
            }

            IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
            if (ires == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + edge.getSourceId());
            }
            this.inputs.add(ires);
            int consumerIndex = ires.registerConsumer();

            // Collect the incoming edges of every consumer subtask for this input.
            List<List<ExecutionEdge>> edgesPerSubtask = new ArrayList<>(this.parallelism);
            for (int i = 0; i < this.parallelism; ++i) {
                edgesPerSubtask.add(new ArrayList<>());
            }

            for (int i = 0; i < ires.getPartitions().length; ++i) {
                IntermediateResultPartition partition = ires.getPartitions()[i];
                Collection<ExecutionVertexID> consumerExecutionVertices = edge.getConsumerExecutionVertices(i);
                for (ExecutionVertexID executionVertexID : consumerExecutionVertices) {
                    int subTaskIndex = executionVertexID.getSubTaskIndex();
                    ExecutionVertex consumerVertex = this.taskVertices[subTaskIndex];
                    ExecutionEdge ee = new ExecutionEdge(partition, consumerVertex, num);
                    partition.addConsumer(ee, consumerIndex);
                    edgesPerSubtask.get(subTaskIndex).add(ee);
                }
            }

            for (int i = 0; i < this.parallelism; ++i) {
                this.taskVertices[i].setInputExecutionEdges(edgesPerSubtask.get(i).toArray(new ExecutionEdge[0]), num);
            }
        }
    }

    /**
     * Schedules every subtask for execution.
     *
     * @return the result of {@code FutureUtils.waitForAll} over the per-subtask scheduling futures
     */
    public CompletableFuture<Void> scheduleAll(SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint) {
        ExecutionVertex[] vertices = this.taskVertices;
        List<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);
        for (ExecutionVertex ev : vertices) {
            scheduleFutures.add(ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint));
        }
        return FutureUtils.waitForAll(scheduleFutures);
    }

    /**
     * Requests a slot for the current execution attempt of every subtask.
     *
     * @return the allocation futures, in subtask order
     */
    public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, Time allocationTimeout) {
        ExecutionVertex[] vertices = this.taskVertices;
        List<CompletableFuture<Execution>> slots = new ArrayList<>(vertices.length);
        for (ExecutionVertex vertex : vertices) {
            Execution exec = vertex.getCurrentExecutionAttempt();
            slots.add(exec.allocateAndAssignSlotForExecution(resourceProvider, queued, locationPreferenceConstraint, allocationTimeout));
        }
        return slots;
    }

    /** Cancels every subtask, ignoring the returned futures. */
    public void cancel() {
        for (ExecutionVertex ev : this.getTaskVertices()) {
            ev.cancel();
        }
    }

    /**
     * Cancels every subtask.
     *
     * @return a future that completes once all per-subtask cancellation futures have completed
     */
    public CompletableFuture<Void> cancelWithFuture() {
        CompletableFuture<?>[] futures = Arrays.stream(this.getTaskVertices())
                .map(ExecutionVertex::cancel)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    /** Fails every subtask with the given cause. */
    public void fail(Throwable t) {
        for (ExecutionVertex ev : this.getTaskVertices()) {
            ev.fail(t);
        }
    }

    /**
     * Resets all subtasks for a new execution and rebuilds the split assigners from the splits
     * that were created earlier.
     *
     * <p>Runs while holding the state monitor. The slot sharing group's task assignment is
     * cleared first, if there is a group.
     *
     * @param timestamp timestamp forwarded to each subtask's reset
     * @param expectedGlobalModVersion global mod version forwarded to each subtask's reset
     * @throws GlobalModVersionMismatch declared by the signature; presumably raised by a subtask
     *     reset on a version mismatch (not visible from this class)
     * @throws RuntimeException wrapping any failure while re-creating the split assigners
     */
    public void resetForNewExecution(long timestamp, long expectedGlobalModVersion) throws GlobalModVersionMismatch {
        synchronized (this.stateMonitor) {
            if (this.slotSharingGroup != null) {
                this.slotSharingGroup.clearTaskAssignment();
            }
            for (int i = 0; i < this.parallelism; ++i) {
                this.taskVertices[i].resetForNewExecution(timestamp, expectedGlobalModVersion);
                this.taskVertices[i].clearAssignedInputSplits();
            }
            try {
                if (this.inputSplitsMap != null) {
                    // Fresh assigners over the already created splits.
                    this.splitAssignerMap.clear();
                    Map<OperatorID, InputSplitSource<?>> splitSourceMap = this.jobVertex.getInputSplitSources();
                    for (Map.Entry<OperatorID, InputSplit[]> entry : this.inputSplitsMap.entrySet()) {
                        OperatorID operatorID = entry.getKey();
                        this.splitAssignerMap.put(operatorID, createSplitAssigner(splitSourceMap.get(operatorID), entry.getValue()));
                    }
                }
            } catch (Throwable t) {
                throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
            }
        }
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
        // NOTE(review): the element type expected by AccumulatorHelper.mergeInto and
        // stringifyAccumulatorResults is not visible from this file, so the target map stays raw.
        HashMap userAccumulators = new HashMap();
        for (ExecutionVertex vertex : this.taskVertices) {
            Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (next != null) {
                AccumulatorHelper.mergeInto(userAccumulators, next);
            }
        }
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
    }

    @Override
    public List<OperatorDescriptor> getOperatorDescriptors() {
        return this.jobVertex.getOperatorDescriptors();
    }

    /** Creates an {@link ArchivedExecutionJobVertex} from this vertex. */
    public ArchivedExecutionJobVertex archive() {
        return new ArchivedExecutionJobVertex(this);
    }

    /**
     * Folds a histogram of subtask states into a single state for the job vertex.
     *
     * <p>Precedence: any FAILED, then any CANCELING, then any CANCELED, then any RUNNING. If some
     * subtasks are FINISHED the result is FINISHED only when all {@code parallelism} subtasks are
     * finished, otherwise RUNNING. In every other case the result is CREATED.
     *
     * @param verticesPerState number of subtasks per state, indexed by {@link ExecutionState} ordinal
     * @param parallelism total number of subtasks
     * @return the aggregated state
     * @throws IllegalArgumentException if the array is {@code null} or its length differs from the
     *     number of execution states
     */
    public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
        if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
            throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
        }
        if (verticesPerState[ExecutionState.FAILED.ordinal()] > 0) {
            return ExecutionState.FAILED;
        }
        if (verticesPerState[ExecutionState.CANCELING.ordinal()] > 0) {
            return ExecutionState.CANCELING;
        }
        if (verticesPerState[ExecutionState.CANCELED.ordinal()] > 0) {
            return ExecutionState.CANCELED;
        }
        if (verticesPerState[ExecutionState.RUNNING.ordinal()] > 0) {
            return ExecutionState.RUNNING;
        }
        int finished = verticesPerState[ExecutionState.FINISHED.ordinal()];
        if (finished > 0) {
            return finished == parallelism ? ExecutionState.FINISHED : ExecutionState.RUNNING;
        }
        return ExecutionState.CREATED;
    }

    /**
     * Returns a new map containing the given entries plus one entry per alternative ID of each
     * job vertex, pointing at the same execution job vertex.
     *
     * <p>{@code null} values and vertices without a job vertex are skipped. The state check fails
     * if an alternative ID is already mapped to a different execution job vertex.
     *
     * @param tasks the original mapping; not modified
     * @return the expanded mapping
     */
    public static Map<JobVertexID, ExecutionJobVertex> includeLegacyJobVertexIDs(Map<JobVertexID, ExecutionJobVertex> tasks) {
        Map<JobVertexID, ExecutionJobVertex> expanded = new HashMap<>(2 * tasks.size());
        expanded.putAll(tasks);
        for (ExecutionJobVertex executionJobVertex : tasks.values()) {
            if (executionJobVertex == null) {
                continue;
            }
            JobVertex jobVertex = executionJobVertex.getJobVertex();
            if (jobVertex == null) {
                continue;
            }
            List<JobVertexID> alternativeIds = jobVertex.getIdAlternatives();
            for (JobVertexID jobVertexID : alternativeIds) {
                ExecutionJobVertex old = expanded.put(jobVertexID, executionJobVertex);
                Preconditions.checkState(
                        old == null || old.equals(executionJobVertex),
                        "Ambiguous jobvertex id detected during expansion to legacy ids.");
            }
        }
        return expanded;
    }

    /**
     * Returns a new map containing the given entries plus one entry per non-null user-defined
     * operator ID of each job vertex, pointing at the same execution job vertex.
     *
     * <p>{@code null} values and vertices without a job vertex are skipped; an existing entry for
     * a user-defined operator ID is overwritten.
     *
     * @param operatorMapping the original mapping; not modified
     * @return the expanded mapping
     */
    public static Map<OperatorID, ExecutionJobVertex> includeAlternativeOperatorIDs(Map<OperatorID, ExecutionJobVertex> operatorMapping) {
        Map<OperatorID, ExecutionJobVertex> expanded = new HashMap<>(2 * operatorMapping.size());
        expanded.putAll(operatorMapping);
        for (ExecutionJobVertex executionJobVertex : operatorMapping.values()) {
            if (executionJobVertex == null) {
                continue;
            }
            JobVertex jobVertex = executionJobVertex.getJobVertex();
            if (jobVertex == null) {
                continue;
            }
            for (OperatorID operatorID : jobVertex.getUserDefinedOperatorIDs()) {
                if (operatorID != null) {
                    expanded.put(operatorID, executionJobVertex);
                }
            }
        }
        return expanded;
    }
}

