package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/FailoverRegion.class */
/**
 * A group of connected {@link ExecutionVertex} instances that is cancelled and restarted as a
 * unit when one of its executions fails.
 *
 * <p>The region moves through a small state machine expressed with {@link JobStatus} values:
 * {@code RUNNING -> CANCELLING -> CANCELED -> CREATED -> RUNNING}. Every transition is a
 * compare-and-set on {@link #state}, so a step whose expected source state no longer holds is
 * either skipped (with a log line) or fed back into {@code failover}.
 *
 * <p>State, region modification version and failure counter are all updated through atomic
 * field updaters because {@code failover} may run both on the thread calling
 * {@link #onExecutionFail(long, Throwable)} and on the supplied {@link Executor}.
 */
public class FailoverRegion {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);

    /** CAS access to {@link #state}; the string must match the field name declared below. */
    private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");

    /** Atomic increments of {@link #regionModVersion}. */
    private static final AtomicLongFieldUpdater<FailoverRegion> REGION_VERSION_UPDATER =
            AtomicLongFieldUpdater.newUpdater(FailoverRegion.class, "regionModVersion");

    /** Atomic increments of {@link #regionFailCount}; a plain {@code ++} on a volatile can lose updates. */
    private static final AtomicIntegerFieldUpdater<FailoverRegion> REGION_FAIL_COUNT_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(FailoverRegion.class, "regionFailCount");

    private final ExecutionGraph executionGraph;
    private final List<ExecutionVertex> connectedExecutionVertices;
    private final Executor executor;
    private final int regionFailLimit;
    private final AbstractID id = new AbstractID();

    /** Bumped once per {@link #onExecutionFail(long, Throwable)} call; used to drop stale failovers. */
    private volatile long regionModVersion;
    private volatile JobStatus state = JobStatus.RUNNING;
    /** Number of failover attempts seen so far, compared against {@link #regionFailLimit}. */
    private volatile int regionFailCount = 0;

    /**
     * Creates a region that starts in {@link JobStatus#RUNNING}.
     *
     * @param executionGraph graph used to query the restart strategy, to fail globally and to
     *     reset the vertices; must not be null
     * @param executor executor on which the cancellation-completion callback runs; must not be null
     * @param list the vertices belonging to this region; stored by reference, must not be null
     * @param i maximum number of failovers tolerated before the whole graph is failed globally
     */
    public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> list, int i) {
        this.executionGraph = Preconditions.checkNotNull(executionGraph);
        this.executor = Preconditions.checkNotNull(executor);
        this.connectedExecutionVertices = Preconditions.checkNotNull(list);
        this.regionFailLimit = i;
        LOG.debug("Created failover region {} with vertices: {}", this.id, list);
    }

    /**
     * Entry point invoked when an execution of this region has failed.
     *
     * @param j version that is eventually forwarded to
     *     {@code ExecutionGraph.resetExecutionVerticesAndNotify} (presumably the graph's global
     *     modification version at the time of the failure - confirm against the caller)
     * @param th the failure that triggered the failover
     */
    public void onExecutionFail(long j, Throwable th) {
        // Use the value produced by *this* increment rather than re-reading the field: if another
        // thread bumps the version in between, this call must be recognised as the stale one.
        failover(j, incrementRegionModVersion(), th);
    }

    /** Returns the current state of this region. */
    public JobStatus getState() {
        return this.state;
    }

    /** Returns the vertex list handed to the constructor (the same instance, not a copy). */
    public List<ExecutionVertex> getAllExecutionVertices() {
        return this.connectedExecutionVertices;
    }

    /**
     * Called once all cancellation futures have completed: moves CANCELLING to CANCELED and
     * continues with the reset. Does nothing but log if the region is in any other state.
     */
    private void allVerticesInTerminalState(long globalModVersion) {
        while (true) {
            final JobStatus current = this.state;
            if (current != JobStatus.CANCELLING) {
                LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", this.id, current);
                return;
            }
            if (transitionState(current, JobStatus.CANCELED)) {
                break;
            }
            // CAS lost against a concurrent transition: re-read the state and decide again.
        }
        reset(globalModVersion, this.regionModVersion);
    }

    /**
     * Runs one failover attempt: counts it, escalates to a global failure when the restart
     * strategy refuses or the region limit is exceeded, and otherwise continues from whatever
     * state the region is currently in.
     *
     * @param globalModVersion version forwarded unchanged to the later steps
     * @param regionModVersionOfFailover region version this attempt belongs to; the attempt is
     *     ignored when the region has already moved on to a newer version
     * @param cause the failure to report
     */
    private void failover(long globalModVersion, long regionModVersionOfFailover, Throwable cause) {
        if (regionModVersionOfFailover < this.regionModVersion) {
            LOG.info("Ignoring concurrent failover of region {}.", this.id);
            return;
        }
        LOG.info("Try to fail and restart region due to error: ", cause);

        // Count the attempt before consulting the restart strategy, as the original did.
        final int failCount = REGION_FAIL_COUNT_UPDATER.incrementAndGet(this);

        if (!this.executionGraph.getRestartStrategy().canRestart()) {
            this.executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail", cause));
            return;
        }
        if (failCount > this.regionFailLimit) {
            this.executionGraph.failGlobal(
                    new FlinkException("FailoverRegion " + this.id + " exceeds max region restart limit", cause));
            return;
        }

        final JobStatus current = this.state;
        if (current == JobStatus.RUNNING) {
            cancel(globalModVersion, regionModVersionOfFailover);
        } else if (current == JobStatus.CANCELED) {
            reset(globalModVersion, regionModVersionOfFailover);
        } else {
            // CANCELLING / CREATED: another failover is already in flight, nothing to start here.
            LOG.info("FailoverRegion {} is {} when notified to failover.", this.id, current);
        }
    }

    /**
     * Moves RUNNING to CANCELLING, cancels every vertex of the region and, once all returned
     * futures have completed, continues on {@link #executor}.
     */
    private void cancel(long globalModVersion, long regionModVersionOfFailover) {
        while (true) {
            final JobStatus current = this.state;
            if (current != JobStatus.RUNNING) {
                LOG.info("FailoverRegion {} is {} when cancel.", this.id, current);
                return;
            }
            if (transitionState(current, JobStatus.CANCELLING)) {
                break;
            }
        }

        final List<CompletableFuture<?>> cancellations =
                new ArrayList<>(this.connectedExecutionVertices.size());
        for (ExecutionVertex vertex : this.connectedExecutionVertices) {
            cancellations.add(vertex.cancel());
        }

        FutureUtils.waitForAll(cancellations).whenCompleteAsync((ignored, failure) -> {
            if (failure != null) {
                failover(globalModVersion, regionModVersionOfFailover,
                        new FlinkException("Could not cancel all execution job vertices properly.", failure));
            } else {
                allVerticesInTerminalState(globalModVersion);
            }
        }, this.executor);
    }

    /**
     * Moves CANCELED to CREATED, resets the constraints of every distinct co-location group
     * referenced by the region's vertices, then restarts. A failed transition triggers another
     * failover attempt.
     */
    private void reset(long globalModVersion, long regionModVersionOfFailover) {
        if (!transitionState(JobStatus.CANCELED, JobStatus.CREATED)) {
            failover(globalModVersion, regionModVersionOfFailover,
                    new FlinkException("FailoverRegion " + this.id + " switch from CANCELLED to CREATED fail."));
            return;
        }

        // Several vertices can share one group; reset each group only once.
        final Set<CoLocationGroup> alreadyReset = new HashSet<>();
        for (ExecutionVertex vertex : this.connectedExecutionVertices) {
            final CoLocationGroup group = vertex.getJobVertex().getCoLocationGroup();
            if (group != null && alreadyReset.add(group)) {
                group.resetConstraints();
            }
        }
        restart(globalModVersion, regionModVersionOfFailover);
    }

    /**
     * Moves CREATED to RUNNING and asks the execution graph to reset the region's vertices.
     * A failed transition or any other exception triggers another failover attempt; a
     * {@link GlobalModVersionMismatch} only gets logged.
     */
    private void restart(long globalModVersion, long regionModVersionOfFailover) {
        try {
            if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                this.executionGraph.resetExecutionVerticesAndNotify(globalModVersion, this.connectedExecutionVertices);
            } else {
                failover(globalModVersion, regionModVersionOfFailover,
                        new FlinkException("FailoverRegion " + this.id + " switch from CREATED to RUNNING fail."));
            }
        } catch (GlobalModVersionMismatch e) {
            // Deliberately not retried, but no longer silent.
            // NOTE(review): presumably a global recovery has superseded this regional one - confirm.
            LOG.info("FailoverRegion {} did not restart its vertices because of a global mod version mismatch.",
                    this.id, e);
        } catch (Exception e) {
            failover(globalModVersion, regionModVersionOfFailover,
                    new FlinkException("FailoverRegion " + this.id + " restart failed.", e));
        }
    }

    /**
     * Atomically switches {@link #state} from {@code expected} to {@code target}.
     *
     * @return {@code true} if the switch happened (and was logged), {@code false} if the state
     *     was not {@code expected}
     */
    private boolean transitionState(JobStatus expected, JobStatus target) {
        if (!STATE_UPDATER.compareAndSet(this, expected, target)) {
            return false;
        }
        LOG.info("FailoverRegion {} switched from state {} to {}.", this.id, expected, target);
        return true;
    }

    /** Atomically increments the region modification version and returns the new value. */
    private long incrementRegionModVersion() {
        return REGION_VERSION_UPDATER.incrementAndGet(this);
    }
}
