package cn.com.duiba.tuia.media.message.consumer;

import cn.com.duiba.tuia.media.common.exception.TuiaMediaException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:cn/com/duiba/tuia/media/message/consumer/AbstractKafkaConsumer.class */
public abstract class AbstractKafkaConsumer implements InitializingBean {
    public static final int DEFAULT_POOL_SIZE = 1;
    protected Logger log = LoggerFactory.getLogger("message");

    @Value("${media.kafka.group.id}")
    private String kafkaGroupId;

    @Value("${media.kafka.bootstrap.servers}")
    private String kafkaServers;
    private ExecutorService executorService;
    private boolean running;

    /* JADX INFO: Access modifiers changed from: private */
    public void createMessageConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaServers);
        properties.put("group.id", this.kafkaGroupId);
        properties.put("enable.auto.commit", "false");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("max.partition.fetch.bytes", "2048");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(getTopics());
        while (this.running) {
            try {
                ConsumerRecords poll = kafkaConsumer.poll(100L);
                for (TopicPartition topicPartition : poll.partitions()) {
                    for (ConsumerRecord<String, String> consumerRecord : poll.records(topicPartition)) {
                        try {
                            readMessage(consumerRecord);
                            kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(consumerRecord.offset() + 1)));
                        } catch (Exception e) {
                            if (e instanceof KafkaException) {
                                this.log.error(String.format("Kafka Error, offset=[%s]", Long.valueOf(consumerRecord.offset())), e);
                            } else {
                                this.log.error(String.format("Consumer Error, offset=[%s]", Long.valueOf(consumerRecord.offset())), e);
                            }
                        }
                    }
                }
            } finally {
                kafkaConsumer.close();
            }
        }
    }

    public synchronized void start() throws TuiaMediaException {
        if (this.running) {
            return;
        }
        this.running = true;
        this.executorService = Executors.newFixedThreadPool(getPoolSize(), new ThreadFactory() { // from class: cn.com.duiba.tuia.media.message.consumer.AbstractKafkaConsumer.1
            private int i = 0;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                StringBuilder append = new StringBuilder().append("Thread-").append(getClass().getName()).append("-Consumer-");
                int i = this.i;
                this.i = i + 1;
                return new Thread(runnable, append.append(i).toString());
            }
        });
        for (int i = 0; i < getPoolSize(); i++) {
            this.executorService.submit(new Runnable() { // from class: cn.com.duiba.tuia.media.message.consumer.AbstractKafkaConsumer.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AbstractKafkaConsumer.this.createMessageConsumer();
                    } catch (Exception e) {
                        AbstractKafkaConsumer.this.log.error("error:", e);
                    }
                }
            });
        }
        this.log.info("Consumer started");
    }

    public synchronized void stop() {
        this.running = false;
        if (this.executorService != null) {
            this.executorService.shutdown();
            this.executorService = null;
        }
    }

    protected int getPoolSize() {
        return 1;
    }

    protected abstract List<String> getTopics();

    protected abstract void readMessage(ConsumerRecord<String, String> consumerRecord);

    public void afterPropertiesSet() throws Exception {
        start();
    }
}
