package cn.com.duiba.crecord.message;

import cn.com.duiba.service.KafkaClient;
import cn.com.duiba.service.ThreadPoolService;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:cn/com/duiba/crecord/message/CrecordConsumer.class */
public class CrecordConsumer implements InitializingBean {

    @Value("${notify.topic.crecord.requset}")
    private String crecordRequest;

    @Autowired
    private KafkaClient kafkaClient;

    @Autowired
    private CrecordAsyncHttp crecordAsyncHttp;

    @Autowired
    private ThreadPoolService threadPoolService;
    private static Logger log = LoggerFactory.getLogger(CrecordConsumer.class);
    private static int pool = 3;

    public void afterPropertiesSet() {
        start();
    }

    public synchronized void start() {
        for (int i = 0; i < pool; i++) {
            this.threadPoolService.submit(new Runnable() { // from class: cn.com.duiba.crecord.message.CrecordConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    CrecordConsumer.this.whilePoll();
                }
            });
        }
        log.info("CrecordConsumer started");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void whilePoll() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaClient.getBootstrapServers());
        properties.put("group.id", "notify-center");
        properties.put("enable.auto.commit", "false");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try {
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            Throwable th = null;
            try {
                try {
                    kafkaConsumer.subscribe(Arrays.asList(this.crecordRequest));
                    while (true) {
                        ConsumerRecords<String, String> poll = kafkaConsumer.poll(100L);
                        Iterator it = poll.partitions().iterator();
                        while (it.hasNext()) {
                            commit(kafkaConsumer, poll, (TopicPartition) it.next());
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("error:", e);
        }
    }

    private void commit(KafkaConsumer<String, String> kafkaConsumer, ConsumerRecords<String, String> consumerRecords, TopicPartition topicPartition) {
        for (ConsumerRecord consumerRecord : consumerRecords.records(topicPartition)) {
            try {
                this.crecordAsyncHttp.asyncSubmit((String) consumerRecord.value());
                kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(consumerRecord.offset() + 1)));
            } catch (Exception e) {
                log.error("CrecordConsumer error:", e);
            }
        }
    }
}
