/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.queues.redis;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.netflix.dyno.connectionpool.HashPartitioner;
import com.netflix.dyno.connectionpool.impl.hash.Murmur3HashPartitioner;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.QueueMonitor;
import com.netflix.servo.monitor.Stopwatch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.sortedset.ZAddParams;

public class RedisQueue
implements DynoQueue {
    private final Logger logger = LoggerFactory.getLogger(RedisQueue.class);
    // Logical queue name; returned by getName() and embedded in the Redis keys built below.
    private String queueName;
    // Name of the shard this instance serves; stamped onto every popped message.
    private String shardName;
    // "<redisKeyPrefix>.MESSAGE." - prefix of the hash keys that hold the JSON payloads.
    private String messageStoreKeyPrefix;
    // "<redisKeyPrefix>.QUEUE.<queueName>.<shardName>" - sorted set of message ids awaiting delivery.
    private String myQueueShard;
    // "<redisKeyPrefix>.UNACK.<queueName>.<shardName>." - prefix of the per-bucket sorted sets of in-flight ids.
    private String unackShardKeyPrefix;
    // Added to System.currentTimeMillis() when a message is popped, i.e. expressed in milliseconds.
    // The default is always overwritten by the constructor.
    private int unackTime = 60;
    private QueueMonitor monitor;
    // JSON mapper used to serialize/deserialize Message payloads.
    private ObjectMapper om;
    // Pool used for every read/write operation except size() and shardSizes().
    private JedisPool connPool;
    // Pool used by size() and shardSizes(); initialised to connPool unless setNonQuorumPool is called.
    private JedisPool nonQuorumPool;
    // Runs processUnacks() at a fixed rate.
    private ScheduledExecutorService schedulerForUnacksProcessing;
    // Created in the constructor and shut down in close(); no task is scheduled on it in this file.
    private ScheduledExecutorService schedulerForPrefetchProcessing;
    // Hashes message ids so payloads and unack entries can be spread over maxHashBuckets keys.
    private HashPartitioner partitioner = new Murmur3HashPartitioner();
    // Number of buckets used for both the message store keys and the unack shard keys.
    private int maxHashBuckets = 1024;

    /**
     * Creates a queue whose unack-processing schedule interval equals the unack time.
     *
     * <p>Delegates to the six-argument constructor, passing {@code unackTime} both as the
     * schedule interval (in milliseconds) and as the unack time.
     *
     * @param redisKeyPrefix prefix for every Redis key used by this queue
     * @param queueName      logical queue name
     * @param shardName      shard served by this instance
     * @param unackTime      unack time, also used as the unack-processing interval
     * @param pool           connection pool used for all Redis operations
     */
    public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int unackTime, JedisPool pool) {
        this(redisKeyPrefix, queueName, shardName, unackTime, unackTime, pool);
    }

    /**
     * Creates a queue bound to one shard and starts the periodic unack processing.
     *
     * @param redisKeyPrefix    prefix for every Redis key used by this queue
     * @param queueName         logical queue name
     * @param shardName         shard served by this instance
     * @param unackScheduleInMS initial delay and period, in milliseconds, between runs of
     *                          {@link #processUnacks()}
     * @param unackTime         value added to the current time (milliseconds) to compute the
     *                          unack deadline of a popped message
     * @param pool              connection pool; also used as the non-quorum pool until
     *                          {@link #setNonQuorumPool(JedisPool)} is called
     */
    public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, JedisPool pool) {
        this.queueName = queueName;
        this.shardName = shardName;
        this.messageStoreKeyPrefix = redisKeyPrefix + ".MESSAGE.";
        this.myQueueShard = redisKeyPrefix + ".QUEUE." + queueName + "." + shardName;
        this.unackShardKeyPrefix = redisKeyPrefix + ".UNACK." + queueName + "." + shardName + ".";
        this.unackTime = unackTime;
        this.connPool = pool;
        this.nonQuorumPool = pool;

        ObjectMapper om = new ObjectMapper();
        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        om.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
        om.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
        // A single inclusion setting is kept: a previous NON_NULL call was immediately
        // overwritten by this one, and NON_EMPTY already excludes null values.
        om.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
        om.disable(SerializationFeature.INDENT_OUTPUT);
        this.om = om;

        this.monitor = new QueueMonitor(queueName, shardName);
        this.schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
        this.schedulerForPrefetchProcessing = Executors.newScheduledThreadPool(1);
        this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task suppresses every subsequent execution,
            // which would silently stop redelivery of unacked messages. Log and keep going.
            try {
                this.processUnacks();
            } catch (Exception e) {
                this.logger.error("Failed to process unacked messages for " + queueName, e);
            }
        }, unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
        this.logger.info(RedisQueue.class.getName() + " is ready to serve " + queueName);
    }

    /**
     * Replaces the pool used by {@link #size()} and {@link #shardSizes()}.
     *
     * @param nonQuorumPool pool to read size information from
     */
    public void setNonQuorumPool(JedisPool nonQuorumPool) {
        // The parameter shadows the field, so the field must be qualified.
        JedisPool replacement = nonQuorumPool;
        this.nonQuorumPool = replacement;
    }

    /**
     * @return the queue name given at construction time
     */
    public String getName() {
        return queueName;
    }

    /**
     * @return the unack time given at construction time
     */
    public int getUnackTime() {
        return unackTime;
    }

    /**
     * Stores the given messages and enqueues their ids on this shard.
     *
     * <p>For each message the JSON payload is written to its message-store hash and the id is
     * added to the queue sorted set with score
     * {@code currentTimeMillis + timeout + priority / 100}. All commands are sent in one pipeline.
     *
     * @param messages messages to enqueue
     * @return the ids of the given messages, in iteration order
     * @throws RuntimeException wrapping any failure raised while serializing or talking to Redis
     */
    public List<String> push(List<Message> messages) {
        Stopwatch sw = monitor.start(monitor.push, messages.size());
        Jedis conn = connPool.getResource();
        try {
            Pipeline pipeline = conn.pipelined();
            for (Message msg : messages) {
                String id = msg.getId();
                String payload = om.writeValueAsString((Object) msg);
                pipeline.hset(messageStoreKey(id), id, payload);

                double priorityFraction = (double) msg.getPriority() / 100.0;
                double deliverAt = (double) (System.currentTimeMillis() + msg.getTimeout());
                pipeline.zadd(myQueueShard, deliverAt + priorityFraction, msg.getId());
            }
            pipeline.sync();
            pipeline.close();

            List<String> ids = new ArrayList<String>();
            for (Message msg : messages) {
                ids.add(msg.getId());
            }
            return ids;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            conn.close();
            sw.stop();
        }
    }

    /**
     * Builds the Redis hash key that stores the payload of the given message.
     *
     * @param msgId message id
     * @return {@code messageStoreKeyPrefix + (hash(msgId) % maxHashBuckets) + "." + queueName}
     */
    private String messageStoreKey(String msgId) {
        long bucket = partitioner.hash(msgId) % (long) maxHashBuckets;
        StringBuilder key = new StringBuilder();
        key.append(messageStoreKeyPrefix).append(bucket).append(".").append(queueName);
        return key.toString();
    }

    /**
     * Builds the Redis sorted-set key that tracks the given message while it is unacknowledged.
     *
     * @param messageId message id
     * @return {@code unackShardKeyPrefix + (hash(messageId) % maxHashBuckets)}
     */
    private String unackShardKey(String messageId) {
        long bucket = partitioner.hash(messageId) % (long) maxHashBuckets;
        StringBuilder key = new StringBuilder();
        key.append(unackShardKeyPrefix).append(bucket);
        return key.toString();
    }

    /**
     * Returns up to {@code messageCount} messages that are currently due, without removing them.
     *
     * <p>Ids whose payload can no longer be found (for example because the message was acked or
     * removed between the id scan and the payload read) are skipped.
     *
     * @param messageCount maximum number of messages to return
     * @return the due messages in queue order; empty when nothing is due
     * @throws RuntimeException wrapping any failure raised while reading from Redis or parsing JSON
     */
    public List<Message> peek(int messageCount) {
        Stopwatch sw = this.monitor.peek.start();
        try {
            // peekIds borrows and returns its own connection; finish it before borrowing ours
            // so a single peek never holds two pooled connections at once.
            Set<String> ids = this.peekIds(0, messageCount);
            if (ids == null) {
                return Collections.emptyList();
            }
            List<Message> messages = new ArrayList<>(ids.size());
            try (Jedis jedis = this.connPool.getResource()) {
                for (String id : ids) {
                    String json = jedis.hget(this.messageStoreKey(id), id);
                    if (json == null) {
                        // Payload vanished after the id scan; nothing to show for this id.
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Cannot read payload for {}", id);
                        }
                        continue;
                    }
                    messages.add(this.om.readValue(json, Message.class));
                }
            }
            return messages;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            sw.stop();
        }
    }

    /**
     * Pops up to {@code messageCount} due messages from this shard.
     *
     * <p>The {@code wait} and {@code unit} arguments are not used by this implementation.
     *
     * @param messageCount maximum number of messages to pop; values below 1 yield an empty list
     * @param wait         unused
     * @param unit         unused
     * @return the messages that were successfully moved to the unack set and read
     * @throws RuntimeException wrapping any failure raised while popping
     */
    public synchronized List<Message> pop(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch sw = monitor.start(monitor.pop, messageCount);
        try {
            Set<String> dueIds = peekIds(0, messageCount);
            List<String> candidates = new ArrayList<String>(dueIds);
            return _pop(candidates);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the given ids from the queue to the unack set and returns their messages.
     *
     * <p>Runs three pipelined stages over a single connection:
     * <ol>
     *   <li>add each id to its unack shard with {@code NX} (only ids that were not already
     *       there are considered claimed);</li>
     *   <li>remove each claimed id from the queue sorted set;</li>
     *   <li>read the payload of each id that was actually removed.</li>
     * </ol>
     * An id that fails any stage is counted as a miss and left out of the result. Processing
     * stops at the first {@code null} id in {@code batch}.
     *
     * @param batch candidate message ids
     * @return the messages popped, each tagged with this instance's shard name
     * @throws Exception if a Redis call or JSON parsing fails
     */
    private List<Message> _pop(List<String> batch) throws Exception {
        double unackScore = (double) (System.currentTimeMillis() + (long) this.unackTime);
        ZAddParams zParams = ZAddParams.zAddParams().nx();
        List<Message> popped = new ArrayList<>(batch.size());

        try (Jedis jedis = this.connPool.getResource()) {
            Pipeline pipe = jedis.pipelined();

            // Stage 1: claim ids by adding them to the unack set.
            List<String> claimIds = new ArrayList<>(batch.size());
            List<Response<Long>> claimRes = new ArrayList<>(batch.size());
            for (String msgId : batch) {
                if (msgId == null) {
                    break;
                }
                claimIds.add(msgId);
                claimRes.add(pipe.zadd(this.unackShardKey(msgId), unackScore, msgId, zParams));
            }
            pipe.sync();

            // Stage 2: remove the claimed ids from the queue.
            List<String> zremIds = new ArrayList<>(claimIds.size());
            List<Response<Long>> zremRes = new ArrayList<>(claimIds.size());
            for (int i = 0; i < claimIds.size(); ++i) {
                String id = claimIds.get(i);
                long added = claimRes.get(i).get();
                if (added == 0L) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot add {} to unack queue shard", id);
                    }
                    this.monitor.misses.increment();
                    continue;
                }
                zremIds.add(id);
                zremRes.add(pipe.zrem(this.myQueueShard, id));
            }
            pipe.sync();

            // Stage 3: fetch payloads. Ids are tracked alongside the responses so that later
            // log messages name the right id even when some removals were skipped.
            List<String> getIds = new ArrayList<>(zremIds.size());
            List<Response<String>> getRes = new ArrayList<>(zremIds.size());
            for (int i = 0; i < zremIds.size(); ++i) {
                String id = zremIds.get(i);
                long removed = zremRes.get(i).get();
                if (removed == 0L) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot remove {} from queue shard", id);
                    }
                    this.monitor.misses.increment();
                    continue;
                }
                getIds.add(id);
                getRes.add(pipe.hget(this.messageStoreKey(id), id));
            }
            pipe.sync();

            for (int i = 0; i < getRes.size(); ++i) {
                String json = getRes.get(i).get();
                if (json == null) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot read payload for {}", getIds.get(i));
                    }
                    this.monitor.misses.increment();
                    continue;
                }
                Message msg = this.om.readValue(json, Message.class);
                msg.setShard(this.shardName);
                popped.add(msg);
            }
            return popped;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Acknowledges a single message.
     *
     * <p>The id is removed from its unack shard; only if it was present there is the payload
     * deleted from the message store.
     *
     * @param messageId id of the message to acknowledge
     * @return {@code true} if the id was found in the unack shard, {@code false} otherwise
     */
    public boolean ack(String messageId) {
        Stopwatch sw = monitor.ack.start();
        Jedis jedis = connPool.getResource();
        try {
            Long removed = jedis.zrem(unackShardKey(messageId), messageId);
            if (removed <= 0L) {
                return false;
            }
            jedis.hdel(messageStoreKey(messageId), messageId);
            return true;
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /**
     * Acknowledges a batch of messages using pipelined commands.
     *
     * <p>First every id is removed from its unack shard; then the payload is deleted for each
     * id that was actually present in the unack shard.
     *
     * @param messages messages to acknowledge
     * @throws RuntimeException wrapping an {@link IOException} raised while closing the pipeline
     */
    public void ack(List<Message> messages) {
        Stopwatch sw = this.monitor.ack.start();
        Jedis jedis = this.connPool.getResource();
        try {
            // Created inside the try so the connection is returned even if this call fails.
            Pipeline pipe = jedis.pipelined();

            // Array-backed lists keep the index lookups below O(1) regardless of the
            // List implementation the caller passed in.
            List<String> ids = new ArrayList<>(messages.size());
            List<Response<Long>> removals = new ArrayList<>(messages.size());
            for (Message msg : messages) {
                String id = msg.getId();
                ids.add(id);
                removals.add(pipe.zrem(this.unackShardKey(id), id));
            }
            pipe.sync();

            for (int i = 0; i < ids.size(); ++i) {
                Long removed = removals.get(i).get();
                if (removed <= 0L) {
                    continue;
                }
                String id = ids.get(i);
                pipe.hdel(this.messageStoreKey(id), id);
            }
            pipe.sync();
            pipe.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resets the unack deadline of an in-flight message to {@code now + timeout}.
     *
     * @param messageId id of the message
     * @param timeout   value added to the current time in milliseconds to form the new score
     * @return {@code true} if the id was present in its unack shard and was rescored,
     *         {@code false} if it was not present
     */
    public boolean setUnackTimeout(String messageId, long timeout) {
        Stopwatch sw = monitor.ack.start();
        Jedis jedis = connPool.getResource();
        try {
            double newDeadline = (double) (System.currentTimeMillis() + timeout);
            String shardKey = unackShardKey(messageId);
            boolean inFlight = jedis.zscore(shardKey, messageId) != null;
            if (inFlight) {
                jedis.zadd(shardKey, newDeadline, messageId);
            }
            return inFlight;
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /**
     * Changes the delivery timeout of a message that is still waiting in the queue.
     *
     * <p>The stored payload is read and its timeout updated; if the id is present in the queue
     * sorted set it is rescored to {@code now + timeout + priority / 100} and the updated
     * payload is written back.
     *
     * @param messageId id of the message
     * @param timeout   new timeout, added to the current time in milliseconds
     * @return {@code true} if the message was rescored; {@code false} if its payload or its
     *         queue entry could not be found
     * @throws RuntimeException wrapping an {@link IOException} raised while parsing or
     *                          serializing the payload
     */
    public boolean setTimeout(String messageId, long timeout) {
        try (Jedis jedis = this.connPool.getResource()) {
            String json = jedis.hget(this.messageStoreKey(messageId), messageId);
            if (json == null) {
                return false;
            }
            Message message = this.om.readValue(json, Message.class);
            message.setTimeout(timeout);

            Double score = jedis.zscore(this.myQueueShard, messageId);
            if (score == null) {
                return false;
            }
            double priorityd = (double) message.getPriority() / 100.0;
            double newScore = (double) (System.currentTimeMillis() + timeout) + priorityd;
            jedis.zadd(this.myQueueShard, newScore, messageId);
            json = this.om.writeValueAsString((Object) message);
            jedis.hset(this.messageStoreKey(message.getId()), message.getId(), json);
            return true;
        } catch (IOException e) {
            // Jackson signals parse/serialization failures with checked exceptions; surface
            // them unchecked, as the other methods of this class do.
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a message from the unack shard, the queue and the message store.
     *
     * @param messageId id of the message to remove
     * @return {@code true} only if the id was removed from the queue sorted set and its
     *         payload was deleted; the result of the unack-shard removal is not considered
     */
    public boolean remove(String messageId) {
        Stopwatch sw = monitor.remove.start();
        Jedis jedis = connPool.getResource();
        try {
            jedis.zrem(unackShardKey(messageId), messageId);
            Long queueRemovals = jedis.zrem(myQueueShard, messageId);
            Long payloadRemovals = jedis.hdel(messageStoreKey(messageId), messageId);
            return queueRemovals > 0L && payloadRemovals > 0L;
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /**
     * Reads a message from the message store without touching the queue.
     *
     * @param messageId id of the message
     * @return the stored message, or {@code null} if no payload exists for the id
     * @throws RuntimeException wrapping any failure raised while reading or parsing
     */
    public Message get(String messageId) {
        Stopwatch sw = monitor.get.start();
        Jedis jedis = connPool.getResource();
        try {
            String json = jedis.hget(messageStoreKey(messageId), messageId);
            if (json != null) {
                return (Message) om.readValue(json, Message.class);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Cannot get the message payload " + messageId);
            }
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * @return the cardinality of this shard's queue sorted set, read through the non-quorum pool
     */
    public long size() {
        Stopwatch sw = monitor.size.start();
        Jedis jedis = nonQuorumPool.getResource();
        try {
            Long cardinality = jedis.zcard(myQueueShard);
            return cardinality.longValue();
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports the queued and unacknowledged counts for this shard.
     *
     * @return a map with a single entry keyed by the shard name; its value maps {@code "size"}
     *         to the queue cardinality and {@code "uacked"} to the summed cardinality of all
     *         unack buckets
     */
    public Map<String, Map<String, Long>> shardSizes() {
        Stopwatch sw = monitor.size.start();
        HashMap<String, Map<String, Long>> result = new HashMap<String, Map<String, Long>>();
        Jedis jedis = nonQuorumPool.getResource();
        try {
            long queued = jedis.zcard(myQueueShard);

            long inFlight = 0L;
            int bucket = 0;
            while (bucket < maxHashBuckets) {
                inFlight += jedis.zcard(unackShardKeyPrefix + bucket).longValue();
                bucket++;
            }

            HashMap<String, Long> details = new HashMap<String, Long>();
            details.put("size", queued);
            details.put("uacked", inFlight);
            result.put(shardName, details);
            return result;
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deletes the queue sorted set and, for every hash bucket, the unack shard key and the
     * message-store key of this queue.
     */
    public void clear() {
        try (Jedis jedis = connPool.getResource()) {
            jedis.del(myQueueShard);
            int bucket = 0;
            while (bucket < maxHashBuckets) {
                jedis.del(unackShardKeyPrefix + bucket);
                jedis.del(messageStoreKeyPrefix + bucket + "." + queueName);
                bucket++;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns ids from the queue sorted set whose score lies between 0 and
     * {@code currentTimeMillis + 1}.
     *
     * @param offset number of matching ids to skip
     * @param count  maximum number of ids to return
     * @return the matching ids as returned by Redis
     */
    private Set<String> peekIds(int offset, int count) {
        try (Jedis jedis = connPool.getResource()) {
            double upperBound = (double) (System.currentTimeMillis() + 1L);
            return jedis.zrangeByScore(myQueueShard, 0.0, upperBound, offset, count);
        }
    }

    /**
     * Processes every unack bucket of this shard in ascending bucket order.
     */
    public void processUnacks() {
        int bucket = 0;
        while (bucket < maxHashBuckets) {
            processUnacks(unackShardKeyPrefix + bucket);
            bucket++;
        }
    }

    /**
     * Drains expired entries from one unack shard.
     *
     * <p>Repeatedly fetches up to 1000 entries whose score is at most the current time. Each
     * entry whose payload still exists is re-added to the queue with its unack score; every
     * fetched entry is then removed from its unack shard. The loop ends when a fetch returns
     * nothing. The queue depth is recorded on the monitor before each fetch.
     *
     * @param unackShardKey Redis key of the unack sorted set to scan
     */
    private void processUnacks(String unackShardKey) {
        Stopwatch sw = monitor.processUnack.start();
        Jedis jedis = connPool.getResource();
        try {
            final int batchSize = 1000;
            for (;;) {
                monitor.queueDepth.record(size());

                double now = (double) System.currentTimeMillis();
                Set<Tuple> expired = jedis.zrangeByScoreWithScores(unackShardKey, 0.0, now, 0, batchSize);
                if (expired.isEmpty()) {
                    return;
                }
                logger.debug("Adding " + expired.size() + " messages back to the queue for " + queueName);

                for (Tuple entry : expired) {
                    double score = entry.getScore();
                    String member = entry.getElement();
                    String payload = jedis.hget(messageStoreKey(member), member);
                    if (payload != null) {
                        // Payload still exists: make the message deliverable again.
                        jedis.zadd(myQueueShard, score, member);
                    }
                    jedis.zrem(this.unackShardKey(member), member);
                }
            }
        } finally {
            jedis.close();
            sw.stop();
        }
    }

    /**
     * Shuts down both schedulers and closes the monitor.
     *
     * @throws IOException declared by the method signature
     */
    public void close() throws IOException {
        schedulerForUnacksProcessing.shutdown();
        schedulerForPrefetchProcessing.shutdown();
        monitor.close();
    }
}

