package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.io.network.partition.DataConsumptionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/StrictRestartPipelinedRegionStrategy.class */
public class StrictRestartPipelinedRegionStrategy extends RestartPipelinedRegionStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(StrictRestartPipelinedRegionStrategy.class);

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/StrictRestartPipelinedRegionStrategy$Factory.class */
    public static class Factory implements FailoverStrategy.Factory {
        private int regionFailLimit = 100;

        @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory
        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new StrictRestartPipelinedRegionStrategy(executionGraph, this.regionFailLimit);
        }

        public void setRegionFailLimit(int i) {
            this.regionFailLimit = i;
        }
    }

    public StrictRestartPipelinedRegionStrategy(ExecutionGraph executionGraph, int i) {
        this(executionGraph, executionGraph.getFutureExecutor(), i);
    }

    public StrictRestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor, int i) {
        super(executionGraph, executor, i);
    }

    @Override // org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy, org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public void onTaskFailure(Execution execution, Throwable th) {
        ExecutionVertex vertex = execution.getVertex();
        if (this.vertexToRegion.get(vertex) == null) {
            this.executionGraph.failGlobal(new FlinkException("Can not find a failover region for the execution " + vertex.getTaskNameWithSubtaskIndex(), th));
            return;
        }
        List<FailoverRegion> sortRegionsTopologically = sortRegionsTopologically(getRegionsToRestart(execution.getVertex(), th));
        LOG.info("Recovering task failure for {} #{} ({}) via restarting {} failover regions", new Object[]{vertex.getTaskNameWithSubtaskIndex(), Integer.valueOf(execution.getAttemptNumber()), execution.getAttemptId(), Integer.valueOf(sortRegionsTopologically.size())});
        Iterator<FailoverRegion> it = sortRegionsTopologically.iterator();
        while (it.hasNext()) {
            it.next().onExecutionFail(execution.getGlobalModVersion(), th);
        }
    }

    private Set<FailoverRegion> getRegionsToRestart(ExecutionVertex executionVertex, Throwable th) {
        IdentityHashMap identityHashMap = new IdentityHashMap();
        IdentityHashMap identityHashMap2 = new IdentityHashMap();
        ArrayDeque arrayDeque = new ArrayDeque();
        FailoverRegion rootFailedRegion = getRootFailedRegion(executionVertex, th);
        identityHashMap2.put(rootFailedRegion, null);
        arrayDeque.add(rootFailedRegion);
        while (!arrayDeque.isEmpty()) {
            FailoverRegion failoverRegion = (FailoverRegion) arrayDeque.poll();
            identityHashMap.put(failoverRegion, null);
            Iterator<ExecutionVertex> it = failoverRegion.getAllExecutionVertices().iterator();
            while (it.hasNext()) {
                Iterator<IntermediateResultPartition> it2 = it.next().getProducedPartitions().values().iterator();
                while (it2.hasNext()) {
                    Iterator<List<ExecutionEdge>> it3 = it2.next().getConsumers().iterator();
                    while (it3.hasNext()) {
                        Iterator<ExecutionEdge> it4 = it3.next().iterator();
                        while (it4.hasNext()) {
                            FailoverRegion failoverRegion2 = this.vertexToRegion.get(it4.next().getTarget());
                            if (!identityHashMap2.containsKey(failoverRegion2)) {
                                identityHashMap2.put(failoverRegion2, null);
                                arrayDeque.add(failoverRegion2);
                            }
                        }
                    }
                }
            }
        }
        return identityHashMap.keySet();
    }

    private FailoverRegion getRootFailedRegion(ExecutionVertex executionVertex, Throwable th) {
        Optional findThrowable = ExceptionUtils.findThrowable(th, DataConsumptionException.class);
        if (!findThrowable.isPresent()) {
            return this.vertexToRegion.get(executionVertex);
        }
        LOG.info("Try restarting producer of {} due to DataConsumptionException", executionVertex);
        ResultPartitionID resultPartitionId = ((DataConsumptionException) findThrowable.get()).getResultPartitionId();
        Execution execution = this.executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
        if (execution == null) {
            Iterator<IntermediateResult> it = executionVertex.getJobVertex().getInputs().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                IntermediateResultPartition partitionOrNullById = it.next().getPartitionOrNullById(resultPartitionId.getPartitionId());
                if (partitionOrNullById != null) {
                    Execution currentExecutionAttempt = partitionOrNullById.getProducer().getCurrentExecutionAttempt();
                    if (currentExecutionAttempt.getAttemptId().equals(resultPartitionId.getProducerId())) {
                        execution = currentExecutionAttempt;
                    } else {
                        LOG.warn("partition {} has already been disposed, skip restarting the producer.", resultPartitionId);
                    }
                }
            }
        }
        return this.vertexToRegion.get(execution.getVertex());
    }

    private List<FailoverRegion> sortRegionsTopologically(Set<FailoverRegion> set) {
        ArrayList arrayList = new ArrayList();
        for (FailoverRegion failoverRegion : this.sortedRegions) {
            if (set.contains(failoverRegion)) {
                arrayList.add(failoverRegion);
            }
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy, org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public String getStrategyName() {
        return "Strict Pipelined Region Failover";
    }
}
