/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Failover strategy that reacts to a task failure by restarting only the failed
 * execution vertex, and escalates to a global failure when too many task failures
 * are observed within a sliding time window.
 *
 * <p>Failure timestamps are kept in a plain {@link ArrayDeque}; no synchronization
 * is performed in this class. NOTE(review): presumably {@link #onTaskFailure} is
 * only ever invoked from a single thread - confirm against the caller.
 */
public class RestartIndividualStrategy extends FailoverStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RestartIndividualStrategy.class);

    /** Conversion factor between the configured window (seconds) and stored timestamps (milliseconds). */
    private static final long MILLIS_PER_SECOND = 1000L;

    private final ExecutionGraph executionGraph;
    private final Executor callbackExecutor;

    /** Wall-clock timestamps (milliseconds) of recent task failures, oldest first. */
    private final Queue<Long> taskFailuresTimestamps;

    /** Length of the failure detection window, in seconds. */
    private final int taskFailuresDetectTimeSpan;

    /** Maximum number of failures tolerated inside the window before failing globally. */
    private final int taskFailuresLimit;

    /**
     * Creates a strategy with a 60 second detection window and a failure limit of
     * {@link Integer#MAX_VALUE}.
     *
     * @param executionGraph the execution graph on which failovers are performed; must not be null
     * @param callbackExecutor the executor on which the restart callback runs; must not be null
     */
    public RestartIndividualStrategy(ExecutionGraph executionGraph, Executor callbackExecutor) {
        this(executionGraph, callbackExecutor, 60, Integer.MAX_VALUE);
    }

    /**
     * Creates a strategy with an explicit detection window and failure limit.
     *
     * @param executionGraph the execution graph on which failovers are performed; must not be null
     * @param callbackExecutor the executor on which the restart callback runs; must not be null
     * @param taskFailuresDetectTimeSpan length of the sliding failure window, in seconds
     * @param taskFailuresLimit number of failures inside the window that may be handled
     *     individually; one more triggers a global failure
     */
    public RestartIndividualStrategy(ExecutionGraph executionGraph, Executor callbackExecutor, int taskFailuresDetectTimeSpan, int taskFailuresLimit) {
        this.executionGraph = Preconditions.checkNotNull(executionGraph);
        this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor);
        this.taskFailuresDetectTimeSpan = taskFailuresDetectTimeSpan;
        this.taskFailuresLimit = taskFailuresLimit;
        this.taskFailuresTimestamps = new ArrayDeque<Long>();
    }

    /**
     * Handles a single task failure.
     *
     * <p>The failure is recorded in the sliding window. If the number of failures in the
     * window exceeds the configured limit, the whole graph is failed with {@code cause}.
     * Otherwise, once the failed execution reaches its terminal state, the owning vertex
     * is reset and a vertex failover notification is sent, on the callback executor.
     *
     * @param taskExecution the execution attempt that failed
     * @param cause the failure cause, forwarded to the global failure if the limit is exceeded
     */
    @Override
    public void onTaskFailure(Execution taskExecution, Throwable cause) {
        int latestFailuresCount = recordFailureAndCountRecent(System.currentTimeMillis());
        if (latestFailuresCount > this.taskFailuresLimit) {
            LOG.info("Task failures count {} in last {} seconds exceeds failures limit {}. Will fail globally.", latestFailuresCount, this.taskFailuresDetectTimeSpan, this.taskFailuresLimit);
            this.executionGraph.failGlobal(cause);
            return;
        }

        ExecutionVertex vertexToRecover = taskExecution.getVertex();
        LOG.info("Recovering task failure for {} (#{}) via individual restart.", vertexToRecover.getTaskNameWithSubtaskIndex(), taskExecution.getAttemptNumber());

        CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminalStateFuture();
        // Captured now so the reset below is validated against the version seen at failure time.
        long globalModVersion = taskExecution.getGlobalModVersion();

        terminationFuture.thenAcceptAsync(terminalState -> {
            try {
                this.executionGraph.resetExecutionVertices(globalModVersion, Collections.singletonList(vertexToRecover));
                vertexToRecover.getCurrentExecutionAttempt().setUpdatePartitionToConsumersRequired(true);
                this.executionGraph.notifyExecutionVertexFailover(Collections.singletonList(vertexToRecover));
            }
            catch (GlobalModVersionMismatch ignored) {
                // Deliberately ignored. NOTE(review): presumably this means the graph's global
                // modification version moved on (e.g. a concurrent global recovery), making this
                // individual restart obsolete - confirm against resetExecutionVertices.
            }
            catch (Exception e) {
                this.executionGraph.failGlobal(new Exception("Error during fine grained recovery - triggering full recovery", e));
            }
        }, this.callbackExecutor);
    }

    /**
     * Records a failure at {@code nowMillis}, drops failures that are older than the
     * detection window, and returns how many failures remain in the window.
     *
     * @param nowMillis timestamp of the new failure, in milliseconds
     * @return number of recorded failures inside the window, including the new one
     */
    private int recordFailureAndCountRecent(long nowMillis) {
        this.taskFailuresTimestamps.add(nowMillis);

        // Both sides of the comparison must be in milliseconds: the stored timestamps are
        // milliseconds while the configured span is in seconds.
        long windowStartMillis = nowMillis - (long) this.taskFailuresDetectTimeSpan * MILLIS_PER_SECOND;
        Long oldest;
        while ((oldest = this.taskFailuresTimestamps.peek()) != null && oldest < windowStartMillis) {
            this.taskFailuresTimestamps.poll();
        }
        return this.taskFailuresTimestamps.size();
    }

    /**
     * No-op: this strategy keeps no per-vertex state.
     *
     * @param newJobVerticesTopological the newly added job vertices (unused)
     */
    @Override
    public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
    }

    /**
     * Returns the display name of this strategy.
     *
     * @return the constant string {@code "Individual Task Restart"}
     */
    @Override
    public String getStrategyName() {
        return "Individual Task Restart";
    }

    /**
     * No-op: this strategy registers no metrics.
     *
     * @param metricGroup the metric group (unused)
     */
    @Override
    public void registerMetrics(MetricGroup metricGroup) {
    }

    /** Factory that creates {@link RestartIndividualStrategy} instances with a fixed window and limit. */
    public static class Factory
    implements FailoverStrategy.Factory {
        private final int taskFailuresDetectTimeSpan;
        private final int taskFailuresLimit;

        /**
         * @param taskFailuresDetectTimeSpan length of the sliding failure window, in seconds
         * @param taskFailuresLimit number of failures tolerated inside the window
         */
        public Factory(int taskFailuresDetectTimeSpan, int taskFailuresLimit) {
            this.taskFailuresDetectTimeSpan = taskFailuresDetectTimeSpan;
            this.taskFailuresLimit = taskFailuresLimit;
        }

        /**
         * Creates a strategy bound to {@code executionGraph}, using the graph's future
         * executor for restart callbacks.
         *
         * @param executionGraph the execution graph the strategy operates on
         * @return a new strategy instance
         */
        @Override
        public RestartIndividualStrategy create(ExecutionGraph executionGraph) {
            return new RestartIndividualStrategy(executionGraph, executionGraph.getFutureExecutor(), this.taskFailuresDetectTimeSpan, this.taskFailuresLimit);
        }
    }
}

