/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spinnaker.cats.dynomite.cluster;

import com.netflix.dyno.connectionpool.exception.DynoException;
import com.netflix.spinnaker.cats.agent.Agent;
import com.netflix.spinnaker.cats.agent.AgentExecution;
import com.netflix.spinnaker.cats.agent.AgentLock;
import com.netflix.spinnaker.cats.agent.AgentScheduler;
import com.netflix.spinnaker.cats.agent.AgentSchedulerAware;
import com.netflix.spinnaker.cats.agent.ExecutionInstrumentation;
import com.netflix.spinnaker.cats.module.CatsModuleAware;
import com.netflix.spinnaker.cats.redis.cluster.AgentIntervalProvider;
import com.netflix.spinnaker.cats.redis.cluster.ClusteredAgentScheduler;
import com.netflix.spinnaker.cats.redis.cluster.NodeIdentity;
import com.netflix.spinnaker.cats.redis.cluster.NodeStatusProvider;
import com.netflix.spinnaker.cats.thread.NamedThreadFactory;
import com.netflix.spinnaker.kork.dynomite.DynomiteClientDelegate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.exceptions.JedisException;

/**
 * An {@link AgentScheduler} that runs registered agents under a per-agent-type lock key
 * stored through a {@link DynomiteClientDelegate}.
 *
 * <p>The scheduler registers itself on the supplied polling executor and is invoked every
 * {@code LOCK_POLL_INTERVAL_SECONDS} seconds. On each invocation (when the node is enabled)
 * it tries to acquire the lock key of every registered agent that is not currently
 * executing on this instance; each agent whose key was acquired is submitted to the
 * execution pool. When an agent finishes, its key is either deleted or given a new expiry
 * that corresponds to the agent's next execution time.
 *
 * <p>The value stored under a lock key is this node's identity, which is used to verify
 * ownership before the key is modified on release. These ownership checks are a separate
 * {@code GET} followed by the modifying command, i.e. they are not atomic.
 */
