/*
 * Decompiled with CFR 0.152.
 */
package cn.com.duiba.apollo.client.service.rocketmq;

import cn.com.duiba.apollo.client.service.rocketmq.ApolloClientRocketmqProperties;
import cn.com.duiba.apollo.client.service.rocketmq.ConfigServerEventListener;
import cn.com.duiba.apollo.client.service.rocketmq.ConsumeMode;
import java.util.List;
import java.util.Objects;
import javax.annotation.Resource;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQListenerBindingContainer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQListenerBindingContainer.class);
    @Resource
    private ConfigServerEventListener configServerEventListener;
    @Resource
    private ApolloClientRocketmqProperties apolloClientRocketmqProperties;
    private DefaultMQPushConsumer consumer;

    void destroy() {
        if (Objects.nonNull(this.consumer)) {
            this.consumer.shutdown();
        }
        log.info("container destroyed, {}", (Object)this.toString());
    }

    void init() throws Exception {
        ConsumeMode consumeMode = this.apolloClientRocketmqProperties.getOrderly() != false ? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
        this.consumer = new DefaultMQPushConsumer(this.apolloClientRocketmqProperties.getGroup());
        this.consumer.setNamesrvAddr(this.apolloClientRocketmqProperties.getNameServer());
        this.consumer.setConsumeThreadMax(this.apolloClientRocketmqProperties.getConcurrency());
        this.consumer.setConsumeThreadMin(this.apolloClientRocketmqProperties.getConcurrency());
        this.consumer.setMessageModel(MessageModel.BROADCASTING);
        this.consumer.subscribe(this.apolloClientRocketmqProperties.getTopic(), null);
        switch (consumeMode) {
            case ORDERLY: {
                this.consumer.setMessageListener((MessageListener)new DefaultMessageListenerOrderly());
                break;
            }
            case CONCURRENTLY: {
                this.consumer.setMessageListener((MessageListener)new DefaultMessageListenerConcurrently());
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
            }
        }
        try {
            this.consumer.start();
        }
        catch (MQClientException e) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
        }
    }

    private void acceptMessages(MessageExt message) {
        long now = System.currentTimeMillis();
        this.configServerEventListener.onMessage(message);
        long costTime = System.currentTimeMillis() - now;
        log.debug("topic:{} consume:{} cost:{} ms", new Object[]{message.getTopic(), message.getMsgId(), costTime});
    }

    public class DefaultMessageListenerOrderly
    implements MessageListenerOrderly {
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                try {
                    RocketMQListenerBindingContainer.this.acceptMessages(messageExt);
                }
                catch (Exception e) {
                    log.error("consume message failed. messageExt:{}", (Object)messageExt, (Object)e);
                    context.setSuspendCurrentQueueTimeMillis(1000L);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    public class DefaultMessageListenerConcurrently
    implements MessageListenerConcurrently {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                try {
                    RocketMQListenerBindingContainer.this.acceptMessages(messageExt);
                }
                catch (Exception e) {
                    log.error("consume message failed. messageExt:{}", (Object)messageExt, (Object)e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}

