package cn.com.duiba.tuia.core.biz.util;

import cn.com.duiba.tuia.core.api.dto.AdvertDto;
import cn.com.duiba.tuia.core.api.dto.AdvertMessageBody;
import cn.com.duiba.tuia.core.api.dto.KafkaAdvertMessage;
import cn.com.duiba.tuia.core.biz.dao.AdvertDAO;
import cn.com.duiba.tuia.core.biz.model.AdvertKafkaInfo;
import cn.com.duiba.tuia.core.common.ErrorCode;
import cn.com.duiba.tuia.core.common.TuiaCoreException;
import com.alibaba.fastjson.JSON;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:cn/com/duiba/tuia/core/biz/util/KafkaClient.class */
public class KafkaClient implements InitializingBean {

    @Value("${tuiaCore.kafka.bootstrap.servers}")
    private String bootstrapServers;
    private static final int POOL_MAX_THREAD = 20;

    @Autowired
    private AdvertDAO advertDAO;

    @Value("${tuiaCore.kafka.queueName}")
    private String advertMessageTopic;
    private Logger logger = LoggerFactory.getLogger(KafkaClient.class);
    private Producer<String, String> producer = null;
    private ExecutorService pool = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(20));

    public void afterPropertiesSet() throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("acks", "1");
        properties.put("retries", 3);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 0);
        properties.put("request.timeout.ms", 300);
        properties.put("metadata.fetch.timeout.ms", 5000);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer(properties);
    }

    private String sendMsg(String str) throws TuiaCoreException {
        String replaceAll = UUID.randomUUID().toString().replaceAll("-", "");
        try {
            this.producer.send(new ProducerRecord(this.advertMessageTopic, replaceAll, str), new Callback() { // from class: cn.com.duiba.tuia.core.biz.util.KafkaClient.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                }
            }).get();
            return replaceAll;
        } catch (InterruptedException | ExecutionException e) {
            this.logger.error("send kafuka message error", e);
            throw new TuiaCoreException(ErrorCode.E0002008);
        }
    }

    public String sendMsg(String str, String str2) throws TuiaCoreException {
        String replaceAll = UUID.randomUUID().toString().replaceAll("-", "");
        try {
            this.producer.send(new ProducerRecord(str, replaceAll, str2), new Callback() { // from class: cn.com.duiba.tuia.core.biz.util.KafkaClient.2
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                }
            }).get();
            return replaceAll;
        } catch (InterruptedException | ExecutionException e) {
            this.logger.error("send kafuka message error", e);
            throw new TuiaCoreException(ErrorCode.E0002008);
        }
    }

    public void sendKafkaMsg(final List<KafkaAdvertMessage> list) {
        try {
            this.pool.submit(new Runnable() { // from class: cn.com.duiba.tuia.core.biz.util.KafkaClient.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        ArrayList arrayList = new ArrayList(list.size());
                        HashMap hashMap = new HashMap(list.size());
                        KafkaClient.this.doFilterAdvertMsg(list, arrayList, hashMap);
                        KafkaClient.this.buildMsgAndSendMsg(hashMap, KafkaClient.this.selectAdvertBasebyIds(arrayList));
                    } catch (Exception e) {
                        KafkaClient.this.logger.error(" kafka send msg error the msg=[{}] ", e);
                    }
                }
            });
        } catch (Exception e) {
            this.logger.error(" kafkaClient sendKafkaMsg error the msg=[{}]", e);
        }
    }

    public void sendUpdateAdvertTagMsg(final Long l, final String str, final String str2) throws TuiaCoreException {
        try {
            this.logger.info("send update advertTags kaka info, the advertId=[{}], the newTags=[{}], the srcTags=[{}]", new Object[]{l, str, str2});
            this.pool.submit(new Runnable() { // from class: cn.com.duiba.tuia.core.biz.util.KafkaClient.4
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (StringUtils.isEmpty(str2)) {
                            List asList = Arrays.asList(str.split(","));
                            KafkaClient.this.doSendTagMsg(l, asList, "insert-advert-tag");
                            KafkaClient.this.logger.info("advertId=[{}] add tags=[{}] ", l, asList);
                        } else {
                            List<String> stringListByStr = StringTool.getStringListByStr(str2);
                            if (StringUtils.isNotEmpty(str)) {
                                List<String> stringListByStr2 = StringTool.getStringListByStr(str);
                                Iterator<String> it = stringListByStr.iterator();
                                while (it.hasNext()) {
                                    int indexOf = stringListByStr2.indexOf(it.next());
                                    if (indexOf != -1) {
                                        stringListByStr2.remove(indexOf);
                                        it.remove();
                                    }
                                }
                                if (CollectionUtils.isNotEmpty(stringListByStr)) {
                                    KafkaClient.this.doSendTagMsg(l, stringListByStr, "delete-advert-tag");
                                    KafkaClient.this.logger.info("advertId=[{}] delete tags=[{}] ", l, stringListByStr);
                                }
                                if (CollectionUtils.isNotEmpty(stringListByStr2)) {
                                    KafkaClient.this.doSendTagMsg(l, stringListByStr2, "insert-advert-tag");
                                    KafkaClient.this.logger.info("advertId=[{}] add tags=[{}] ", l, stringListByStr2);
                                }
                            } else {
                                KafkaClient.this.doSendTagMsg(l, stringListByStr, "delete-advert-tag");
                                KafkaClient.this.logger.info("advertId=[{}] delete tags=[{}] ", l, stringListByStr);
                            }
                        }
                    } catch (Exception e) {
                        KafkaClient.this.logger.error(" doContrastTags error the msg=[{}] ", e);
                    }
                }
            });
        } catch (Exception e) {
            this.logger.error(" kafkaClient doContrastTags error the msg=[{}]", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doSendTagMsg(Long l, List<String> list, String str) throws TuiaCoreException {
        AdvertDto selectById = this.advertDAO.selectById(l);
        ArrayList arrayList = new ArrayList(1);
        AdvertMessageBody advertMessageBody = new AdvertMessageBody();
        advertMessageBody.setAdvertId(l);
        advertMessageBody.setFee(selectById.getFee());
        advertMessageBody.setTags(StringUtils.join(list, ","));
        advertMessageBody.setMsgType(str);
        KafkaAdvertMessage kafkaAdvertMessage = new KafkaAdvertMessage();
        kafkaAdvertMessage.setAdvertMessageBody(advertMessageBody);
        kafkaAdvertMessage.setSelectTag(0);
        arrayList.add(kafkaAdvertMessage);
        sendKafkaMsg(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<AdvertKafkaInfo> selectAdvertBasebyIds(List<Long> list) throws TuiaCoreException {
        List<AdvertKafkaInfo> list2 = null;
        if (CollectionUtils.isNotEmpty(list)) {
            list2 = this.advertDAO.selectAdvertKafkaInfo(list);
        }
        return list2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doFilterAdvertMsg(List<KafkaAdvertMessage> list, List<Long> list2, HashMap<Long, AdvertMessageBody> hashMap) throws TuiaCoreException {
        for (KafkaAdvertMessage kafkaAdvertMessage : list) {
            if (1 == kafkaAdvertMessage.getSelectTag()) {
                list2.add(kafkaAdvertMessage.getAdvertMessageBody().getAdvertId());
                hashMap.put(kafkaAdvertMessage.getAdvertMessageBody().getAdvertId(), kafkaAdvertMessage.getAdvertMessageBody());
            } else {
                doSendMsg(kafkaAdvertMessage.getAdvertMessageBody());
            }
        }
    }

    private void doSendMsg(AdvertMessageBody advertMessageBody) throws TuiaCoreException {
        String jSONString = JSON.toJSONString(advertMessageBody);
        sendMsg(jSONString);
        this.logger.info(" sendKafkaMsg msg=[{}]", jSONString);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void buildMsgAndSendMsg(HashMap<Long, AdvertMessageBody> hashMap, List<AdvertKafkaInfo> list) throws TuiaCoreException {
        if (CollectionUtils.isNotEmpty(list)) {
            for (AdvertKafkaInfo advertKafkaInfo : list) {
                AdvertMessageBody advertMessageBody = hashMap.get(advertKafkaInfo.getId());
                advertMessageBody.setTags(advertKafkaInfo.getTags());
                doSendMsg(advertMessageBody);
            }
        }
    }
}
