/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverRegion {
    private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
    private static final AtomicLongFieldUpdater<FailoverRegion> REGION_VERSION_UPDATER = AtomicLongFieldUpdater.newUpdater(FailoverRegion.class, "regionModVersion");
    private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);
    private final AbstractID id = new AbstractID();
    private final ExecutionGraph executionGraph;
    private final List<ExecutionVertex> connectedExecutionVertices;
    private final Executor executor;
    private volatile JobStatus state = JobStatus.RUNNING;
    private volatile long regionModVersion;
    private final int regionFailLimit;
    private volatile int regionFailCount = 0;

    public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions, int regionFailLimit) {
        this.executionGraph = (ExecutionGraph)Preconditions.checkNotNull((Object)executionGraph);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.connectedExecutionVertices = (List)Preconditions.checkNotNull(connectedExecutions);
        this.regionFailLimit = regionFailLimit;
        LOG.debug("Created failover region {} with vertices: {}", (Object)this.id, connectedExecutions);
    }

    public void onExecutionFail(long globalModVersionOfFailover, Throwable cause) {
        this.incrementRegionModVersion();
        this.failover(globalModVersionOfFailover, this.regionModVersion, cause);
    }

    private void allVerticesInTerminalState(long globalModVersionOfFailover) {
        block1: {
            JobStatus curStatus;
            while ((curStatus = this.state).equals((Object)JobStatus.CANCELLING)) {
                if (!this.transitionState(curStatus, JobStatus.CANCELED)) continue;
                this.reset(globalModVersionOfFailover, this.regionModVersion);
                break block1;
            }
            LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", (Object)this.id, (Object)this.state);
        }
    }

    public JobStatus getState() {
        return this.state;
    }

    public List<ExecutionVertex> getAllExecutionVertices() {
        return this.connectedExecutionVertices;
    }

    private void failover(long globalModVersionOfFailover, long regionModVersion, Throwable cause) {
        if (regionModVersion < this.regionModVersion) {
            LOG.info("Ignoring concurrent failover of region {}.", (Object)this.id);
            return;
        }
        LOG.info("Try to fail and restart region due to error: ", cause);
        ++this.regionFailCount;
        if (!this.executionGraph.getRestartStrategy().canRestart()) {
            this.executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail", cause));
        } else if (this.regionFailCount > this.regionFailLimit) {
            this.executionGraph.failGlobal(new FlinkException("FailoverRegion " + this.id + " exceeds max region restart limit", cause));
        } else {
            JobStatus curStatus = this.state;
            if (curStatus.equals((Object)JobStatus.RUNNING)) {
                this.cancel(globalModVersionOfFailover, regionModVersion);
            } else if (curStatus.equals((Object)JobStatus.CANCELED)) {
                this.reset(globalModVersionOfFailover, regionModVersion);
            } else {
                LOG.info("FailoverRegion {} is {} when notified to failover.", (Object)this.id, (Object)this.state);
            }
        }
    }

    private void cancel(long globalModVersionOfFailover, long regionModVersion) {
        block2: {
            JobStatus curStatus;
            while ((curStatus = this.state).equals((Object)JobStatus.RUNNING)) {
                if (!this.transitionState(curStatus, JobStatus.CANCELLING)) continue;
                ArrayList futures = new ArrayList(this.connectedExecutionVertices.size());
                for (ExecutionVertex vertex : this.connectedExecutionVertices) {
                    futures.add(vertex.cancel());
                }
                FutureUtils.ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
                allTerminal.whenCompleteAsync((ignored, throwable) -> {
                    if (throwable != null) {
                        this.failover(globalModVersionOfFailover, regionModVersion, new FlinkException("Could not cancel all execution job vertices properly.", throwable));
                    } else {
                        this.allVerticesInTerminalState(globalModVersionOfFailover);
                    }
                }, this.executor);
                break block2;
            }
            LOG.info("FailoverRegion {} is {} when cancel.", (Object)this.id, (Object)this.state);
        }
    }

    private void reset(long globalModVersionOfFailover, long regionModVersion) {
        if (this.transitionState(JobStatus.CANCELED, JobStatus.CREATED)) {
            HashSet<CoLocationGroup> colGroups = new HashSet<CoLocationGroup>();
            for (ExecutionVertex ev : this.connectedExecutionVertices) {
                CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
                if (cgroup == null || colGroups.contains(cgroup)) continue;
                cgroup.resetConstraints();
                colGroups.add(cgroup);
            }
            this.restart(globalModVersionOfFailover, regionModVersion);
        } else {
            this.failover(globalModVersionOfFailover, regionModVersion, new FlinkException("FailoverRegion " + this.id + " switch from CANCELLED to CREATED fail."));
        }
    }

    private void restart(long globalModVersionOfFailover, long regionModVersion) {
        try {
            if (this.transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                this.executionGraph.resetExecutionVerticesAndNotify(globalModVersionOfFailover, this.connectedExecutionVertices);
            } else {
                this.failover(globalModVersionOfFailover, regionModVersion, new FlinkException("FailoverRegion " + this.id + " witch from CREATED to RUNNING fail."));
            }
        }
        catch (GlobalModVersionMismatch globalModVersionMismatch) {
        }
        catch (Exception e) {
            this.failover(globalModVersionOfFailover, regionModVersion, new FlinkException("FailoverRegion " + this.id + " restart failed.", (Throwable)e));
        }
    }

    private boolean transitionState(JobStatus current, JobStatus newState) {
        if (STATE_UPDATER.compareAndSet(this, current, newState)) {
            LOG.info("FailoverRegion {} switched from state {} to {}.", new Object[]{this.id, current, newState});
            return true;
        }
        return false;
    }

    private long incrementRegionModVersion() {
        return REGION_VERSION_UPDATER.incrementAndGet(this);
    }
}

