package cn.com.duiba.nezha.engine.biz.message.advert.consumer;

import cn.com.duiba.nezha.engine.biz.message.advert.distribute.MessageConsumerBo;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:cn/com/duiba/nezha/engine/biz/message/advert/consumer/AdvertUpdateConsumer.class */
public class AdvertUpdateConsumer implements InitializingBean {
    private static Logger logger = LoggerFactory.getLogger(AdvertUpdateConsumer.class);
    private int pool;
    private String bootstrapServers;
    private String subscribe;
    private Properties props = null;

    @Autowired
    MessageConsumerBo messageConsumerBo;

    @Autowired
    ExecutorService executorService;

    public void afterPropertiesSet() throws Exception {
        start();
    }

    public void setBootstrapServers(String str) {
        this.bootstrapServers = str;
    }

    public String getBootstrapServers() {
        return this.bootstrapServers;
    }

    public void setSubscribe(String str) {
        this.subscribe = str;
    }

    public String getSubscribe() {
        return this.subscribe;
    }

    public void setPool(int i) {
        this.pool = i;
    }

    public int getPool() {
        return this.pool;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Properties getProps(Properties properties) {
        if (properties == null) {
            Properties properties2 = new Properties();
            properties2.put("bootstrap.servers", this.bootstrapServers);
            properties2.put("group.id", "nezha-engine-");
            properties2.put("enable.auto.commit", "false");
            properties2.put("session.timeout.ms", "30000");
            properties2.put("auto.offset.reset", "earliest");
            properties2.put("max.partition.fetch.bytes", "2048");
            properties2.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties2.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties2.put("max.poll.records", "20");
            this.props = properties2;
        }
        return this.props;
    }

    public void start() {
        for (int i = 0; i < this.pool; i++) {
            this.executorService.submit(new Runnable() { // from class: cn.com.duiba.nezha.engine.biz.message.advert.consumer.AdvertUpdateConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    KafkaConsumer kafkaConsumer = null;
                    try {
                        try {
                            kafkaConsumer = new KafkaConsumer(AdvertUpdateConsumer.this.getProps(AdvertUpdateConsumer.this.props));
                            kafkaConsumer.subscribe(Arrays.asList(AdvertUpdateConsumer.this.subscribe));
                            while (true) {
                                ConsumerRecords poll = kafkaConsumer.poll(20L);
                                for (TopicPartition topicPartition : poll.partitions()) {
                                    for (ConsumerRecord consumerRecord : poll.records(topicPartition)) {
                                        String str = (String) consumerRecord.value();
                                        if (StringUtils.isNotBlank(str)) {
                                            AdvertUpdateConsumer.logger.info("kafka_message pt = " + topicPartition + ",msg = " + str);
                                            AdvertUpdateConsumer.this.messageConsumerBo.consume(str);
                                        }
                                        kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(consumerRecord.offset() + 1)));
                                    }
                                }
                            }
                        } catch (Exception e) {
                            AdvertUpdateConsumer.logger.error("error:", e);
                            if (kafkaConsumer != null) {
                                kafkaConsumer.close();
                            }
                        }
                    } catch (Throwable th) {
                        if (kafkaConsumer != null) {
                            kafkaConsumer.close();
                        }
                        throw th;
                    }
                }
            });
        }
        logger.info("Consumer started");
    }
}