public class DynoClusteredAgentScheduler extends CatsModuleAware
        implements AgentScheduler<AgentLock>, Runnable {
    private static final Logger log = LoggerFactory.getLogger(DynoClusteredAgentScheduler.class);

    /** Retries a failed lock acquisition up to 3 times, 25 ms apart, on Dyno/Jedis exceptions. */
    private static final RetryPolicy ACQUIRE_LOCK_RETRY_POLICY = new RetryPolicy().retryOn(Arrays.asList(DynoException.class, JedisException.class)).withMaxRetries(3).withDelay(25L, TimeUnit.MILLISECONDS);

    /** Period, in seconds, between two lock polling cycles. */
    private static final long LOCK_POLL_INTERVAL_SECONDS = 5L;

    /** If the remaining time until the next execution is below this (ms), the lock is deleted instead of re-expired. */
    private static final long MIN_TTL_THRESHOLD_MILLIS = 2500L;

    /** "Next execution time" that always lies in the past, forcing the lock to be deleted. */
    private static final long RELEASE_IMMEDIATELY = 0L;

    private final DynomiteClientDelegate redisClientDelegate;
    private final NodeIdentity nodeIdentity;
    private final AgentIntervalProvider intervalProvider;
    private final ExecutorService agentExecutionPool;
    /** Registered agents, keyed by agent type. */
    private final Map<String, AgentExecutionAction> agents = new ConcurrentHashMap<>();
    /** Agents whose lock this instance acquired and whose execution has not completed yet, keyed by agent type. */
    private final Map<String, NextAttempt> activeAgents = new ConcurrentHashMap<>();
    private final NodeStatusProvider nodeStatusProvider;

    /**
     * Creates a scheduler with a single-threaded polling executor and a cached thread pool
     * for agent execution.
     *
     * <p>NOTE(review): the polling thread is named after {@code ClusteredAgentScheduler},
     * not this class; kept as-is to avoid changing thread names.
     */
    public DynoClusteredAgentScheduler(DynomiteClientDelegate redisClientDelegate, NodeIdentity nodeIdentity, AgentIntervalProvider intervalProvider, NodeStatusProvider nodeStatusProvider) {
        this(redisClientDelegate, nodeIdentity, intervalProvider, nodeStatusProvider, Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory(ClusteredAgentScheduler.class.getSimpleName())), Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory(AgentExecutionAction.class.getSimpleName())));
    }

    /**
     * Creates a scheduler and immediately registers it on {@code lockPollingScheduler} at a
     * fixed rate of {@code LOCK_POLL_INTERVAL_SECONDS} seconds with no initial delay.
     *
     * @param redisClientDelegate  client used to read and write the lock keys
     * @param nodeIdentity         supplies the value stored under an acquired lock key
     * @param intervalProvider     supplies timeout, interval and error interval per agent
     * @param nodeStatusProvider   decides whether this node may run agents at all
     * @param lockPollingScheduler executor that periodically invokes {@link #run()}
     * @param agentExecutionPool   executor on which acquired agents are executed
     */
    public DynoClusteredAgentScheduler(DynomiteClientDelegate redisClientDelegate, NodeIdentity nodeIdentity, AgentIntervalProvider intervalProvider, NodeStatusProvider nodeStatusProvider, ScheduledExecutorService lockPollingScheduler, ExecutorService agentExecutionPool) {
        this.redisClientDelegate = redisClientDelegate;
        this.nodeIdentity = nodeIdentity;
        this.intervalProvider = intervalProvider;
        this.nodeStatusProvider = nodeStatusProvider;
        this.agentExecutionPool = agentExecutionPool;
        lockPollingScheduler.scheduleAtFixedRate(this, 0L, LOCK_POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Registers an agent under its agent type, replacing any agent previously registered
     * under the same type. Agents implementing {@link AgentSchedulerAware} are handed this
     * scheduler first.
     */
    public void schedule(Agent agent, AgentExecution agentExecution, ExecutionInstrumentation executionInstrumentation) {
        if (agent instanceof AgentSchedulerAware) {
            ((AgentSchedulerAware)agent).setAgentScheduler((AgentScheduler)this);
        }
        this.agents.put(agent.getAgentType(), new AgentExecutionAction(agent, agentExecution, executionInstrumentation));
    }

    /**
     * Releases the agent's lock key (if this node holds it) and removes the agent from the
     * set of registered agents. The agent is removed locally even when releasing the key
     * fails; the failure is still propagated to the caller.
     */
    public void unschedule(Agent agent) {
        String agentType = agent.getAgentType();
        try {
            this.releaseRunKey(agentType, RELEASE_IMMEDIATELY);
        } finally {
            this.agents.remove(agentType);
        }
    }

    /**
     * One polling cycle: if the node is enabled, acquires locks and submits the
     * corresponding agents for execution.
     *
     * <p>Nothing may escape this method: a task scheduled with {@code scheduleAtFixedRate}
     * is never run again once one execution throws. The node status check is therefore
     * performed inside the {@code try} as well.
     */
    @Override
    public void run() {
        try {
            if (!this.nodeStatusProvider.isNodeEnabled()) {
                return;
            }
            this.runAgents();
        } catch (Throwable t) {
            log.error("Failed running cache agents", t);
        }
    }

    /**
     * Tries to acquire the lock key of every registered agent that was not active on this
     * instance when the method was entered.
     *
     * @return the agent types whose lock was acquired, mapped to the timing information
     *         needed to compute their next execution time
     */
    private Map<String, NextAttempt> acquire() {
        Map<String, NextAttempt> acquired = new HashMap<>(this.agents.size());
        // Snapshot, so an agent completing while we iterate is not picked up mid-cycle.
        HashSet<String> skip = new HashSet<>(this.activeAgents.keySet());
        for (Map.Entry<String, AgentExecutionAction> entry : this.agents.entrySet()) {
            String agentType = entry.getKey();
            if (skip.contains(agentType)) {
                continue;
            }
            AgentIntervalProvider.Interval interval = this.intervalProvider.getInterval(entry.getValue().getAgent());
            if (this.acquireRunKey(agentType, interval.getTimeout())) {
                acquired.put(agentType, new NextAttempt(System.currentTimeMillis(), interval.getInterval(), interval.getErrorInterval()));
            }
        }
        return acquired;
    }

    /**
     * Attempts to take the lock key for {@code agentType}, retrying according to
     * {@link #ACQUIRE_LOCK_RETRY_POLICY}.
     *
     * @param agentType name of the lock key
     * @param timeout   lifetime of the lock in milliseconds, counted from now
     * @return {@code true} if the key was set by this call, {@code false} otherwise
     */
    private boolean acquireRunKey(String agentType, long timeout) {
        String identity = this.nodeIdentity.getNodeIdentity();
        return (Boolean)this.redisClientDelegate.withCommandsClient(client -> (Boolean)Failsafe.with((RetryPolicy)ACQUIRE_LOCK_RETRY_POLICY).get(() -> {
            String response = client.get(agentType);
            if (response == null && client.setnx(agentType, identity) == 1L) {
                client.pexpireAt(agentType, System.currentTimeMillis() + timeout);
                return true;
            }
            // A TTL of -1 is Redis' reply for a key that exists without an expiry, e.g. when
            // a previous holder set the key but never managed to set its expiry.
            if (client.ttl(agentType) == -1L) {
                log.warn("Detected potential deadlocked agent, removing lock key: {}", agentType);
                client.del(agentType);
            }
            return false;
        }));
    }

    /**
     * Acquires locks, marks the acquired agents as active and submits each of them to the
     * execution pool.
     */
    private void runAgents() {
        Map<String, NextAttempt> thisRun = this.acquire();
        this.activeAgents.putAll(thisRun);
        for (Map.Entry<String, NextAttempt> toRun : thisRun.entrySet()) {
            String agentType = toRun.getKey();
            AgentExecutionAction exec = this.agents.get(agentType);
            if (exec == null) {
                // The agent was unscheduled after its lock was acquired. Submitting a job
                // without an action would fail before agentCompleted() is reached, leaving
                // the type in activeAgents forever, so release it here instead.
                log.debug("Agent {} was unscheduled after its lock was acquired, releasing lock", agentType);
                try {
                    this.agentCompleted(agentType, RELEASE_IMMEDIATELY);
                } catch (RuntimeException e) {
                    // Do not let one failed release keep the remaining agents from running.
                    log.warn("Failed releasing lock of unscheduled agent {}", agentType, e);
                }
                continue;
            }
            this.agentExecutionPool.submit(new AgentJob(toRun.getValue(), exec, this));
        }
    }

    /**
     * Called when an agent execution finished: releases or re-expires its lock key and,
     * regardless of whether that succeeded, removes the agent from the active set.
     *
     * @param agentType         agent type that finished
     * @param nextExecutionTime epoch millis at which the agent should run next
     */
    private void agentCompleted(String agentType, long nextExecutionTime) {
        try {
            this.releaseRunKey(agentType, nextExecutionTime);
        } finally {
            this.activeAgents.remove(agentType);
        }
    }

    /**
     * Deletes the lock key when {@code when} is less than
     * {@code MIN_TTL_THRESHOLD_MILLIS} ms away (or in the past); otherwise makes the key
     * expire at {@code when}.
     *
     * @param agentType name of the lock key
     * @param when      epoch millis at which the agent should become runnable again
     */
    private void releaseRunKey(String agentType, long when) {
        long newTtl = when - System.currentTimeMillis();
        boolean delete = newTtl < MIN_TTL_THRESHOLD_MILLIS;
        this.redisClientDelegate.withCommandsClient(client -> {
            if (delete) {
                this.deleteLock((JedisCommands)client, agentType);
            } else {
                this.ttlLock((JedisCommands)client, agentType, newTtl);
            }
        });
    }

    /**
     * Deletes the lock key, but only if it currently holds this node's identity. Without
     * this check a key that has meanwhile been acquired by another node would be removed.
     */
    private void deleteLock(JedisCommands client, String agentType) {
        if (this.ownsLock(client, agentType)) {
            client.del(agentType);
        }
    }

    /**
     * Sets the lock key to expire {@code newTtl} milliseconds from now, but only if it
     * currently holds this node's identity.
     */
    private void ttlLock(JedisCommands client, String agentType, long newTtl) {
        if (this.ownsLock(client, agentType)) {
            client.pexpireAt(agentType, System.currentTimeMillis() + newTtl);
        }
    }

    /** Returns whether the value stored under the lock key equals this node's identity. */
    private boolean ownsLock(JedisCommands client, String agentType) {
        return this.nodeIdentity.getNodeIdentity().equals(client.get(agentType));
    }

    /** Bundles an agent with the execution strategy and instrumentation it was scheduled with. */
    private static class AgentExecutionAction {
        private final Agent agent;
        private final AgentExecution agentExecution;
        private final ExecutionInstrumentation executionInstrumentation;

        public AgentExecutionAction(Agent agent, AgentExecution agentExecution, ExecutionInstrumentation executionInstrumentation) {
            this.agent = agent;
            this.agentExecution = agentExecution;
            this.executionInstrumentation = executionInstrumentation;
        }

        public Agent getAgent() {
            return this.agent;
        }

        /**
         * Executes the agent, reporting start, completion (with elapsed milliseconds) or
         * failure to the instrumentation.
         *
         * @return {@link Status#SUCCESS} if nothing was thrown, {@link Status#FAILURE} otherwise
         */
        public Status execute() {
            try {
                this.executionInstrumentation.executionStarted(this.agent);
                long startTime = System.nanoTime();
                this.agentExecution.executeAgent(this.agent);
                this.executionInstrumentation.executionCompleted(this.agent, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
                return Status.SUCCESS;
            } catch (Throwable cause) {
                this.executionInstrumentation.executionFailed(this.agent, cause);
                return Status.FAILURE;
            }
        }
    }

    /** Pool task that executes one agent and afterwards reports completion to the scheduler. */
    private static class AgentJob implements Runnable {
        private final NextAttempt lockReleaseTime;
        private final AgentExecutionAction action;
        private final DynoClusteredAgentScheduler scheduler;

        public AgentJob(NextAttempt lockReleaseTime, AgentExecutionAction action, DynoClusteredAgentScheduler scheduler) {
            this.lockReleaseTime = lockReleaseTime;
            this.action = action;
            this.scheduler = scheduler;
        }

        @Override
        public void run() {
            // Treated as a failure unless execute() returns normally with another status.
            Status status = Status.FAILURE;
            try {
                status = this.action.execute();
            } finally {
                this.scheduler.agentCompleted(this.action.getAgent().getAgentType(), this.lockReleaseTime.getNextTime(status));
            }
        }
    }

    /** Computes an agent's next execution time from the moment its lock was acquired. */
    private static class NextAttempt {
        private final long currentTime;
        private final long successInterval;
        private final long errorInterval;

        public NextAttempt(long currentTime, long successInterval, long errorInterval) {
            this.currentTime = currentTime;
            this.successInterval = successInterval;
            this.errorInterval = errorInterval;
        }

        /**
         * @return the acquisition time plus the success interval for {@link Status#SUCCESS},
         *         or plus the error interval for any other status
         */
        public long getNextTime(Status status) {
            long interval = status == Status.SUCCESS ? this.successInterval : this.errorInterval;
            return this.currentTime + interval;
        }
    }

    /** Outcome of a single agent execution. */
    private enum Status {
        SUCCESS,
        FAILURE;
    }
}

