/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.io.network.partition.DataConsumptionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestartPipelinedRegionStrategy
extends FailoverStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
    protected final ExecutionGraph executionGraph;
    private final Executor executor;
    protected final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion;
    protected final List<FailoverRegion> sortedRegions;
    private int regionFailLimit;

    public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph, int regionFailLimit) {
        this(executionGraph, executionGraph.getFutureExecutor(), regionFailLimit);
    }

    public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor, int regionFailLimit) {
        this.executionGraph = (ExecutionGraph)Preconditions.checkNotNull((Object)executionGraph);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.vertexToRegion = new HashMap();
        this.sortedRegions = new ArrayList<FailoverRegion>();
        this.regionFailLimit = regionFailLimit;
    }

    @Override
    public void onTaskFailure(Execution taskExecution, Throwable cause) {
        ExecutionVertex ev = taskExecution.getVertex();
        FailoverRegion failoverRegion = this.vertexToRegion.get(ev);
        if (failoverRegion == null) {
            this.executionGraph.failGlobal(new FlinkException("Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex(), cause));
            return;
        }
        Optional dataConsumptionException = ExceptionUtils.findThrowable((Throwable)cause, DataConsumptionException.class);
        if (dataConsumptionException.isPresent()) {
            ResultPartitionID predecessorResultPartition = ((DataConsumptionException)dataConsumptionException.get()).getResultPartitionId();
            Execution producer = this.executionGraph.getRegisteredExecutions().get((Object)predecessorResultPartition.getProducerId());
            if (producer == null) {
                for (IntermediateResult intermediateResult : ev.getJobVertex().getInputs()) {
                    IntermediateResultPartition resultPartition = intermediateResult.getPartitionOrNullById(predecessorResultPartition.getPartitionId());
                    if (resultPartition == null) continue;
                    Execution producerVertexCurrentAttempt = resultPartition.getProducer().getCurrentExecutionAttempt();
                    if (producerVertexCurrentAttempt.getAttemptId().equals((Object)predecessorResultPartition.getProducerId())) {
                        producer = producerVertexCurrentAttempt;
                        break;
                    }
                    LOG.warn("partition {} has already been disposed, skip restarting the producer.", (Object)predecessorResultPartition);
                    break;
                }
            }
            if (producer != null) {
                FailoverRegion producerRegion = this.vertexToRegion.get(producer.getVertex());
                if (producerRegion == null) {
                    this.executionGraph.failGlobal(new Exception("Can not find a failover region for the execution " + producer.getVertex().getTaskNameWithSubtaskIndex(), cause));
                    return;
                }
                if (producerRegion != failoverRegion) {
                    LOG.info("Try restarting producer of {} due to DataConsumptionException", (Object)taskExecution);
                    this.onTaskFailure(producer, new FlinkException(predecessorResultPartition.toString() + " was report error by consumer."));
                }
            }
        }
        LOG.info("Recovering task failure for {} #{} ({}) via restart of failover region", new Object[]{ev.getTaskNameWithSubtaskIndex(), taskExecution.getAttemptNumber(), taskExecution.getAttemptId()});
        failoverRegion.onExecutionFail(taskExecution.getGlobalModVersion(), cause);
    }

    @Override
    public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
        this.generateAllFailoverRegion(newJobVerticesTopological);
        this.sortRegions();
    }

    @Override
    public String getStrategyName() {
        return "Pipelined Region Failover";
    }

    private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) {
        IdentityHashMap vertexToRegion = new IdentityHashMap();
        IdentityHashMap distinctRegions = new IdentityHashMap();
        for (ExecutionJobVertex ejv : newJobVerticesTopological) {
            if (ejv.getCoLocationGroup() != null) {
                this.makeAllOneRegion(newJobVerticesTopological);
                return;
            }
            List<IntermediateResult> inputs = ejv.getInputs();
            int numInputs = inputs.size();
            boolean hasPipelinedInputs = false;
            for (IntermediateResult input : inputs) {
                if (!input.getResultType().isPipelined()) continue;
                hasPipelinedInputs = true;
                break;
            }
            if (hasPipelinedInputs) {
                for (ExecutionVertex ev : ejv.getTaskVertices()) {
                    ArrayList thisRegion = null;
                    for (int inputNum = 0; inputNum < numInputs; ++inputNum) {
                        if (!inputs.get(inputNum).getResultType().isPipelined()) continue;
                        for (ExecutionEdge edge : ev.getInputEdges(inputNum)) {
                            ExecutionVertex predecessor = edge.getSource().getProducer();
                            ArrayList predecessorRegion = (ArrayList)vertexToRegion.get(predecessor);
                            if (thisRegion != null) {
                                if (predecessorRegion == thisRegion) continue;
                                predecessorRegion.addAll(thisRegion);
                                distinctRegions.remove(thisRegion);
                                thisRegion = predecessorRegion;
                                for (ExecutionVertex inPredRegion : predecessorRegion) {
                                    vertexToRegion.put(inPredRegion, thisRegion);
                                }
                                continue;
                            }
                            if (predecessor != null) {
                                thisRegion = predecessorRegion;
                                thisRegion.add(ev);
                                vertexToRegion.put(ev, thisRegion);
                                continue;
                            }
                            throw new FlinkRuntimeException("bug in the logic to construct the pipelined failover regions");
                        }
                    }
                }
                continue;
            }
            for (ExecutionVertex ev : ejv.getTaskVertices()) {
                ArrayList<ExecutionVertex> region = new ArrayList<ExecutionVertex>(1);
                region.add(ev);
                vertexToRegion.put(ev, region);
                distinctRegions.put(region, null);
            }
        }
        LOG.info("Creating {} individual failover regions for job {} ({})", new Object[]{distinctRegions.keySet().size(), this.executionGraph.getJobName(), this.executionGraph.getJobID()});
        for (List region : distinctRegions.keySet()) {
            FailoverRegion failoverRegion = new FailoverRegion(this.executionGraph, this.executor, region, this.regionFailLimit);
            for (ExecutionVertex ev : region) {
                this.vertexToRegion.put(ev, failoverRegion);
            }
        }
    }

    private void makeAllOneRegion(List<ExecutionJobVertex> jobVertices) {
        LOG.warn("Cannot decompose ExecutionGraph into individual failover regions due to use of Co-Location constraints (iterations). Job will fail over as one holistic unit.");
        ArrayList<ExecutionVertex> allVertices = new ArrayList<ExecutionVertex>();
        for (ExecutionJobVertex ejv : jobVertices) {
            allVertices.ensureCapacity(allVertices.size() + ejv.getParallelism());
            for (ExecutionVertex ev : ejv.getTaskVertices()) {
                allVertices.add(ev);
            }
        }
        FailoverRegion singleRegion = new FailoverRegion(this.executionGraph, this.executor, allVertices, this.regionFailLimit);
        for (ExecutionVertex ev : allVertices) {
            this.vertexToRegion.put(ev, singleRegion);
        }
    }

    private void sortRegions() {
        HashSet<FailoverRegion> sortedRegionSet = new HashSet<FailoverRegion>();
        this.sortedRegions.clear();
        for (ExecutionJobVertex jobVertex : this.executionGraph.getVerticesTopologically()) {
            for (ExecutionVertex ev : jobVertex.getTaskVertices()) {
                FailoverRegion region = this.vertexToRegion.get(ev);
                if (sortedRegionSet.contains(region)) continue;
                sortedRegionSet.add(region);
                this.sortedRegions.add(region);
            }
        }
    }

    @VisibleForTesting
    public FailoverRegion getFailoverRegion(ExecutionVertex ev) {
        return this.vertexToRegion.get(ev);
    }

    public static class Factory
    implements FailoverStrategy.Factory {
        private int regionFailLimit = 100;

        @Override
        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartPipelinedRegionStrategy(executionGraph, this.regionFailLimit);
        }

        public void setRegionFailLimit(int regionFailLimit) {
            this.regionFailLimit = regionFailLimit;
        }
    }
}

