/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IllegalExecutionStateException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

public class Execution
implements AccessExecution,
Archiveable<ArchivedExecution>,
LogicalSlot.Payload {
    private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
    private static final AtomicReferenceFieldUpdater<Execution, LogicalSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, LogicalSlot.class, "assignedResource");
    private static final Logger LOG = ExecutionGraph.LOG;
    private static final int NUM_CANCEL_CALL_TRIES = 3;
    private static final int NUM_STOP_CALL_TRIES = 3;
    private final Executor executor;
    private final ExecutionVertex vertex;
    private ExecutionAttemptID attemptId;
    private final long globalModVersion;
    private final long[] stateTimestamps;
    private int attemptNumber;
    private final Time rpcTimeout;
    private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
    private final CompletableFuture<ExecutionState> terminalStateFuture;
    private final CompletableFuture<?> releaseFuture;
    private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
    private volatile ExecutionState state = ExecutionState.CREATED;
    private volatile LogicalSlot assignedResource;
    private volatile Throwable failureCause;
    @Nullable
    private volatile JobManagerTaskRestore taskRestore;
    @Nullable
    private volatile AllocationID assignedAllocationID;
    private final Object accumulatorLock = new Object();
    private volatile Map<String, Accumulator<?, ?>> userAccumulators;
    private volatile IOMetrics ioMetrics;
    private final Object updatePartitionLock = new Object();
    private ScheduledFuture updatePartitionFuture;
    private CompletableFuture<ExecutionAttemptID> reconcileFuture;

    public Execution(Executor executor, ExecutionVertex vertex, int attemptNumber, long globalModVersion, long startTimestamp, Time rpcTimeout) {
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.vertex = (ExecutionVertex)Preconditions.checkNotNull((Object)vertex);
        this.attemptId = new ExecutionAttemptID();
        this.rpcTimeout = (Time)Preconditions.checkNotNull((Object)rpcTimeout);
        this.globalModVersion = globalModVersion;
        this.attemptNumber = attemptNumber;
        this.stateTimestamps = new long[ExecutionState.values().length];
        this.markTimestamp(ExecutionState.CREATED, startTimestamp);
        this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue();
        this.terminalStateFuture = new CompletableFuture();
        this.releaseFuture = new CompletableFuture();
        this.taskManagerLocationFuture = new CompletableFuture();
        this.assignedResource = null;
    }

    public ExecutionVertex getVertex() {
        return this.vertex;
    }

    @Override
    public ExecutionAttemptID getAttemptId() {
        return this.attemptId;
    }

    @Override
    public int getAttemptNumber() {
        return this.attemptNumber;
    }

    @Override
    public ExecutionState getState() {
        return this.state;
    }

    @Nullable
    public AllocationID getAssignedAllocationID() {
        return this.assignedAllocationID;
    }

    public long getGlobalModVersion() {
        return this.globalModVersion;
    }

    public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
        return this.taskManagerLocationFuture;
    }

    public LogicalSlot getAssignedResource() {
        return this.assignedResource;
    }

    @VisibleForTesting
    boolean tryAssignResource(LogicalSlot logicalSlot) {
        Preconditions.checkNotNull((Object)logicalSlot);
        if (this.state == ExecutionState.SCHEDULED || this.state == ExecutionState.CREATED) {
            if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot) && logicalSlot.tryAssignPayload(this)) {
                if (this.state == ExecutionState.SCHEDULED || this.state == ExecutionState.CREATED) {
                    Preconditions.checkState((!this.taskManagerLocationFuture.isDone() ? 1 : 0) != 0, (Object)"The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
                    this.taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
                    this.assignedAllocationID = logicalSlot.getAllocationId();
                    LOG.info("{} is assigned resource {}_{} with {}", new Object[]{this.getVertexWithAttempt(), logicalSlot.getTaskManagerLocation().getResourceID(), logicalSlot.getPhysicalSlotNumber(), this.assignedAllocationID});
                    return true;
                }
                ASSIGNED_SLOT_UPDATER.set(this, null);
                return false;
            }
            return false;
        }
        return false;
    }

    @Override
    public TaskManagerLocation getAssignedResourceLocation() {
        LogicalSlot currentAssignedResource = this.assignedResource;
        try {
            return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : (this.taskManagerLocationFuture.isDone() ? this.taskManagerLocationFuture.get() : null);
        }
        catch (Exception e) {
            return null;
        }
    }

    public Throwable getFailureCause() {
        return this.failureCause;
    }

    @Override
    public String getFailureCauseAsString() {
        return ExceptionUtils.stringifyException((Throwable)this.getFailureCause());
    }

    @Override
    public long[] getStateTimestamps() {
        return this.stateTimestamps;
    }

    @Override
    public long getStateTimestamp(ExecutionState state) {
        return this.stateTimestamps[state.ordinal()];
    }

    public boolean isFinished() {
        return this.state.isTerminal();
    }

    @Nullable
    public JobManagerTaskRestore getTaskRestore() {
        return this.taskRestore;
    }

    public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
        Preconditions.checkState((this.state == ExecutionState.CREATED ? 1 : 0) != 0, (Object)"Can only assign operator state when execution attempt is in CREATED");
        this.taskRestore = taskRestore;
    }

    public CompletableFuture<ExecutionState> getTerminalStateFuture() {
        return this.terminalStateFuture;
    }

    public CompletableFuture<?> getReleaseFuture() {
        return this.releaseFuture;
    }

    public CompletableFuture<ExecutionAttemptID> getReconcileFuture() {
        return this.reconcileFuture;
    }

    public CompletableFuture<Void> scheduleForExecution() {
        ExecutionGraph executionGraph = this.getVertex().getExecutionGraph();
        SlotProvider resourceProvider = executionGraph.getSlotProvider();
        boolean allowQueued = executionGraph.isQueuedSchedulingAllowed();
        return this.scheduleForExecution(resourceProvider, allowQueued, LocationPreferenceConstraint.ANY);
    }

    public CompletableFuture<Void> scheduleForExecution(SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint) {
        Time allocationTimeout = this.vertex.getExecutionGraph().getAllocationTimeout();
        try {
            CompletableFuture<Execution> allocationFuture = this.allocateAndAssignSlotForExecution(slotProvider, queued, locationPreferenceConstraint, allocationTimeout);
            CompletionStage deploymentFuture = allocationFuture.handle((ignored, throwable) -> {
                if (throwable != null) {
                    this.markFailed(ExceptionUtils.stripCompletionException((Throwable)throwable));
                } else {
                    try {
                        this.deploy();
                    }
                    catch (Throwable t) {
                        this.markFailed(ExceptionUtils.stripCompletionException((Throwable)t));
                    }
                }
                return null;
            });
            if (!queued && !((CompletableFuture)deploymentFuture).isDone()) {
                allocationFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
            }
            return deploymentFuture;
        }
        catch (IllegalExecutionStateException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    public Tuple2<ScheduledUnit, SlotProfile> enterScheduledAndPrepareSchedulingResources() throws IllegalStateException {
        SlotSharingGroup sharingGroup = this.vertex.getJobVertex().getSlotSharingGroup();
        CoLocationConstraint locationConstraint = this.vertex.getLocationConstraint();
        if (locationConstraint != null && sharingGroup == null) {
            throw new IllegalStateException("Trying to schedule with co-location constraint but without slot sharing allowed.");
        }
        if (this.transitionState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
            SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;
            ScheduledUnit toSchedule = locationConstraint == null ? new ScheduledUnit(this, slotSharingGroupId) : new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
            ExecutionVertex executionVertex = this.getVertex();
            AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();
            List<AllocationID> previousAllocationIDs = lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();
            Collection<CompletableFuture<TaskManagerLocation>> locationFuture = this.getVertex().getPreferredLocationsBasedOnState();
            List preferredLocations = locationFuture == null ? Collections.EMPTY_LIST : (Collection)FutureUtils.combineAll(locationFuture).join();
            return new Tuple2((Object)toSchedule, (Object)new SlotProfile(this.computeResource(sharingGroup), preferredLocations, previousAllocationIDs, executionVertex.getJobVertex().getJobVertex().getTags()));
        }
        throw new IllegalExecutionStateException(this, ExecutionState.CREATED, this.state);
    }

    public CompletableFuture<Execution> allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, Time allocationTimeout) throws IllegalExecutionStateException {
        Preconditions.checkNotNull((Object)slotProvider);
        SlotSharingGroup sharingGroup = this.vertex.getJobVertex().getSlotSharingGroup();
        CoLocationConstraint locationConstraint = this.vertex.getLocationConstraint();
        if (locationConstraint != null && sharingGroup == null) {
            throw new IllegalStateException("Trying to schedule with co-location constraint but without slot sharing allowed.");
        }
        if (this.transitionState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
            SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;
            ScheduledUnit toSchedule = locationConstraint == null ? new ScheduledUnit(this, slotSharingGroupId) : new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
            ExecutionVertex executionVertex = this.getVertex();
            AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();
            List<Object> previousAllocationIDs = lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();
            CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = this.calculatePreferredLocations(locationPreferenceConstraint);
            SlotRequestId slotRequestId = new SlotRequestId();
            CompletionStage logicalSlotFuture = preferredLocationsFuture.thenCompose(preferredLocations -> slotProvider.allocateSlot(slotRequestId, toSchedule, queued, new SlotProfile(this.computeResource(sharingGroup), (Collection<TaskManagerLocation>)preferredLocations, (Collection<AllocationID>)previousAllocationIDs, this.getVertex().getJobVertex().getJobVertex().getTags()), allocationTimeout));
            this.releaseFuture.whenComplete((arg_0, arg_1) -> this.lambda$allocateAndAssignSlotForExecution$2((CompletableFuture)logicalSlotFuture, slotProvider, slotRequestId, slotSharingGroupId, locationConstraint, arg_0, arg_1));
            return ((CompletableFuture)logicalSlotFuture).thenApply(logicalSlot -> {
                if (this.tryAssignResource((LogicalSlot)logicalSlot)) {
                    return this;
                }
                logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
                throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
            });
        }
        throw new IllegalExecutionStateException(this, ExecutionState.CREATED, this.state);
    }

    private ResourceProfile computeResource(SlotSharingGroup slotSharingGroup) {
        if (slotSharingGroup != null && slotSharingGroup.getResourceProfile() != null) {
            return slotSharingGroup.getResourceProfile();
        }
        return this.getVertex().calculateResourceProfile();
    }

    public void deploy() throws JobException {
        LogicalSlot slot = this.assignedResource;
        Preconditions.checkNotNull((Object)slot, (String)"In order to deploy the execution we first have to assign a resource via tryAssignResource.");
        if (!slot.isAlive()) {
            throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
        }
        ExecutionState previous = this.state;
        if (previous == ExecutionState.SCHEDULED || previous == ExecutionState.CREATED) {
            if (!this.transitionState(previous, ExecutionState.DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        } else {
            throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + (Object)((Object)previous));
        }
        if (this != slot.getPayload()) {
            throw new IllegalStateException(String.format("The execution %s has not been assigned to the assigned slot.", this));
        }
        if (this.state != ExecutionState.DEPLOYING) {
            slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + (Object)((Object)this.state) + ") does not match expected state DEPLOYING."));
            return;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info(String.format("Deploying %s (attempt #%d) to slot %s_%s on %s", this.vertex.getTaskNameWithSubtaskIndex(), this.attemptNumber, slot.getTaskManagerLocation().getResourceID(), slot.getPhysicalSlotNumber(), this.getAssignedResourceLocation().getHostname()));
        }
        this.executor.execute(() -> {
            try {
                TaskDeploymentDescriptor deployment = this.vertex.createDeploymentDescriptor(this.attemptId, slot, this.taskRestore, this.attemptNumber);
                this.taskRestore = null;
                TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
                CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, this.rpcTimeout);
                submitResultFuture.whenCompleteAsync((ack, failure) -> {
                    if (failure != null) {
                        if (failure instanceof TimeoutException) {
                            String taskname = this.vertex.getTaskNameWithSubtaskIndex() + " (" + (Object)((Object)this.attemptId) + ')';
                            this.markFailed(new Exception("Cannot deploy task " + taskname + " - TaskManager (" + this.getAssignedResourceLocation() + ") not responding after a rpcTimeout of " + this.rpcTimeout, (Throwable)failure));
                        } else {
                            this.markFailed((Throwable)failure);
                        }
                    }
                }, this.executor);
            }
            catch (Throwable t) {
                this.markFailed(t);
            }
        });
    }

    public void stop() {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            CompletableFuture stopResultFuture = FutureUtils.retry(() -> taskManagerGateway.stopTask(this.attemptId, this.rpcTimeout), 3, this.executor);
            stopResultFuture.exceptionally(failure -> {
                LOG.info("Stopping task was not successful.", failure);
                return null;
            });
        }
    }

    public void cancel() {
        ExecutionState current;
        block7: {
            while (true) {
                if ((current = this.state) == ExecutionState.CANCELING || current == ExecutionState.CANCELED) {
                    return;
                }
                if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                    if (!this.transitionState(current, ExecutionState.CANCELING)) continue;
                    this.sendCancelRpcCall();
                    return;
                }
                if (current == ExecutionState.FINISHED || current == ExecutionState.FAILED) {
                    this.sendFailIntermediateResultPartitionsRpcCall();
                    return;
                }
                if (current != ExecutionState.CREATED && current != ExecutionState.SCHEDULED) break block7;
                if (this.transitionState(current, ExecutionState.CANCELED)) break;
            }
            this.markTimestamp(ExecutionState.CANCELING, this.getStateTimestamp(ExecutionState.CANCELED));
            this.taskManagerLocationFuture.cancel(false);
            try {
                this.vertex.getExecutionGraph().deregisterExecution(this);
                this.releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
            }
            finally {
                this.vertex.executionCanceled(this);
            }
            return;
        }
        throw new IllegalStateException(current.name());
    }

    protected void updateConsumers(List<List<ExecutionEdge>> allConsumers) {
        int numConsumers = allConsumers.size();
        if (numConsumers > 1) {
            this.fail(new IllegalStateException("Currently, only a single consumer group per partition is supported."));
        } else if (numConsumers == 0) {
            return;
        }
        for (ExecutionEdge edge : allConsumers.get(0)) {
            Execution partitionExecution;
            ExecutionVertex consumerVertex = edge.getTarget();
            Execution consumer = consumerVertex.getCurrentExecutionAttempt();
            ExecutionState consumerState = consumer.getState();
            IntermediateResultPartition partition = edge.getSource();
            if (consumerState == ExecutionState.RUNNING) {
                partitionExecution = partition.getProducer().getCurrentExecutionAttempt();
                consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition, partitionExecution));
                consumerVertex.getCurrentExecutionAttempt().sendPartitionInfoAsync();
                continue;
            }
            if (consumerState != ExecutionState.CREATED && consumerState != ExecutionState.SCHEDULED && consumerState != ExecutionState.DEPLOYING) continue;
            partitionExecution = partition.getProducer().getCurrentExecutionAttempt();
            consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition, partitionExecution));
            if (consumerVertex.getExecutionState() != ExecutionState.RUNNING) continue;
            consumerVertex.getCurrentExecutionAttempt().sendPartitionInfoAsync();
        }
    }

    @Override
    public void fail(Throwable t) {
        this.processFail(t, false);
    }

    public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(int sampleId, int numSamples, Time delayBetweenSamples, int maxStackTraceDepth, Time timeout) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            return taskManagerGateway.requestStackTraceSample(this.attemptId, sampleId, numSamples, delayBetweenSamples, maxStackTraceDepth, timeout);
        }
        return FutureUtils.completedExceptionally(new Exception("The execution has no slot assigned."));
    }

    public void notifyCheckpointComplete(long checkpointId, long timestamp) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.notifyCheckpointComplete(this.attemptId, this.getVertex().getJobId(), checkpointId, timestamp);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.triggerCheckpoint(this.attemptId, this.getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

    public CompletableFuture<ExecutionAttemptID> reconcile() {
        Preconditions.checkState((this.reconcileFuture == null ? 1 : 0) != 0);
        this.reconcileFuture = new CompletableFuture();
        this.reconcileFuture.whenComplete((value, throwable) -> {
            if (throwable == null && value == null) {
                this.sendPartitionInfoAsync();
            }
        });
        if (ExecutionState.CREATED.equals((Object)this.state) || ExecutionState.FINISHED.equals((Object)this.state)) {
            this.reconcileFuture.complete(null);
        } else {
            this.getVertex().getExecutionGraph().getFutureExecutorService().schedule(() -> {
                if (!this.reconcileFuture.isDone()) {
                    this.reconcileFuture.complete(this.getAttemptId());
                }
            }, this.getVertex().getExecutionGraph().getJobConfiguration().getLong(JobManagerOptions.JOB_RECONCILE_TIMEOUT), TimeUnit.SECONDS);
        }
        return this.reconcileFuture;
    }

    public boolean reconcileStatus(ExecutionState reportedState, ExecutionAttemptID executionId, int attemptNumber, long startTimestamp, LogicalSlot slot) {
        if (!reportedState.equals((Object)this.state)) {
            LOG.info("Reconcile {} fail as expected status {} with actural {}.", new Object[]{this.vertex.getTaskNameWithSubtaskIndex(), this.state, reportedState});
            return false;
        }
        if (this.reconcileFuture.isDone()) {
            LOG.info("Reconcile {} fail as reconcile has finished.", (Object)this.vertex.getTaskNameWithSubtaskIndex());
            return false;
        }
        if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot) && slot.tryAssignPayload(this)) {
            if (this.taskManagerLocationFuture.isDone() && !slot.getTaskManagerLocation().equals(this.taskManagerLocationFuture.getNow(null))) {
                ASSIGNED_SLOT_UPDATER.compareAndSet(this, slot, null);
                this.reconcileFuture.complete(this.attemptId);
                LOG.info("Reconcile {} fail as has already has a different location.", (Object)this.vertex.getTaskNameWithSubtaskIndex());
                return false;
            }
            if (!this.taskManagerLocationFuture.isDone() && !this.taskManagerLocationFuture.complete(slot.getTaskManagerLocation())) {
                ASSIGNED_SLOT_UPDATER.compareAndSet(this, slot, null);
                this.reconcileFuture.complete(this.attemptId);
                LOG.info("Reconcile {} fail as has already has a location.", (Object)this.vertex.getTaskNameWithSubtaskIndex());
                return false;
            }
            if (this.reconcileFuture.complete(null)) {
                this.assignedAllocationID = slot.getAllocationId();
                this.attemptId = executionId;
                this.attemptNumber = attemptNumber;
                this.markTimestamp(ExecutionState.CREATED, startTimestamp);
                LOG.info("Reconcile {} success, the state is {}.", (Object)this.vertex.getTaskNameWithSubtaskIndex(), (Object)reportedState);
                return true;
            }
            ASSIGNED_SLOT_UPDATER.compareAndSet(this, slot, null);
            LOG.info("Reconcile {} fail as it has reconciled finished.", (Object)this.vertex.getTaskNameWithSubtaskIndex());
            return false;
        }
        LOG.info("Reconcile {} fail as has already assigned a slot {}.", (Object)this.vertex.getTaskNameWithSubtaskIndex(), (Object)this.assignedResource);
        return false;
    }

    public void recoverState(ExecutionState recoveredState) {
        this.transitionState(this.state, recoveredState);
    }

    void markFailed(Throwable t) {
        this.processFail(t, true);
    }

    void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        this.processFail(t, true, userAccumulators, metrics);
    }

    void markFinished() {
        this.markFinished(null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        ExecutionState current;
        while ((current = this.state) == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
            if (!this.transitionState(current, ExecutionState.FINISHED)) continue;
            try {
                this.getVertex().finishPartitionsAndNotify();
                this.updateAccumulatorsAndMetrics(userAccumulators, metrics);
                this.releaseAssignedResource(null);
                this.vertex.getExecutionGraph().deregisterExecution(this);
            }
            finally {
                this.vertex.executionFinished(this);
            }
            return;
        }
        if (current == ExecutionState.CANCELING) {
            this.cancelingComplete(userAccumulators, metrics);
            return;
        }
        if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Task FINISHED, but concurrently went to state " + (Object)((Object)this.state));
            }
            return;
        }
        this.markFailed(new Exception("Vertex received FINISHED message while being in state " + (Object)((Object)this.state)));
    }

    void cancelingComplete() {
        this.cancelingComplete(null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void cancelingComplete(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        ExecutionState current;
        block6: {
            do {
                if ((current = this.state) == ExecutionState.CANCELED) {
                    return;
                }
                if (current != ExecutionState.CANCELING && current != ExecutionState.RUNNING && current != ExecutionState.DEPLOYING) break block6;
                this.updateAccumulatorsAndMetrics(userAccumulators, metrics);
            } while (!this.transitionState(current, ExecutionState.CANCELED));
            try {
                this.releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
                this.vertex.getExecutionGraph().deregisterExecution(this);
            }
            finally {
                this.vertex.executionCanceled(this);
            }
            return;
        }
        if (current != ExecutionState.FAILED) {
            String message = String.format("Asynchronous race: Found %s in state %s after successful cancel call.", new Object[]{this.vertex.getTaskNameWithSubtaskIndex(), this.state});
            LOG.error(message);
            this.vertex.getExecutionGraph().failGlobal(new Exception(message));
        }
    }

    void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
        this.partialInputChannelDeploymentDescriptors.add(partitionInfo);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendPartitionInfos() {
        if (this.vertex.getExecutionGraph().getGraphManager().isReplaying()) {
            return;
        }
        Object object = this.updatePartitionLock;
        synchronized (object) {
            this.updatePartitionFuture = null;
        }
        if (this.partialInputChannelDeploymentDescriptors != null && !this.partialInputChannelDeploymentDescriptors.isEmpty()) {
            PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
            ArrayList<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>(this.partialInputChannelDeploymentDescriptors.size());
            while ((partialInputChannelDeploymentDescriptor = this.partialInputChannelDeploymentDescriptors.poll()) != null) {
                partitionInfos.add(new PartitionInfo(partialInputChannelDeploymentDescriptor.getResultId(), partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this, this.getVertex().getExecutionGraph().getResultPartitionLocationTrackerProxy())));
            }
            this.sendUpdatePartitionInfoRpcCall(partitionInfos);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendPartitionInfoAsync() {
        if (this.reconcileFuture != null && !this.reconcileFuture.isDone()) {
            return;
        }
        Object object = this.updatePartitionLock;
        synchronized (object) {
            if (this.updatePartitionFuture == null) {
                this.updatePartitionFuture = this.getVertex().getExecutionGraph().getFutureExecutorService().schedule(() -> this.sendPartitionInfos(), this.vertex.getExecutionGraph().getUpdatePartitionInfoSendInterval(), TimeUnit.MILLISECONDS);
            }
        }
    }

    private boolean processFail(Throwable t, boolean isCallback) {
        return this.processFail(t, isCallback, null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        ExecutionState current;
        do {
            if ((current = this.state) == ExecutionState.FAILED) {
                return false;
            }
            if (current == ExecutionState.CANCELED || current == ExecutionState.FINISHED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ignoring transition of vertex {} to {} while being {}.", new Object[]{this.getVertexWithAttempt(), ExecutionState.FAILED, current});
                }
                return false;
            }
            if (current != ExecutionState.CANCELING) continue;
            this.cancelingComplete(userAccumulators, metrics);
            return false;
        } while (!this.transitionState(current, ExecutionState.FAILED, t));
        this.failureCause = t;
        this.updateAccumulatorsAndMetrics(userAccumulators, metrics);
        try {
            this.releaseAssignedResource(t);
            this.vertex.getExecutionGraph().deregisterExecution(this);
        }
        finally {
            this.vertex.executionFailed(this, t);
        }
        if (!(isCallback || current != ExecutionState.RUNNING && current != ExecutionState.DEPLOYING)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
            }
            try {
                if (this.assignedResource != null) {
                    this.sendCancelRpcCall();
                }
            }
            catch (Throwable tt) {
                LOG.error("Error triggering cancel call while marking task {} as failed.", (Object)this.getVertex().getTaskNameWithSubtaskIndex(), (Object)tt);
            }
        }
        return true;
    }

    boolean switchToRunning() {
        if (this.transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            this.sendPartitionInfoAsync();
            return true;
        }
        ExecutionState currentState = this.state;
        if (currentState != ExecutionState.FINISHED && currentState != ExecutionState.CANCELED) {
            if (currentState == ExecutionState.CANCELING || currentState == ExecutionState.FAILED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", (Object)this.getVertexWithAttempt());
                }
                this.sendCancelRpcCall();
            } else {
                String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.", new Object[]{this.getVertexWithAttempt(), currentState});
                if (LOG.isDebugEnabled()) {
                    LOG.debug(message);
                }
                this.sendCancelRpcCall();
                this.markFailed(new Exception(message));
            }
        }
        return false;
    }

    private void sendCancelRpcCall() {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            CompletableFuture cancelResultFuture = FutureUtils.retry(() -> taskManagerGateway.cancelTask(this.attemptId, this.rpcTimeout), 3, this.executor);
            cancelResultFuture.whenCompleteAsync((ack, failure) -> {
                if (failure != null) {
                    this.fail(new Exception("Task could not be canceled.", (Throwable)failure));
                }
            }, this.executor);
        }
    }

    private void sendFailIntermediateResultPartitionsRpcCall() {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.failPartition(this.attemptId);
        }
    }

    private void sendUpdatePartitionInfoRpcCall(Iterable<PartitionInfo> partitionInfos) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
            CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(this.attemptId, partitionInfos, this.rpcTimeout);
            updatePartitionsResultFuture.whenCompleteAsync((ack, failure) -> {
                if (failure != null) {
                    this.fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation + " failed due to:", (Throwable)failure));
                }
            }, this.executor);
        }
    }

    private void releaseAssignedResource(@Nullable Throwable cause) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            slot.releaseSlot(cause).whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    this.releaseFuture.completeExceptionally((Throwable)throwable);
                } else {
                    this.releaseFuture.complete(null);
                }
            });
        } else {
            this.releaseFuture.complete(null);
        }
    }

    @VisibleForTesting
    public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
        CompletableFuture preferredLocationsFuture;
        Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = this.getVertex().getPreferredLocations();
        switch (locationPreferenceConstraint) {
            case ALL: {
                preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
                break;
            }
            case ANY: {
                ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<TaskManagerLocation>(preferredLocationFutures.size());
                for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
                    if (!preferredLocationFuture.isDone() || preferredLocationFuture.isCompletedExceptionally()) continue;
                    TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);
                    if (taskManagerLocation == null) {
                        throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
                    }
                    completedTaskManagerLocations.add(taskManagerLocation);
                }
                preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
                break;
            }
            default: {
                throw new RuntimeException("Unknown LocationPreferenceConstraint " + (Object)((Object)locationPreferenceConstraint) + '.');
            }
        }
        return preferredLocationsFuture;
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
        return this.transitionState(currentState, targetState, null);
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
        if (currentState.isTerminal()) {
            throw new IllegalStateException("Cannot leave terminal state " + (Object)((Object)currentState) + " to transition to " + (Object)((Object)targetState) + '.');
        }
        if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
            this.markTimestamp(targetState);
            if (error == null) {
                LOG.info("{} ({}) switched from {} to {}.", new Object[]{this.getVertex().getTaskNameWithSubtaskIndex(), this.getAttemptId(), currentState, targetState});
            } else {
                LOG.info("{} ({}) switched from {} to {}.", new Object[]{this.getVertex().getTaskNameWithSubtaskIndex(), this.getAttemptId(), currentState, targetState, error});
            }
            if (targetState.isTerminal()) {
                this.terminalStateFuture.complete(targetState);
            }
            try {
                this.vertex.notifyStateTransition(this, targetState, error);
            }
            catch (Throwable t) {
                LOG.error("Error while notifying execution graph of execution state transition.", t);
            }
            return true;
        }
        return false;
    }

    private void markTimestamp(ExecutionState state) {
        this.markTimestamp(state, System.currentTimeMillis());
    }

    private void markTimestamp(ExecutionState state, long timestamp) {
        this.stateTimestamps[state.ordinal()] = timestamp;
    }

    public String getVertexWithAttempt() {
        return this.vertex.getTaskNameWithSubtaskIndex() + " - execution #" + this.attemptNumber;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setAccumulators(Map<String, Accumulator<?, ?>> userAccumulators) {
        Object object = this.accumulatorLock;
        synchronized (object) {
            if (!this.state.isTerminal()) {
                this.userAccumulators = userAccumulators;
            }
        }
    }

    public Map<String, Accumulator<?, ?>> getUserAccumulators() {
        return this.userAccumulators;
    }

    @Override
    public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
        Map<String, OptionalFailure> accumulators = this.userAccumulators == null ? null : this.userAccumulators.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> OptionalFailure.of(entry.getValue())));
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulators);
    }

    @Override
    public int getParallelSubtaskIndex() {
        return this.getVertex().getParallelSubtaskIndex();
    }

    @Override
    public IOMetrics getIOMetrics() {
        return this.ioMetrics;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateAccumulatorsAndMetrics(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        if (userAccumulators != null) {
            Object object = this.accumulatorLock;
            synchronized (object) {
                this.userAccumulators = userAccumulators;
            }
        }
        if (metrics != null) {
            this.ioMetrics = metrics;
        }
    }

    public String toString() {
        LogicalSlot slot = this.assignedResource;
        return String.format("Attempt #%d (%s) @ %s - [%s]", new Object[]{this.attemptNumber, this.vertex.getTaskNameWithSubtaskIndex(), slot == null ? "(unassigned)" : slot, this.state});
    }

    public ArchivedExecution archive() {
        return new ArchivedExecution(this);
    }

    private /* synthetic */ void lambda$allocateAndAssignSlotForExecution$2(CompletableFuture logicalSlotFuture, SlotProvider slotProvider, SlotRequestId slotRequestId, SlotSharingGroupId slotSharingGroupId, CoLocationConstraint locationConstraint, Object ignored, Throwable throwable) {
        if (logicalSlotFuture.cancel(false)) {
            slotProvider.cancelSlotRequest(slotRequestId, slotSharingGroupId, locationConstraint, new FlinkException("Execution " + this + " was released."));
        }
    }
}

