package cn.com.duibaboot.ext.autoconfigure.rocketmq;

import ch.qos.logback.classic.Level;
import cn.com.duiba.boot.utils.NetUtils;
import cn.com.duibaboot.ext.autoconfigure.cloud.netflix.eureka.DiscoveryMetadataAutoConfiguration;
import cn.com.duibaboot.ext.autoconfigure.cloud.netflix.eureka.EurekaInstanceChangedEvent;
import cn.com.duibaboot.ext.autoconfigure.core.EarlyClose;
import cn.com.duibaboot.ext.autoconfigure.rocketmq.RocketMqProperties;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import java.lang.reflect.Field;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:cn/com/duibaboot/ext/autoconfigure/rocketmq/DefaultMQPushConsumerWrapper.class */
public class DefaultMQPushConsumerWrapper extends EarlyClose {
    private static final Logger log = LoggerFactory.getLogger(DefaultMQPushConsumerWrapper.class);

    @Resource
    private ApplicationContext applicationContext;
    private DefaultMQPushConsumer rocketMqConsumer;
    private RocketMqProperties rocketMqProperties;
    private RocketMqProperties.ConsumerProperties consumerProperties;
    private String messageListenerBeanId;
    private boolean started = false;
    private String currentAppName;

    @Value("${server.port}")
    private int serverPort;

    @Resource
    private EurekaClient eurekaClient;

    @Value("${spring.application.name}")
    public void setCurrentAppName(String str) {
        this.currentAppName = str.toUpperCase();
    }

    public DefaultMQPushConsumerWrapper(RocketMqProperties rocketMqProperties, int i) {
        this.rocketMqProperties = rocketMqProperties;
        if (i == -1) {
            this.consumerProperties = rocketMqProperties.getConsumer();
            this.messageListenerBeanId = "bootRocketMqMessageListener";
        } else {
            this.consumerProperties = rocketMqProperties.getExtraConsumer()[i];
            this.messageListenerBeanId = "extraBootRocketMqMessageListener" + i;
        }
    }

    public synchronized void startRun() throws MQClientException {
        if (this.started) {
            log.warn("rocketmq consumer have already started, please don't call start again");
            return;
        }
        try {
            MessageListenerConcurrently messageListenerConcurrently = (MessageListener) this.applicationContext.getBean(this.messageListenerBeanId, MessageListener.class);
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(this.consumerProperties.getGroup());
            defaultMQPushConsumer.setNamesrvAddr(this.rocketMqProperties.getNameSrvAddr());
            defaultMQPushConsumer.setMaxReconsumeTimes(this.consumerProperties.getMaxReconsumeTimes().intValue());
            if (this.consumerProperties.getConsumeThreadNums().intValue() > 0) {
                defaultMQPushConsumer.setConsumeThreadMin(this.consumerProperties.getConsumeThreadNums().intValue());
                defaultMQPushConsumer.setConsumeThreadMax(Math.max(this.consumerProperties.getConsumeThreadNums().intValue(), 64));
            }
            defaultMQPushConsumer.setMessageModel(this.consumerProperties.getMessageModelEnum());
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(this.consumerProperties.getConsumeMessageBatchMaxSize().intValue());
            for (String str : this.consumerProperties.getTopics().split(",")) {
                defaultMQPushConsumer.subscribe(str, "*");
            }
            if (messageListenerConcurrently instanceof MessageListenerConcurrently) {
                defaultMQPushConsumer.registerMessageListener(messageListenerConcurrently);
                if (messageListenerConcurrently instanceof MessageListenerConcurrentlyWrapper) {
                    ((MessageListenerConcurrentlyWrapper) messageListenerConcurrently).setConsumerProperties(this.consumerProperties);
                }
            } else if (messageListenerConcurrently instanceof MessageListenerOrderly) {
                defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly) messageListenerConcurrently);
                if (messageListenerConcurrently instanceof MessageListenerOrderlyWrapper) {
                    ((MessageListenerOrderlyWrapper) messageListenerConcurrently).setConsumerProperties(this.consumerProperties);
                }
            }
            this.rocketMqConsumer = defaultMQPushConsumer;
            this.rocketMqConsumer.start();
            this.started = true;
        } catch (NoSuchBeanDefinitionException e) {
            log.error("", new IllegalStateException("bean id:[" + this.messageListenerBeanId + "] class:[org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently] is not exist in spring's context, 请声明，否则不会启用rocketmq消费!!!，请参考: http://gitlab2.dui88.com/basic_platform/spring-boot-ext/wikis/Starter%E7%89%B9%E6%80%A7/_rocketmq%E3%80%90%E5%BC%80%E6%BA%90RocketMQ%E3%80%91"));
        }
    }

    public synchronized void shutdown() {
        if (this.started) {
            this.rocketMqConsumer.shutdown();
            if (this.rocketMqConsumer != null) {
                this.rocketMqConsumer.shutdown();
                awaitShutdown(this.rocketMqConsumer, 3);
            }
            this.started = false;
        }
    }

    private void awaitShutdown(DefaultMQPushConsumer defaultMQPushConsumer, int i) {
        Field findField = ReflectionUtils.findField(defaultMQPushConsumer.getClass(), "defaultMQPushConsumerImpl");
        findField.setAccessible(true);
        Object field = ReflectionUtils.getField(findField, defaultMQPushConsumer);
        Field findField2 = ReflectionUtils.findField(field.getClass(), "consumeMessageService");
        findField2.setAccessible(true);
        Object field2 = ReflectionUtils.getField(findField2, field);
        Field findField3 = ReflectionUtils.findField(field2.getClass(), "consumeExecutor");
        findField3.setAccessible(true);
        try {
            ((ThreadPoolExecutor) ReflectionUtils.getField(findField3, field2)).awaitTermination(i, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    synchronized boolean isStarted() {
        return this.started;
    }

    @EventListener({EurekaInstanceChangedEvent.class})
    public void onEurekaInstanceChanged(EurekaInstanceChangedEvent eurekaInstanceChangedEvent) throws MQClientException {
        InstanceInfo.InstanceStatus instanceStatus = null;
        String str = null;
        if (eurekaInstanceChangedEvent.getAppNames().contains(this.currentAppName)) {
            for (InstanceInfo instanceInfo : this.eurekaClient.getApplication(this.currentAppName).getInstancesAsIsFromEureka()) {
                if (NetUtils.getLocalIp().equals(instanceInfo.getIPAddr()) && this.serverPort == instanceInfo.getPort()) {
                    instanceStatus = instanceInfo.getStatus();
                    str = (String) instanceInfo.getMetadata().getOrDefault(DiscoveryMetadataAutoConfiguration.WEIGHT_KEY, "100");
                }
            }
        }
        if ((instanceStatus == InstanceInfo.InstanceStatus.OUT_OF_SERVICE || "0".equals(str)) && isStarted()) {
            log.info("检测到当前实例被禁用(或权重调为0), 停止本实例的rocketmq消费");
            shutdown();
        } else {
            if (instanceStatus != InstanceInfo.InstanceStatus.UP || "0".equals(str) || isStarted()) {
                return;
            }
            log.info("检测到当前实例被重新启用且权重大于0, 重新开始本实例的rocketmq消费");
            startRun();
        }
    }

    public void stop() {
        shutdown();
    }

    public int getPhase() {
        return 1;
    }

    static {
        log.setLevel(Level.INFO);
    }
}
