/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spinnaker.cats.dynomite.cluster;

import com.netflix.spinnaker.cats.agent.Agent;
import com.netflix.spinnaker.cats.agent.AgentExecution;
import com.netflix.spinnaker.cats.agent.AgentScheduler;
import com.netflix.spinnaker.cats.agent.AgentSchedulerAware;
import com.netflix.spinnaker.cats.agent.CacheResult;
import com.netflix.spinnaker.cats.agent.CachingAgent;
import com.netflix.spinnaker.cats.agent.ExecutionInstrumentation;
import com.netflix.spinnaker.cats.dynomite.DynomiteUtils;
import com.netflix.spinnaker.cats.dynomite.ExcessiveDynoFailureRetries;
import com.netflix.spinnaker.cats.module.CatsModuleAware;
import com.netflix.spinnaker.cats.redis.cluster.AgentIntervalProvider;
import com.netflix.spinnaker.cats.redis.cluster.ClusteredSortAgentLock;
import com.netflix.spinnaker.cats.redis.cluster.NodeStatusProvider;
import com.netflix.spinnaker.cats.thread.NamedThreadFactory;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import java.time.Clock;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.SyncFailsafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynoClusteredSortAgentScheduler
extends CatsModuleAware
implements AgentScheduler<ClusteredSortAgentLock>,
Runnable {
    private final Clock clock;
    private final RedisClientDelegate redisClientDelegate;
    private final NodeStatusProvider nodeStatusProvider;
    private final AgentIntervalProvider intervalProvider;
    private final ExecutorService agentWorkPool;
    private static final int NOW = 0;
    private static final int REDIS_REFRESH_PERIOD = 30;
    private int runCount = 0;
    private final Logger log;
    private Map<String, AgentWorker> agents;
    private Optional<Semaphore> runningAgents;
    private static final String WAITING_SET = "{scheduler}:WAITZ";
    private static final String WORKING_SET = "{scheduler}:WORKZ";
    private static final String ADD_AGENT_SCRIPT = "addAgentScript";
    private static final String VALID_SCORE_SCRIPT = "validScoreScript";
    private static final String SWAP_SET_SCRIPT = "swapSetScript";
    private static final String REMOVE_AGENT_SCRIPT = "removeAgentScript";
    private static final String CONDITIONAL_SWAP_SET_SCRIPT = "conditionalSwapSetScript";
    private static final RetryPolicy RETRY_POLICY = DynomiteUtils.greedyRetryPolicy(3000L);
    private ConcurrentHashMap<String, String> scripts;

    public DynoClusteredSortAgentScheduler(Clock clock, RedisClientDelegate redisClientDelegate, NodeStatusProvider nodeStatusProvider, AgentIntervalProvider intervalProvider, Integer parallelism) {
        this.clock = clock;
        this.redisClientDelegate = redisClientDelegate;
        this.nodeStatusProvider = nodeStatusProvider;
        this.agents = new ConcurrentHashMap<String, AgentWorker>();
        this.intervalProvider = intervalProvider;
        this.log = LoggerFactory.getLogger(this.getClass());
        if (parallelism == 0 || parallelism < -1) {
            throw new IllegalArgumentException("Argument 'parallelism' must be positive, or -1 (for unlimited parallelism).");
        }
        this.runningAgents = parallelism > 0 ? Optional.of(new Semaphore(parallelism)) : Optional.empty();
        this.scripts = new ConcurrentHashMap();
        this.storeScripts();
        this.agentWorkPool = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory(AgentWorker.class.getSimpleName()));
        Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory(DynoClusteredSortAgentScheduler.class.getSimpleName())).scheduleAtFixedRate(this, 0L, 1L, TimeUnit.SECONDS);
    }

    private void storeScripts() {
        this.scripts.put(SWAP_SET_SCRIPT, "local score = redis.call('zscore', KEYS[1], ARGV[1])\nif score ~= nil then\n  redis.call('zrem', KEYS[1], ARGV[1])\n  redis.call('zadd', KEYS[2], ARGV[2], ARGV[1])\n  return score\nelse return nil end\n");
        this.scripts.put(CONDITIONAL_SWAP_SET_SCRIPT, "local score = redis.call('zscore', KEYS[1], ARGV[1])\nif score == ARGV[3] then\n  redis.call('zrem', KEYS[1], ARGV[1])\n  redis.call('zadd', KEYS[2], ARGV[2], ARGV[1])\n  return score\nelse return nil end\n");
        this.scripts.put(VALID_SCORE_SCRIPT, "local score = redis.call('zscore', KEYS[1], ARGV[1])\nif score == ARGV[2] then\n  return score\nelse return nil end\n");
        this.scripts.put(ADD_AGENT_SCRIPT, "if redis.call('zrank', KEYS[1], ARGV[1]) ~= nil then\n  if redis.call('zrank', KEYS[2], ARGV[1]) ~= nil then\n    return redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])\n  else return nil end\nelse return nil end\n");
        this.scripts.put(REMOVE_AGENT_SCRIPT, "redis.call('zrem', KEYS[1], ARGV[1])\nredis.call('zrem', KEYS[2], ARGV[1])\n");
    }

    private String getScript(String scriptName) {
        String scriptSha = this.scripts.get(scriptName);
        if (scriptSha == null) {
            this.storeScripts();
            scriptSha = this.scripts.get(scriptName);
            if (scriptSha == null) {
                throw new RuntimeException("Failed to load caching scripts.");
            }
        }
        return this.scripts.get(scriptName);
    }

    public void schedule(Agent agent, AgentExecution agentExecution, ExecutionInstrumentation executionInstrumentation) {
        if (agent instanceof AgentSchedulerAware) {
            ((AgentSchedulerAware)agent).setAgentScheduler((AgentScheduler)this);
        }
        this.withRetry(String.format("Scheduling %s", agent.getAgentType()), () -> this.redisClientDelegate.withScriptingClient(client -> client.eval(this.getScript(ADD_AGENT_SCRIPT), 2, new String[]{WAITING_SET, WORKING_SET, agent.getAgentType(), this.score(0L)})));
        this.agents.put(agent.getAgentType(), new AgentWorker(agent, agentExecution, executionInstrumentation, this));
    }

    public ClusteredSortAgentLock tryLock(Agent agent) {
        ScoreTuple scores = this.acquireAgent(agent);
        if (scores != null) {
            return new ClusteredSortAgentLock(agent, scores.acquireScore, scores.releaseScore);
        }
        return null;
    }

    public boolean tryRelease(ClusteredSortAgentLock lock) {
        return this.conditionalReleaseAgent(lock.getAgent(), lock.getAcquireScore(), lock.getReleaseScore()) != null;
    }

    public boolean lockValid(ClusteredSortAgentLock lock) {
        return this.withRetry(String.format("Checking if lock is valid for %s", lock.getAgent().getAgentType()), () -> (Boolean)this.redisClientDelegate.withScriptingClient(client -> client.eval(this.getScript(VALID_SCORE_SCRIPT), 1, new String[]{WORKING_SET, lock.getAgent().getAgentType(), lock.getAcquireScore()}) != null));
    }

    public void unschedule(Agent agent) {
        this.agents.remove(agent.getAgentType());
        this.withRetry(String.format("Unscheduling %s", agent.getAgentType()), () -> this.redisClientDelegate.withScriptingClient(client -> client.eval(this.getScript(REMOVE_AGENT_SCRIPT), 2, new String[]{WAITING_SET, WORKING_SET, agent.getAgentType()})));
    }

    public boolean isAtomic() {
        return true;
    }

    @Override
    public void run() {
        if (!this.nodeStatusProvider.isNodeEnabled()) {
            return;
        }
        try {
            this.saturatePool();
        }
        catch (Throwable t) {
            this.log.error("Failed to run caching agents", t);
        }
        finally {
            ++this.runCount;
        }
    }

    private String score(long offsetSeconds) {
        return String.format("%d", this.clock.instant().plus(offsetSeconds, ChronoUnit.SECONDS).getEpochSecond());
    }

    private ScoreTuple acquireAgent(Agent agent) {
        String acquireScore = this.score(this.intervalProvider.getInterval(agent).getTimeout());
        Object releaseScore = this.withRetry(String.format("Acquiring lock on %s", agent.getAgentType()), () -> this.redisClientDelegate.withScriptingClient(client -> client.eval(this.getScript(SWAP_SET_SCRIPT), Arrays.asList(WAITING_SET, WORKING_SET), Arrays.asList(agent.getAgentType(), acquireScore))));
        return releaseScore != null ? new ScoreTuple(acquireScore, releaseScore.toString()) : null;
    }

    private ScoreTuple conditionalReleaseAgent(Agent agent, String acquireScore, Status status) {
        long newInterval = status == Status.SUCCESS ? this.intervalProvider.getInterval(agent).getInterval() : this.intervalProvider.getInterval(agent).getErrorInterval();
        String newAcquireScore = this.score(newInterval);
        Object releaseScore = this.withRetry(String.format("Conditionally releasing %s", agent.getAgentType()), () -> this.redisClientDelegate.withScriptingClient(client -> client.eval(this.getScript(CONDITIONAL_SWAP_SET_SCRIPT), Arrays.asList(WORKING_SET, WAITING_SET), Arrays.asList(agent.getAgentType(), newAcquireScore, acquireScore))));
        return releaseScore != null ? new ScoreTuple(newAcquireScore, releaseScore.toString()) : null;
    }

    private ScoreTuple conditionalReleaseAgent(Agent agent, String acquireScore, String newAcquireScore) {
        String releaseScore = this.withRetry(String.format("Conditionally releasing %s", agent.getAgentType()), () -> (String)this.redisClientDelegate.withScriptingClient(client -> client.eval(this.getScript(CONDITIONAL_SWAP_SET_SCRIPT), Arrays.asList(WORKING_SET, WAITING_SET), Arrays.asList(agent.getAgentType(), newAcquireScore, acquireScore)).toString()));
        return releaseScore != null ? new ScoreTuple(newAcquireScore, releaseScore.toString()) : null;
    }

    private ScoreTuple releaseAgent(Agent agent) {
        String acquireScore = this.score(this.intervalProvider.getInterval(agent).getInterval());
        String releaseScore = this.withRetry(String.format("Releasing %s", agent.getAgentType()), () -> (String)this.redisClientDelegate.withScriptingClient(client -> client.eval(this.getScript(SWAP_SET_SCRIPT), Arrays.asList(WORKING_SET, WAITING_SET), Arrays.asList(agent.getAgentType(), acquireScore)).toString()));
        return releaseScore != null ? new ScoreTuple(acquireScore, releaseScore.toString()) : null;
    }

    private void saturatePool() {
        this.withRetry("Repopulating agents into waiting set", () -> this.redisClientDelegate.withScriptingClient(client -> {
            if (this.runCount % 30 == 0) {
                for (String agent : this.agents.keySet()) {
                    client.eval(this.getScript(ADD_AGENT_SCRIPT), 2, new String[]{WAITING_SET, WORKING_SET, agent, this.score(0L)});
                }
            }
        }));
        List keys = this.withRetry("Getting available agents", () -> (ArrayList)this.redisClientDelegate.withCommandsClient(client -> {
            Set oldKeys = client.zrangeByScore(WORKING_SET, "-inf", this.score(0L));
            for (String key : oldKeys) {
                AgentWorker worker = this.agents.get(key);
                if (worker == null) continue;
                this.releaseAgent(worker.agent);
            }
            return new ArrayList(client.zrangeByScore(WAITING_SET, "-inf", this.score(0L)));
        }));
        HashSet<AgentWorker> workers = new HashSet<AgentWorker>();
        while (!keys.isEmpty() && this.runningAgents.map(Semaphore::tryAcquire).orElse(true).booleanValue()) {
            ScoreTuple score;
            String agent = (String)keys.remove(0);
            AgentWorker worker = this.agents.get(agent);
            if (worker == null || (score = this.acquireAgent(worker.agent)) == null) continue;
            worker.setScore(score.acquireScore);
            workers.add(worker);
        }
        for (AgentWorker worker : workers) {
            this.agentWorkPool.submit(worker);
        }
    }

    private <T> T withRetry(String description, Callable<T> callback) {
        return (T)((SyncFailsafe)Failsafe.with((RetryPolicy)RETRY_POLICY).onRetriesExceeded(failure -> {
            throw new ExcessiveDynoFailureRetries(description, (Throwable)failure);
        })).get(callback);
    }

    private void withRetry(String description, Runnable callback) {
        ((SyncFailsafe)Failsafe.with((RetryPolicy)RETRY_POLICY).onRetriesExceeded(failure -> {
            throw new ExcessiveDynoFailureRetries(description, (Throwable)failure);
        })).run(callback::run);
    }

    private static class ScoreTuple {
        private final String acquireScore;
        private final String releaseScore;

        public ScoreTuple(String acquireScore, String releaseScore) {
            this.acquireScore = acquireScore;
            this.releaseScore = releaseScore;
        }
    }

    private static class AgentWorker
    implements Runnable {
        private final Agent agent;
        private final AgentExecution agentExecution;
        private final ExecutionInstrumentation executionInstrumentation;
        private final DynoClusteredSortAgentScheduler scheduler;
        private String acquireScore;

        AgentWorker(Agent agent, AgentExecution agentExecution, ExecutionInstrumentation executionInstrumentation, DynoClusteredSortAgentScheduler scheduler) {
            this.agent = agent;
            this.agentExecution = agentExecution;
            this.executionInstrumentation = executionInstrumentation;
            this.scheduler = scheduler;
        }

        public void setScore(String score) {
            this.acquireScore = score;
        }

        @Override
        public void run() {
            assert (this.acquireScore != null);
            if (this.agentExecution instanceof CachingAgent.CacheExecution) {
                this.runAsCache();
            } else {
                this.runAsSideEffect();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void runAsCache() {
            if (!(this.agentExecution instanceof CachingAgent.CacheExecution)) {
                throw new IllegalStateException("Agent execution must be a CacheExecution to runAsCache");
            }
            CachingAgent.CacheExecution agentExecution = (CachingAgent.CacheExecution)this.agentExecution;
            CacheResult result = null;
            Status status = Status.FAILURE;
            try {
                this.executionInstrumentation.executionStarted(this.agent);
                long startTime = System.nanoTime();
                result = agentExecution.executeAgentWithoutStore(this.agent);
                this.executionInstrumentation.executionCompleted(this.agent, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
                status = Status.SUCCESS;
            }
            catch (Throwable cause) {
                this.executionInstrumentation.executionFailed(this.agent, cause);
            }
            finally {
                this.scheduler.runningAgents.ifPresent(Semaphore::release);
                if (this.scheduler.conditionalReleaseAgent(this.agent, this.acquireScore, status) != null && result != null) {
                    agentExecution.storeAgentResult(this.agent, result);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void runAsSideEffect() {
            Status status = Status.FAILURE;
            try {
                this.executionInstrumentation.executionStarted(this.agent);
                long startTime = System.nanoTime();
                this.agentExecution.executeAgent(this.agent);
                this.executionInstrumentation.executionCompleted(this.agent, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
                status = Status.SUCCESS;
            }
            catch (Throwable cause) {
                this.executionInstrumentation.executionFailed(this.agent, cause);
            }
            finally {
                this.scheduler.runningAgents.ifPresent(Semaphore::release);
                this.scheduler.conditionalReleaseAgent(this.agent, this.acquireScore, status);
            }
        }
    }

    private static enum Status {
        SUCCESS,
        FAILURE;

    }
}

