package cn.com.duiba.tuia.message.rocketmq.listener;

import cn.com.duiba.tuia.cache.AdvertMapCacheManager;
import cn.com.duiba.tuia.cache.AdvertPkgCacheService;
import cn.com.duiba.tuia.dao.engine.AdvertOrientationPackageDAO;
import cn.com.duiba.tuia.domain.dataobject.AdvertOrientationPackageDO;
import cn.com.duiba.tuia.service.AdvertOrientationService;
import cn.com.duiba.tuia.task.MsgDelayTask;
import cn.com.duiba.wolf.utils.DateUtils;
import cn.com.tuia.advert.enums.PkgPutTypeEnum;
import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:cn/com/duiba/tuia/message/rocketmq/listener/RefreshAdvertOrientPkgMsgHandler.class */
public class RefreshAdvertOrientPkgMsgHandler extends AbstractMessageResultHandler {

    @Autowired
    private AdvertOrientationService orientationService;

    @Autowired
    private AdvertOrientationPackageDAO orientationPackageDAO;

    @Autowired
    private AdvertMapCacheManager advertMapCacheManager;

    @Autowired
    private AdvertPkgCacheService advertPkgCacheService;
    private static final String ORIENTATION_ID = "orientationId";
    private static final String ADVERT_TYPE = "advertType";
    private static final long DELAY_MS = 1000;
    private static final ConcurrentHashMap<String, List<String>> map = new ConcurrentHashMap<>();
    private static final DelayQueue<MsgDelayTask> queue = new DelayQueue<>();
    private static final Integer QUEUE_SIZE_LIMIT = 1000;

    @Override // cn.com.duiba.tuia.message.rocketmq.listener.AbstractMessageResultHandler
    public String getListenTag() {
        return "updateAdvertOrientPackageMsg";
    }

    @Override // cn.com.duiba.tuia.message.rocketmq.listener.AbstractMessageResultHandler
    public void consumer(String str) {
        Long valueOf;
        this.logger.info("接收到消息，tag=" + getListenTag() + ",msg=" + str);
        if (StringUtils.isNotBlank(str) && StringUtils.isNumeric(str)) {
            valueOf = Long.valueOf(Long.parseLong(str));
        } else {
            Map map2 = (Map) JSON.parseObject(str, Map.class);
            Object obj = map2.get(ADVERT_TYPE);
            Object obj2 = map2.get(ORIENTATION_ID);
            if (obj2 == null) {
                this.logger.error("handle orient package massage orientationId is null error");
                return;
            }
            valueOf = Long.valueOf(obj2.toString());
            Integer num = obj == null ? null : (Integer) obj;
            if (num != null && num.intValue() != 1) {
                return;
            }
        }
        preExecute(String.valueOf(valueOf));
        this.logger.info("success，tag=" + getListenTag() + ",msg=" + str);
    }

    private void preExecute(String str) {
        String str2 = str + DateUtils.getSecondOnlyStr(new Date());
        boolean containsKey = map.containsKey(str2);
        putToMap(str, str2);
        if (!containsKey) {
            execute(str, false);
        } else {
            if (queue.size() >= QUEUE_SIZE_LIMIT.intValue()) {
                this.logger.info("updateAdvertOrientPackageMsg消息, {}, 超出队列限制长度直接执行, orientId={}", DateUtils.getMillisecond(), str);
                execute(str, false);
                return;
            }
            this.logger.info("updateAdvertOrientPackageMsg消息, {}, 加入延时队列, orientId={}", DateUtils.getMillisecond(), str);
        }
        queue.add((DelayQueue<MsgDelayTask>) new MsgDelayTask(str, str2, System.currentTimeMillis() + 1000));
    }

    private void putToMap(String str, String str2) {
        map.compute(str2, (str3, list) -> {
            if (null == list) {
                list = new ArrayList();
            }
            list.add(str);
            return list;
        });
    }

    private void execute(String str, boolean z) {
        Long valueOf = Long.valueOf(str);
        AdvertOrientationPackageDO selectById = this.orientationPackageDAO.selectById(valueOf);
        if (selectById == null || !selectById.getPkgPutType().equals(PkgPutTypeEnum.INTERACTIVE_TYPE.getCode())) {
            return;
        }
        this.orientationService.updateOrientation(selectById);
        Long advertId = selectById.getAdvertId();
        if (selectById.getIsDefault().equals(1)) {
            valueOf = 0L;
        }
        this.advertMapCacheManager.updateValidPkgFilterCache(advertId, valueOf);
        this.advertPkgCacheService.updateAdvertOrientPkgCache(advertId + "|" + valueOf);
        Logger logger = this.logger;
        Object[] objArr = new Object[3];
        objArr[0] = DateUtils.getMillisecond();
        objArr[1] = z ? "延迟消费" : "直接消费";
        objArr[2] = str;
        logger.info("updateAdvertOrientPackageMsg消息, {}, {}, orientId={}", objArr);
    }

    public void afterPropertiesSet() throws Exception {
        RocketMqMessageListener.registerCallback(this);
        Executors.newSingleThreadExecutor().submit(() -> {
            while (true) {
                try {
                    MsgDelayTask take = queue.take();
                    List<String> list = map.get(take.getKey());
                    if (null != list && list.size() > 1) {
                        execute(list.get(0), true);
                    }
                    map.remove(take.getKey());
                } catch (Exception e) {
                    this.logger.error("updateAdvertOrientPackageMsg消息, 延时线程执行异常", e);
                }
            }
        });
    }
}
