/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.io.network.partition.DataConsumptionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StrictRestartPipelinedRegionStrategy
extends RestartPipelinedRegionStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(StrictRestartPipelinedRegionStrategy.class);

    public StrictRestartPipelinedRegionStrategy(ExecutionGraph executionGraph, int regionFailLimit) {
        this(executionGraph, executionGraph.getFutureExecutor(), regionFailLimit);
    }

    public StrictRestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor, int regionFailLimit) {
        super(executionGraph, executor, regionFailLimit);
    }

    @Override
    public void onTaskFailure(Execution taskExecution, Throwable cause) {
        ExecutionVertex ev = taskExecution.getVertex();
        FailoverRegion failoverRegion = (FailoverRegion)this.vertexToRegion.get(ev);
        if (failoverRegion == null) {
            this.executionGraph.failGlobal(new FlinkException("Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex(), cause));
            return;
        }
        List<FailoverRegion> sortedRegionsToRestart = this.sortRegionsTopologically(this.getRegionsToRestart(taskExecution.getVertex(), cause));
        LOG.info("Recovering task failure for {} #{} ({}) via restarting {} failover regions", new Object[]{ev.getTaskNameWithSubtaskIndex(), taskExecution.getAttemptNumber(), taskExecution.getAttemptId(), sortedRegionsToRestart.size()});
        for (FailoverRegion regionToRestart : sortedRegionsToRestart) {
            regionToRestart.onExecutionFail(taskExecution.getGlobalModVersion(), cause);
        }
    }

    private Set<FailoverRegion> getRegionsToRestart(ExecutionVertex failedVertex, Throwable cause) {
        IdentityHashMap regionsToRestart = new IdentityHashMap();
        IdentityHashMap visitedRegions = new IdentityHashMap();
        ArrayDeque<FailoverRegion> regionsToVisit = new ArrayDeque<FailoverRegion>();
        FailoverRegion rootFailedRegion = this.getRootFailedRegion(failedVertex, cause);
        visitedRegions.put(rootFailedRegion, null);
        regionsToVisit.add(rootFailedRegion);
        while (!regionsToVisit.isEmpty()) {
            FailoverRegion regionToRestart = (FailoverRegion)regionsToVisit.poll();
            regionsToRestart.put(regionToRestart, null);
            for (ExecutionVertex vertex : regionToRestart.getAllExecutionVertices()) {
                for (IntermediateResultPartition resultPartition : vertex.getProducedPartitions().values()) {
                    for (List<ExecutionEdge> edges : resultPartition.getConsumers()) {
                        for (ExecutionEdge edge : edges) {
                            ExecutionVertex consumerVertex = edge.getTarget();
                            FailoverRegion consumerRegion = (FailoverRegion)this.vertexToRegion.get(consumerVertex);
                            if (visitedRegions.containsKey(consumerRegion)) continue;
                            visitedRegions.put(consumerRegion, null);
                            regionsToVisit.add(consumerRegion);
                        }
                    }
                }
            }
        }
        return regionsToRestart.keySet();
    }

    private FailoverRegion getRootFailedRegion(ExecutionVertex failedVertex, Throwable cause) {
        Optional dataConsumptionException = ExceptionUtils.findThrowable((Throwable)cause, DataConsumptionException.class);
        if (dataConsumptionException.isPresent()) {
            LOG.info("Try restarting producer of {} due to DataConsumptionException", (Object)failedVertex);
            ResultPartitionID predecessorResultPartition = ((DataConsumptionException)dataConsumptionException.get()).getResultPartitionId();
            Execution producer = this.executionGraph.getRegisteredExecutions().get((Object)predecessorResultPartition.getProducerId());
            if (producer == null) {
                for (IntermediateResult intermediateResult : failedVertex.getJobVertex().getInputs()) {
                    IntermediateResultPartition resultPartition = intermediateResult.getPartitionOrNullById(predecessorResultPartition.getPartitionId());
                    if (resultPartition == null) continue;
                    Execution producerVertexCurrentAttempt = resultPartition.getProducer().getCurrentExecutionAttempt();
                    if (producerVertexCurrentAttempt.getAttemptId().equals((Object)predecessorResultPartition.getProducerId())) {
                        producer = producerVertexCurrentAttempt;
                        break;
                    }
                    LOG.warn("partition {} has already been disposed, skip restarting the producer.", (Object)predecessorResultPartition);
                    break;
                }
            }
            return (FailoverRegion)this.vertexToRegion.get(producer.getVertex());
        }
        return (FailoverRegion)this.vertexToRegion.get(failedVertex);
    }

    private List<FailoverRegion> sortRegionsTopologically(Set<FailoverRegion> regions) {
        ArrayList<FailoverRegion> regionsSorted = new ArrayList<FailoverRegion>();
        for (FailoverRegion region : this.sortedRegions) {
            if (!regions.contains(region)) continue;
            regionsSorted.add(region);
        }
        return regionsSorted;
    }

    @Override
    public String getStrategyName() {
        return "Strict Pipelined Region Failover";
    }

    public static class Factory
    implements FailoverStrategy.Factory {
        private int regionFailLimit = 100;

        @Override
        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new StrictRestartPipelinedRegionStrategy(executionGraph, this.regionFailLimit);
        }

        public void setRegionFailLimit(int regionFailLimit) {
            this.regionFailLimit = regionFailLimit;
        }
    }
}

