/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.queues.redis;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Application;
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.RedisQueue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class MultiRedisQueue
implements DynoQueue {
    private List<String> shards;
    private String name;
    private Map<String, RedisQueue> queues = new HashMap<String, RedisQueue>();
    private RedisQueue me;
    private AtomicInteger nextShardIndex = new AtomicInteger(0);

    public MultiRedisQueue(String queueName, String shardName, Map<String, RedisQueue> queues) {
        this.name = queueName;
        this.queues = queues;
        this.me = queues.get(shardName);
        if (this.me == null) {
            throw new IllegalArgumentException("List of shards supplied (" + queues.keySet() + ") does not contain current shard name: " + shardName);
        }
        this.shards = queues.keySet().stream().collect(Collectors.toList());
    }

    public String getName() {
        return this.name;
    }

    public int getUnackTime() {
        return this.me.getUnackTime();
    }

    public List<String> push(List<Message> messages) {
        int size = this.queues.size();
        int partitionSize = messages.size() / size;
        LinkedList<String> ids = new LinkedList<String>();
        for (int i = 0; i < size - 1; ++i) {
            RedisQueue queue = this.queues.get(this.getNextShard());
            int start = i * partitionSize;
            int end = start + partitionSize;
            ids.addAll(queue.push(messages.subList(start, end)));
        }
        RedisQueue queue = this.queues.get(this.getNextShard());
        int start = (size - 1) * partitionSize;
        ids.addAll(queue.push(messages.subList(start, messages.size())));
        return ids;
    }

    public List<Message> pop(int messageCount, int wait, TimeUnit unit) {
        return this.me.pop(messageCount, wait, unit);
    }

    public List<Message> peek(int messageCount) {
        return this.me.peek(messageCount);
    }

    public boolean ack(String messageId) {
        for (DynoQueue dynoQueue : this.queues.values()) {
            if (!dynoQueue.ack(messageId)) continue;
            return true;
        }
        return false;
    }

    public void ack(List<Message> messages) {
        Map<String, List<Message>> byShard = messages.stream().collect(Collectors.groupingBy(Message::getShard));
        for (Map.Entry<String, List<Message>> e : byShard.entrySet()) {
            this.queues.get(e.getKey()).ack(e.getValue());
        }
    }

    public boolean setUnackTimeout(String messageId, long timeout) {
        for (DynoQueue dynoQueue : this.queues.values()) {
            if (!dynoQueue.setUnackTimeout(messageId, timeout)) continue;
            return true;
        }
        return false;
    }

    public boolean setTimeout(String messageId, long timeout) {
        for (DynoQueue dynoQueue : this.queues.values()) {
            if (!dynoQueue.setTimeout(messageId, timeout)) continue;
            return true;
        }
        return false;
    }

    public boolean remove(String messageId) {
        for (DynoQueue dynoQueue : this.queues.values()) {
            if (!dynoQueue.remove(messageId)) continue;
            return true;
        }
        return false;
    }

    public Message get(String messageId) {
        for (DynoQueue dynoQueue : this.queues.values()) {
            Message msg = dynoQueue.get(messageId);
            if (msg == null) continue;
            return msg;
        }
        return null;
    }

    public long size() {
        long size = 0L;
        for (DynoQueue dynoQueue : this.queues.values()) {
            size += dynoQueue.size();
        }
        return size;
    }

    public Map<String, Map<String, Long>> shardSizes() {
        HashMap<String, Map<String, Long>> sizes = new HashMap<String, Map<String, Long>>();
        for (Map.Entry<String, RedisQueue> e : this.queues.entrySet()) {
            sizes.put(e.getKey(), e.getValue().shardSizes().get(e.getKey()));
        }
        return sizes;
    }

    public void clear() {
        for (DynoQueue dynoQueue : this.queues.values()) {
            dynoQueue.clear();
        }
    }

    public void close() throws IOException {
        for (RedisQueue queue : this.queues.values()) {
            queue.close();
        }
    }

    public void processUnacks() {
        for (RedisQueue queue : this.queues.values()) {
            queue.processUnacks();
        }
    }

    private String getNextShard() {
        int indx = this.nextShardIndex.incrementAndGet();
        if (indx >= this.shards.size()) {
            this.nextShardIndex.set(0);
            indx = 0;
        }
        String s = this.shards.get(indx);
        return s;
    }

    public static class Builder {
        private String queueName;
        private EurekaClient ec;
        private String dynomiteClusterName;
        private String redisKeyPrefix;
        private int unackTime;
        private String currentShard;
        private Function<Host, String> hostToShardMap;
        private int redisPoolSize;
        private int quorumPort;
        private int nonQuorumPort;
        private List<Host> hosts;

        public Builder setQueueName(String queueName) {
            this.queueName = queueName;
            return this;
        }

        public Builder setEc(EurekaClient ec) {
            this.ec = ec;
            return this;
        }

        public Builder setDynomiteClusterName(String dynomiteClusterName) {
            this.dynomiteClusterName = dynomiteClusterName;
            return this;
        }

        public Builder setRedisKeyPrefix(String redisKeyPrefix) {
            this.redisKeyPrefix = redisKeyPrefix;
            return this;
        }

        public Builder setUnackTime(int unackTime) {
            this.unackTime = unackTime;
            return this;
        }

        public Builder setCurrentShard(String currentShard) {
            this.currentShard = currentShard;
            return this;
        }

        public Builder setHostToShardMap(Function<Host, String> hostToShardMap) {
            this.hostToShardMap = hostToShardMap;
            return this;
        }

        public Builder setRedisPoolSize(int redisPoolSize) {
            this.redisPoolSize = redisPoolSize;
            return this;
        }

        public Builder setQuorumPort(int quorumPort) {
            this.quorumPort = quorumPort;
            return this;
        }

        public Builder setNonQuorumPort(int nonQuorumPort) {
            this.nonQuorumPort = nonQuorumPort;
            return this;
        }

        public Builder setHosts(List<Host> hosts) {
            this.hosts = hosts;
            return this;
        }

        public MultiRedisQueue build() {
            if (this.hosts == null) {
                this.hosts = Builder.getHostsFromEureka(this.ec, this.dynomiteClusterName);
            }
            HashMap<String, Host> shardMap = new HashMap<String, Host>();
            for (Host host : this.hosts) {
                String shard = (String)this.hostToShardMap.apply((Object)host);
                shardMap.put(shard, host);
            }
            JedisPoolConfig config = new JedisPoolConfig();
            config.setTestOnBorrow(true);
            config.setTestOnCreate(true);
            config.setMaxTotal(this.redisPoolSize);
            config.setMaxIdle(5);
            config.setMaxWaitMillis(60000L);
            HashMap<String, RedisQueue> queues = new HashMap<String, RedisQueue>();
            for (String queueShard : shardMap.keySet()) {
                String host = ((Host)shardMap.get(queueShard)).getIpAddress();
                JedisPool pool = new JedisPool((GenericObjectPoolConfig)config, host, this.quorumPort, 0);
                JedisPool readPool = new JedisPool((GenericObjectPoolConfig)config, host, this.nonQuorumPort, 0);
                RedisQueue q = new RedisQueue(this.redisKeyPrefix, this.queueName, queueShard, this.unackTime, pool);
                q.setNonQuorumPool(readPool);
                queues.put(queueShard, q);
            }
            MultiRedisQueue queue = new MultiRedisQueue(this.queueName, this.currentShard, queues);
            return queue;
        }

        private static List<Host> getHostsFromEureka(EurekaClient ec, String applicationName) {
            Application app = ec.getApplication(applicationName);
            ArrayList hosts = new ArrayList();
            if (app == null) {
                return hosts;
            }
            List ins = app.getInstances();
            if (ins == null || ins.isEmpty()) {
                return hosts;
            }
            hosts = Lists.newArrayList((Iterable)Collections2.transform((Collection)ins, (Function)new Function<InstanceInfo, Host>(){

                public Host apply(InstanceInfo info) {
                    Host.Status status = info.getStatus() == InstanceInfo.InstanceStatus.UP ? Host.Status.Up : Host.Status.Down;
                    String rack = null;
                    if (info.getDataCenterInfo() instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo)info.getDataCenterInfo();
                        rack = amazonInfo.get(AmazonInfo.MetaDataKey.availabilityZone);
                    }
                    Host host = new Host(info.getHostName(), info.getIPAddr(), rack, status);
                    return host;
                }
            }));
            return hosts;
        }
    }
}

