/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.resources.CommonExtendedResource;
import org.apache.flink.api.common.resources.Resource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocationTrackerProxy;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.BlockingShuffleType;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.ExecutionVertexID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TaskNetworkMemoryUtil;
import org.apache.flink.runtime.jobmaster.failover.ResultDescriptor;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.schedule.ExecutionVertexStatus;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

public class ExecutionVertex
implements AccessExecutionVertex,
Archiveable<ArchivedExecutionVertex> {
    private static final Logger LOG = ExecutionGraph.LOG;
    private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
    private final ExecutionJobVertex jobVertex;
    private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
    private final ExecutionEdge[][] inputEdges;
    private final int subTaskIndex;
    private final ExecutionVertexID executionVertexID;
    private final EvictingBoundedList<Execution> priorExecutions;
    private final Time timeout;
    private final String taskNameWithSubtask;
    private volatile CoLocationConstraint locationConstraint;
    private volatile Execution currentExecution;
    private long createTimestamp;
    private TaskManagerLocation latestPriorLocation = null;
    private final Map<OperatorID, List<InputSplit>> assignedInputSplitsMap = new HashMap<OperatorID, List<InputSplit>>();
    private final Map<OperatorID, Integer> inputSplitIndexMap = new HashMap<OperatorID, Integer>();

    @VisibleForTesting
    ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, Time timeout) {
        this(jobVertex, subTaskIndex, producedDataSets, timeout, 1L, System.currentTimeMillis(), (Integer)JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, Time timeout, long initialGlobalModVersion, long createTimestamp, int maxPriorExecutionHistoryLength) {
        this.jobVertex = jobVertex;
        this.subTaskIndex = subTaskIndex;
        this.executionVertexID = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);
        this.taskNameWithSubtask = String.format("%s (%d/%d)", jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());
        this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1.0f);
        for (IntermediateResult result : producedDataSets) {
            IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
            result.setPartition(subTaskIndex, irp);
            this.resultPartitions.put(irp.getPartitionId(), irp);
        }
        this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
        this.priorExecutions = new EvictingBoundedList(maxPriorExecutionHistoryLength);
        this.createTimestamp = createTimestamp;
        this.currentExecution = new Execution(this.getExecutionGraph().getFutureExecutor(), this, 0, initialGlobalModVersion, createTimestamp, timeout);
        CoLocationGroup clg = jobVertex.getCoLocationGroup();
        if (clg != null) {
            CoLocationGroup coLocationGroup = clg;
            synchronized (coLocationGroup) {
                this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
            }
        } else {
            this.locationConstraint = null;
        }
        this.getExecutionGraph().registerExecution(this.currentExecution);
        this.timeout = timeout;
    }

    public JobID getJobId() {
        return this.jobVertex.getJobId();
    }

    public ExecutionJobVertex getJobVertex() {
        return this.jobVertex;
    }

    public JobVertexID getJobvertexId() {
        return this.jobVertex.getJobVertexId();
    }

    public ExecutionVertexID getExecutionVertexID() {
        return this.executionVertexID;
    }

    public String getTaskName() {
        return this.jobVertex.getJobVertex().getName();
    }

    @Override
    public String getTaskNameWithSubtaskIndex() {
        return this.taskNameWithSubtask;
    }

    public int getTotalNumberOfParallelSubtasks() {
        return this.jobVertex.getParallelism();
    }

    public int getMaxParallelism() {
        return this.jobVertex.getMaxParallelism();
    }

    @Override
    public int getParallelSubtaskIndex() {
        return this.subTaskIndex;
    }

    public int getNumberOfInputs() {
        return this.inputEdges.length;
    }

    public ExecutionEdge[] getInputEdges(int input) {
        if (input < 0 || input >= this.inputEdges.length) {
            throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
        }
        return this.inputEdges[input];
    }

    public CoLocationConstraint getLocationConstraint() {
        return this.locationConstraint;
    }

    @Override
    public Execution getCurrentExecutionAttempt() {
        return this.currentExecution;
    }

    @Override
    public ExecutionState getExecutionState() {
        return this.currentExecution.getState();
    }

    @Override
    public long getStateTimestamp(ExecutionState state) {
        return this.currentExecution.getStateTimestamp(state);
    }

    @Override
    public String getFailureCauseAsString() {
        return ExceptionUtils.stringifyException((Throwable)this.getFailureCause());
    }

    public Throwable getFailureCause() {
        return this.currentExecution.getFailureCause();
    }

    public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
        return this.currentExecution.getTaskManagerLocationFuture();
    }

    public LogicalSlot getCurrentAssignedResource() {
        return this.currentExecution.getAssignedResource();
    }

    @Override
    public TaskManagerLocation getCurrentAssignedResourceLocation() {
        return this.currentExecution.getAssignedResourceLocation();
    }

    @Override
    public Execution getPriorExecutionAttempt(int attemptNumber) {
        EvictingBoundedList<Execution> evictingBoundedList = this.priorExecutions;
        synchronized (evictingBoundedList) {
            if (attemptNumber >= 0 && attemptNumber < this.priorExecutions.size()) {
                return this.priorExecutions.get(attemptNumber);
            }
            throw new IllegalArgumentException("attempt does not exist");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Execution getLatestPriorExecution() {
        EvictingBoundedList<Execution> evictingBoundedList = this.priorExecutions;
        synchronized (evictingBoundedList) {
            int size = this.priorExecutions.size();
            if (size > 0) {
                return this.priorExecutions.get(size - 1);
            }
            return null;
        }
    }

    public TaskManagerLocation getLatestPriorLocation() {
        return this.latestPriorLocation;
    }

    public void setLatestPriorLocation(TaskManagerLocation location) {
        this.latestPriorLocation = location;
    }

    public AllocationID getLatestPriorAllocation() {
        Execution latestPriorExecution = this.getLatestPriorExecution();
        return latestPriorExecution != null ? latestPriorExecution.getAssignedAllocationID() : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
        EvictingBoundedList<Execution> evictingBoundedList = this.priorExecutions;
        synchronized (evictingBoundedList) {
            return new EvictingBoundedList<Execution>(this.priorExecutions);
        }
    }

    public ExecutionGraph getExecutionGraph() {
        return this.jobVertex.getGraph();
    }

    public Map<IntermediateResultPartitionID, IntermediateResultPartition> getProducedPartitions() {
        return this.resultPartitions;
    }

    public ExecutionVertexStatus getCurrentStatus() {
        return new ExecutionVertexStatus(this.executionVertexID, this.getExecutionState());
    }

    public void setInputExecutionEdges(ExecutionEdge[] edges, int inputNumber) {
        this.inputEdges[inputNumber] = edges;
    }

    public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
        Collection<CompletableFuture<TaskManagerLocation>> basedOnState = this.getPreferredLocationsBasedOnState();
        return basedOnState != null ? basedOnState : this.getPreferredLocationsBasedOnInputs();
    }

    public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() {
        TaskManagerLocation priorLocation;
        if (this.currentExecution.getTaskRestore() != null && (priorLocation = this.getLatestPriorLocation()) != null) {
            return Collections.singleton(CompletableFuture.completedFuture(priorLocation));
        }
        return null;
    }

    public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
        if (this.inputEdges == null) {
            return Collections.emptySet();
        }
        HashSet locations = new HashSet(this.getTotalNumberOfParallelSubtasks());
        HashSet<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<CompletableFuture<TaskManagerLocation>>(this.getTotalNumberOfParallelSubtasks());
        for (int i = 0; i < this.inputEdges.length; ++i) {
            inputLocations.clear();
            ExecutionEdge[] sources = this.inputEdges[i];
            if (sources != null) {
                for (int k = 0; k < sources.length; ++k) {
                    CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
                    inputLocations.add(locationFuture);
                    if (inputLocations.size() <= 8) continue;
                    inputLocations.clear();
                    break;
                }
            }
            if (!locations.isEmpty() && (inputLocations.isEmpty() || inputLocations.size() >= locations.size())) continue;
            locations.clear();
            locations.addAll(inputLocations);
        }
        return locations.isEmpty() ? Collections.emptyList() : locations;
    }

    public ResourceProfile calculateResourceProfile() {
        if (this.jobVertex.getJobVertex().getMinResources().equals((Object)ResourceSpec.DEFAULT)) {
            return ResourceProfile.UNKNOWN;
        }
        int networkMemory = this.calculateTaskNetworkMemory();
        int additionalManagedMemory = this.calculateTaskExtraManagedMemory();
        ResourceSpec additionalResourceSpec = ResourceSpec.newBuilder().addExtendedResource(new Resource[]{new CommonExtendedResource("MANAGED_MEMORY_MB", (double)additionalManagedMemory)}).build();
        return ResourceProfile.fromResourceSpec(this.getJobVertex().getJobVertex().getMinResources().merge(additionalResourceSpec), this.getJobVertex().getJobVertex().getResourceConstraints(), networkMemory);
    }

    @VisibleForTesting
    int calculateTaskNetworkMemory() {
        Configuration config = this.jobVertex.getGraph().getJobManagerConfiguration();
        BlockingShuffleType shuffleType = BlockingShuffleType.getBlockingShuffleTypeFromConfiguration(config, LOG);
        int numInternalSubpartitions = 0;
        int numInternalResultPartitions = 0;
        for (IntermediateResultPartition irp : this.getProducedPartitions().values()) {
            if (shuffleType == BlockingShuffleType.YARN && irp.getIntermediateResult().getResultType().isBlocking()) continue;
            for (List<ExecutionEdge> consumer : irp.getConsumers()) {
                numInternalSubpartitions += consumer.size();
            }
            ++numInternalResultPartitions;
        }
        int maxBlockingRequestsInFlight = config.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_MAX_CONCURRENT_REQUESTS);
        int numPipelineChannels = 0;
        int numPipelineGates = 0;
        int numExternalBlockingChannels = 0;
        int numExternalBlockingGates = 0;
        for (int j = 0; j < this.getNumberOfInputs(); ++j) {
            boolean isExternalBlocking;
            ExecutionEdge[] edges = this.getInputEdges(j);
            Preconditions.checkState((edges.length > 0 ? 1 : 0) != 0, (Object)"There should be at least on edge for each input");
            boolean bl = isExternalBlocking = edges[0].getSource().getIntermediateResult().getResultType().isBlocking() && shuffleType == BlockingShuffleType.YARN;
            if (isExternalBlocking) {
                numExternalBlockingChannels += edges.length;
                ++numExternalBlockingGates;
                continue;
            }
            numPipelineChannels += edges.length;
            ++numPipelineGates;
        }
        if (maxBlockingRequestsInFlight > 0) {
            numExternalBlockingChannels = Math.min(numExternalBlockingChannels, maxBlockingRequestsInFlight);
            numExternalBlockingChannels = Math.max(numExternalBlockingChannels, numExternalBlockingGates);
        }
        return TaskNetworkMemoryUtil.calculateTaskNetworkMemory(config, numInternalSubpartitions, numInternalResultPartitions, numPipelineChannels, numPipelineGates, numExternalBlockingChannels, numExternalBlockingGates);
    }

    private int calculateTaskExtraManagedMemory() {
        Configuration config = this.getJobVertex().getGraph().getJobManagerConfiguration();
        BlockingShuffleType shuffleType = BlockingShuffleType.getBlockingShuffleTypeFromConfiguration(config, LOG);
        int numExternalResultPartitions = 0;
        for (IntermediateResultPartition irp : this.getProducedPartitions().values()) {
            if (shuffleType != BlockingShuffleType.YARN || !irp.getIntermediateResult().getResultType().isBlocking()) continue;
            ++numExternalResultPartitions;
        }
        int mapOutputMemoryInMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_OUTPUT_MEMORY_MB);
        return mapOutputMemoryInMB * numExternalResultPartitions;
    }

    public Execution resetForNewExecution(long timestamp, long originatingGlobalModVersion) throws GlobalModVersionMismatch {
        LOG.debug("Resetting execution vertex {} for new execution.", (Object)this.getTaskNameWithSubtaskIndex());
        EvictingBoundedList<Execution> evictingBoundedList = this.priorExecutions;
        synchronized (evictingBoundedList) {
            long actualModVersion = this.getExecutionGraph().getGlobalModVersion();
            if (actualModVersion > originatingGlobalModVersion) {
                throw new GlobalModVersionMismatch(originatingGlobalModVersion, actualModVersion);
            }
            Execution oldExecution = this.currentExecution;
            ExecutionState oldState = oldExecution.getState();
            if (oldState.isTerminal() || this.getExecutionGraph().getGraphManager().isReplaying()) {
                Execution newExecution;
                oldExecution.clearStateRestore();
                this.priorExecutions.add(oldExecution);
                this.latestPriorLocation = oldExecution.getAssignedResourceLocation();
                this.currentExecution = newExecution = new Execution(this.getExecutionGraph().getFutureExecutor(), this, oldExecution.getAttemptNumber() + 1, originatingGlobalModVersion, timestamp, this.timeout);
                CoLocationGroup grp = this.jobVertex.getCoLocationGroup();
                if (grp != null) {
                    this.locationConstraint = grp.getLocationConstraint(this.subTaskIndex);
                }
                this.getExecutionGraph().registerExecution(newExecution);
                if (oldState == ExecutionState.FINISHED) {
                    this.getExecutionGraph().vertexUnFinished();
                }
                this.inputSplitIndexMap.clear();
                for (IntermediateResultPartition resultPartition : this.resultPartitions.values()) {
                    resultPartition.resetForNewExecution();
                }
                return newExecution;
            }
            throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + (Object)((Object)oldState));
        }
    }

    public CompletableFuture<Void> scheduleForExecution(SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint) {
        return this.currentExecution.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint);
    }

    @VisibleForTesting
    public void deployToSlot(SimpleSlot slot) throws JobException {
        if (!this.currentExecution.tryAssignResource(slot)) {
            throw new IllegalStateException("Could not assign resource " + slot + " to current execution " + this.currentExecution + '.');
        }
        this.currentExecution.deploy();
    }

    public CompletableFuture<?> cancel() {
        Execution exec = this.currentExecution;
        exec.cancel();
        return exec.getReleaseFuture();
    }

    public void stop() {
        this.currentExecution.stop();
    }

    public void fail(Throwable t) {
        this.currentExecution.fail(t);
    }

    void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
        Execution execution = this.currentExecution;
        if (!partitionId.getProducerId().equals((Object)execution.getAttemptId())) {
            return;
        }
        IntermediateResultPartition partition = this.resultPartitions.get((Object)partitionId.getPartitionId());
        if (partition == null) {
            throw new IllegalStateException("Unknown partition " + partitionId + ".");
        }
        if (!partition.getIntermediateResult().getResultType().isPipelined()) {
            throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid forpipelined partitions.");
        }
        partition.markDataProduced();
        this.notifyAndUpdateConsumers(partition);
    }

    protected void notifyAndUpdateConsumers(IntermediateResultPartition partition) {
        this.getExecutionGraph().getGraphManager().notifyResultPartitionConsumable(this.getExecutionVertexID(), partition.getIntermediateResult().getId(), partition.getPartitionNumber(), this.getCurrentAssignedResourceLocation());
        this.getExecutionGraph().getFutureExecutor().execute(() -> this.currentExecution.updateConsumers(partition.getConsumers()));
    }

    public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
        this.getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
    }

    void clearAssignedInputSplits() {
        this.assignedInputSplitsMap.clear();
        this.inputSplitIndexMap.clear();
    }

    void finishPartitionsAndNotify() {
        for (IntermediateResultPartition partition : this.resultPartitions.values()) {
            partition.markFinished();
            if (!partition.getResultType().isBlocking()) continue;
            this.notifyAndUpdateConsumers(partition);
        }
    }

    void resetResultPartitionID(ResultPartitionID[] partitionIds) {
        LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition> newResultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(this.resultPartitions.size());
        Iterator<IntermediateResultPartition> iterator = this.resultPartitions.values().iterator();
        for (int i = 0; i < this.resultPartitions.size(); ++i) {
            IntermediateResultPartition resultPartition = iterator.next();
            IntermediateResultPartitionID originId = resultPartition.getPartitionId();
            resultPartition.setPartitionId(partitionIds[i].getPartitionId());
            resultPartition.getIntermediateResult().resetLookupHelper(originId, partitionIds[i].getPartitionId());
            newResultPartitions.put(resultPartition.getPartitionId(), resultPartition);
        }
        this.resultPartitions = newResultPartitions;
    }

    public void inputSplitAssigned(OperatorID operatorID, InputSplit inputSplit) {
        this.assignedInputSplitsMap.putIfAbsent(operatorID, new LinkedList());
        this.assignedInputSplitsMap.get((Object)operatorID).add(inputSplit);
        this.inputSplitIndexMap.put(operatorID, this.inputSplitIndexMap.getOrDefault((Object)operatorID, 0) + 1);
        Preconditions.checkArgument((this.inputSplitIndexMap.get((Object)operatorID).intValue() == this.assignedInputSplitsMap.get((Object)operatorID).size() ? 1 : 0) != 0);
    }

    public InputSplit getNextInputSplitFromAssgined(OperatorID operatorID) {
        List assignedInputSplits = this.assignedInputSplitsMap.getOrDefault((Object)operatorID, Collections.emptyList());
        Integer inputSplitIndex = this.inputSplitIndexMap.getOrDefault((Object)operatorID, 0);
        if (assignedInputSplits.isEmpty() || inputSplitIndex >= assignedInputSplits.size()) {
            return null;
        }
        Integer n = inputSplitIndex;
        Integer n2 = inputSplitIndex = Integer.valueOf(inputSplitIndex + 1);
        InputSplit split = (InputSplit)assignedInputSplits.get(n);
        this.inputSplitIndexMap.put(operatorID, inputSplitIndex);
        return split;
    }

    public Map<OperatorID, List<InputSplit>> getAssignedInputSplits() {
        return Collections.unmodifiableMap(this.assignedInputSplitsMap);
    }

    public boolean isOverInputSplitsLimit(OperatorID operatorID) {
        int limit = this.getJobVertex().getInputSplitsLimit(operatorID);
        Integer inputSplitIndex = this.inputSplitIndexMap.getOrDefault((Object)operatorID, 0);
        return limit != 0 && inputSplitIndex >= limit;
    }

    public void recoverStatus(ExecutionState state, Map<OperatorID, List<InputSplit>> assignedInputSplits, ResultDescriptor resultDescriptor) {
        if (!ExecutionState.FINISHED.equals((Object)state) && assignedInputSplits != null && resultDescriptor != null) {
            throw new FlinkRuntimeException("Can not assign input split or result partion when execution is " + (Object)((Object)state));
        }
        switch (state) {
            case FINISHED: {
                this.currentExecution.getTaskManagerLocationFuture().complete(resultDescriptor.getTaskManagerLocation());
                this.resetResultPartitionID(resultDescriptor.getResultPartitionIds());
                this.currentExecution.markFinished();
                if (assignedInputSplits == null) break;
                this.assignedInputSplitsMap.clear();
                this.assignedInputSplitsMap.putAll(assignedInputSplits);
                for (Map.Entry<OperatorID, List<InputSplit>> opToInputs : assignedInputSplits.entrySet()) {
                    this.getJobVertex().getSplitAssigner(opToInputs.getKey()).inputSplitsAssigned(this.subTaskIndex, opToInputs.getValue());
                }
                break;
            }
            case RUNNING: {
                this.currentExecution.switchToRunning();
                break;
            }
            case DEPLOYING: {
                this.currentExecution.recoverState(state);
                break;
            }
            default: {
                throw new FlinkRuntimeException("Unsupported replaying the state " + (Object)((Object)state));
            }
        }
    }

    public void recoverResultPartitionStatus(IntermediateDataSetID resultId, TaskManagerLocation location) {
        IntermediateResultPartition partitionToRecover = null;
        for (IntermediateResultPartition irp : this.getProducedPartitions().values()) {
            if (!irp.getIntermediateResult().getId().equals((Object)resultId)) continue;
            partitionToRecover = irp;
        }
        if (partitionToRecover == null) {
            throw new FlinkRuntimeException("Can not find the intermediate result " + (Object)((Object)resultId) + " on " + this.getTaskNameWithSubtaskIndex());
        }
        if (!ExecutionState.RUNNING.equals((Object)this.currentExecution.getState()) || !partitionToRecover.getResultType().isPipelined()) {
            throw new FlinkRuntimeException("Invalid state " + (Object)((Object)this.currentExecution.getState()) + " for " + this.getTaskNameWithSubtaskIndex());
        }
        this.currentExecution.getTaskManagerLocationFuture().complete(location);
        this.scheduleOrUpdateConsumers(new ResultPartitionID(partitionToRecover.getPartitionId(), this.currentExecution.getAttemptId()));
    }

    public boolean reconcileExecution(ExecutionState state, ExecutionAttemptID executionId, int attemptNumber, long startTimestamp, ResultPartitionID[] partitionIds, boolean[] partitionsConsumable, Map<OperatorID, List<InputSplit>> assignedInputSplits, LogicalSlot slot) {
        LOG.debug("Reconcile execution vertex {} for current execution.", (Object)this.getTaskNameWithSubtaskIndex());
        if (this.resultPartitions.size() != partitionIds.length) {
            LOG.info("Reconcile execution failed due to partition number with actual {}, expect {}.", (Object)partitionIds.length, (Object)this.resultPartitions.size());
            return false;
        }
        this.resetResultPartitionID(partitionIds);
        for (int i = 0; i < partitionIds.length; ++i) {
            IntermediateResultPartition partition = this.resultPartitions.get((Object)partitionIds[i].getPartitionId());
            if (!partition.getResultType().isPipelined() || partition.hasDataProduced() == partitionsConsumable[i]) continue;
            LOG.info("Reconcile execution {} failed due to partition {} consumable not equals to {}.", new Object[]{this.getTaskNameWithSubtaskIndex(), partition.getPartitionId(), partition.hasDataProduced()});
            this.currentExecution.getReconcileFuture().complete(this.currentExecution.getAttemptId());
            return false;
        }
        this.getExecutionGraph().deregisterExecution(this.currentExecution);
        if (this.currentExecution.reconcileStatus(state, executionId, attemptNumber, startTimestamp, slot)) {
            this.getExecutionGraph().registerExecution(this.currentExecution);
            this.inputSplitIndexMap.clear();
            this.assignedInputSplitsMap.clear();
            for (Map.Entry<OperatorID, List<InputSplit>> opToInputs : assignedInputSplits.entrySet()) {
                for (InputSplit inputSplit : opToInputs.getValue()) {
                    this.inputSplitAssigned(opToInputs.getKey(), inputSplit);
                }
                this.getJobVertex().getSplitAssigner(opToInputs.getKey()).inputSplitsAssigned(this.subTaskIndex, opToInputs.getValue());
            }
            return true;
        }
        this.getExecutionGraph().registerExecution(this.currentExecution);
        return false;
    }

    void executionFinished(Execution execution) {
        this.getExecutionGraph().vertexFinished();
    }

    void executionCanceled(Execution execution) {
    }

    void executionFailed(Execution execution, Throwable cause) {
    }

    void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
        if (this.currentExecution == execution) {
            this.getExecutionGraph().notifyExecutionChange(execution, newState, error);
        }
    }

    TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, LogicalSlot targetSlot, @Nullable JobManagerTaskRestore taskRestore, int attemptNumber) throws ExecutionGraphException {
        Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey;
        ArrayList<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(this.resultPartitions.size());
        ArrayList<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<InputGateDeploymentDescriptor>(this.inputEdges.length);
        boolean sendScheduleOrUpdateConsumersMessage = this.getExecutionGraph().isLazyDeploymentAllowed();
        boolean allowLazyDeployment = this.getExecutionGraph().getFailoverStrategy() instanceof RestartIndividualStrategy ? true : this.getExecutionGraph().isLazyDeploymentAllowed();
        for (IntermediateResultPartition intermediateResultPartition : this.resultPartitions.values()) {
            List<List<ExecutionEdge>> consumers = intermediateResultPartition.getConsumers();
            if (consumers.isEmpty()) {
                producedPartitions.add(ResultPartitionDeploymentDescriptor.from(intermediateResultPartition, 32768, sendScheduleOrUpdateConsumersMessage));
                continue;
            }
            Preconditions.checkState((1 == consumers.size() ? 1 : 0) != 0, (Object)("Only one consumer supported in the current implementation! Found: " + consumers.size()));
            List<ExecutionEdge> consumer = consumers.get(0);
            ExecutionJobVertex vertex = consumer.get(0).getTarget().getJobVertex();
            int maxParallelism = vertex.getMaxParallelism();
            producedPartitions.add(ResultPartitionDeploymentDescriptor.from(intermediateResultPartition, maxParallelism, sendScheduleOrUpdateConsumersMessage));
        }
        ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy = this.currentExecution.getVertex().getExecutionGraph().getResultPartitionLocationTrackerProxy();
        for (ExecutionEdge[] edges : this.inputEdges) {
            InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(resultPartitionLocationTrackerProxy, edges, targetSlot.getTaskManagerLocation(), allowLazyDeployment);
            int numConsumerEdges = edges[0].getSource().getConsumers().get(0).size();
            int queueToRequest = this.subTaskIndex % numConsumerEdges;
            IntermediateResult consumedIntermediateResult = edges[0].getSource().getIntermediateResult();
            IntermediateDataSetID resultId = consumedIntermediateResult.getId();
            ResultPartitionType partitionType = consumedIntermediateResult.getResultType();
            consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
        }
        Either<SerializedValue<JobInformation>, PermanentBlobKey> either = this.getExecutionGraph().getJobInformationOrBlobKey();
        TaskDeploymentDescriptor.MaybeOffloaded serializedJobInformation = either.isLeft() ? new TaskDeploymentDescriptor.NonOffloaded((SerializedValue)either.left()) : new TaskDeploymentDescriptor.Offloaded((PermanentBlobKey)either.right());
        try {
            taskInformationOrBlobKey = this.jobVertex.getTaskInformationOrBlobKey();
        }
        catch (IOException e) {
            throw new ExecutionGraphException("Could not create a serialized JobVertexInformation for " + (Object)((Object)this.jobVertex.getJobVertexId()), e);
        }
        TaskDeploymentDescriptor.MaybeOffloaded serializedTaskInformation = taskInformationOrBlobKey.isLeft() ? new TaskDeploymentDescriptor.NonOffloaded((SerializedValue)taskInformationOrBlobKey.left()) : new TaskDeploymentDescriptor.Offloaded((PermanentBlobKey)taskInformationOrBlobKey.right());
        return new TaskDeploymentDescriptor(this.getJobId(), serializedJobInformation, serializedTaskInformation, executionId, targetSlot.getAllocationId(), this.subTaskIndex, attemptNumber, targetSlot.getPhysicalSlotNumber(), this.createTimestamp, taskRestore, producedPartitions, consumedPartitions);
    }

    public String toString() {
        return this.getTaskNameWithSubtaskIndex();
    }

    public ArchivedExecutionVertex archive() {
        return new ArchivedExecutionVertex(this);
    }
}

