/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.api.internal.tmc;

import com.taobao.api.internal.tmc.KeySelector;
import com.taobao.api.internal.tmc.Message;
import com.taobao.api.internal.tmc.TmcClient;
import com.taobao.api.internal.tmc.TmcHandler;
import com.taobao.api.internal.util.json.JSONValidatingReader;
import com.taobao.top.link.endpoint.EndpointContext;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * {@link TmcHandler} that buffers keyed messages for a short window and
 * reports later arrivals with the same key as duplicates.
 *
 * <p>For every incoming message the configured {@link KeySelector} is asked
 * for a key:
 * <ul>
 *   <li>no key ({@code null}): the message is passed straight to
 *       {@code handleMessage(msg, false)};</li>
 *   <li>key not currently buffered: the message is parked in {@code msgMap}
 *       and its key appended to {@code msgKeyQueue};</li>
 *   <li>key already buffered: the message is passed to
 *       {@code handleMessage(msg, true)} and a record is written to the
 *       statistics log.</li>
 * </ul>
 * A daemon {@link Timer} runs {@link MsgScheduleTask} every
 * {@link #TIMER_PERIOD} ms to release buffered messages through
 * {@code handleMessage(msg, false)}.
 *
 * <p>NOTE(review): the meaning of the boolean passed to
 * {@code handleMessage} is defined by {@code TmcHandler}; it is presumably an
 * "ignore" flag - confirm against that class.
 */
public class DuplicateRemoverTmcHandler
extends TmcHandler {
    private static final Log log = LogFactory.getLog(DuplicateRemoverTmcHandler.class);
    /** Separate logger (named by the simple class name) for duplicate records. */
    private static final Log statlog = LogFactory.getLog(DuplicateRemoverTmcHandler.class.getSimpleName());
    /** Delay in milliseconds before the first drain run. */
    private static final long TIMER_DELAY = 10L;
    /** Interval in milliseconds between drain runs; also the time budget of a single run. */
    private static final long TIMER_PERIOD = 500L;
    /** Number of messages ever enqueued for delayed delivery. */
    private final AtomicLong totalMessageNum = new AtomicLong(0L);
    /** Snapshot of {@link #totalMessageNum} taken at the end of the previous drain run. */
    private final AtomicLong curProduceNum = new AtomicLong(0L);
    /** Number of keys the drain task has taken off the queue so far. */
    private final AtomicLong hasConsumeNum = new AtomicLong(0L);
    /** Buffered messages by key; the first message per key is the one kept. */
    private final ConcurrentHashMap<String, Message> msgMap;
    /** Keys of buffered messages in arrival order; bounded by the client's queue size. */
    private final ArrayBlockingQueue<String> msgKeyQueue;
    private final KeySelector keySelector;
    private Timer timer;

    /**
     * Creates the handler and starts the periodic drain timer.
     *
     * @param tmcClient client supplying the key selector (a default
     *                  {@link MsgKeySelector} is used when it returns
     *                  {@code null}) and the capacity of the key queue
     */
    public DuplicateRemoverTmcHandler(TmcClient tmcClient) {
        super(tmcClient);
        KeySelector configured = tmcClient.getKeySelector();
        this.keySelector = configured != null ? configured : new MsgKeySelector();
        this.msgMap = new ConcurrentHashMap<String, Message>();
        this.msgKeyQueue = new ArrayBlockingQueue<String>(tmcClient.getQueueSize());
        // Daemon timer, so it does not keep the JVM alive on its own.
        this.timer = new Timer("tmc-duplicate-remover", true);
        this.timer.schedule(new MsgScheduleTask(), TIMER_DELAY, TIMER_PERIOD);
    }

    /**
     * Parses the incoming raw message and either forwards it immediately,
     * buffers it, or reports it as a duplicate of a buffered message.
     *
     * @param context endpoint context carrying the raw message
     * @throws Exception whatever parsing, key selection or forwarding throws,
     *                   including {@link InterruptedException} when the
     *                   thread is interrupted while waiting for queue space
     */
    public void onMessage(EndpointContext context) throws Exception {
        Map<String, Object> rawMsg = context.getMessage();
        if (log.isDebugEnabled()) {
            log.debug(String.format("onMessage from %s: %s", context.getMessageFrom(), rawMsg));
        }
        Message msg = this.parse(rawMsg);
        String key = this.keySelector.selectKey(msg);
        if (key == null) {
            // No key: the message takes no part in duplicate detection.
            super.handleMessage(msg, false);
            return;
        }
        if (!this.put(key, msg)) {
            // A message with the same key is still buffered.
            super.handleMessage(msg, true);
            this.log(key, msg.getId());
        }
    }

    /**
     * Writes one comma-separated duplicate record to the statistics logger:
     * timestamp, app key, group name, message id, key.
     *
     * @param key   key shared with the buffered message
     * @param msgId id of the duplicate message
     */
    private void log(String key, Long msgId) {
        StringBuilder buf = new StringBuilder();
        buf.append(System.currentTimeMillis()).append(",");
        buf.append(this.tmcClient.getAppKey()).append(",");
        buf.append(this.tmcClient.getGroupName()).append(",");
        buf.append(msgId).append(",");
        buf.append(key);
        // NOTE(review): logged at fatal level, presumably so the record is
        // emitted regardless of the configured level - confirm.
        statlog.fatal(buf.toString());
    }

    /** Closes the parent handler and cancels the drain timer. */
    public void close() {
        super.close();
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
    }

    /**
     * Buffers {@code message} under {@code key} unless that key is already
     * buffered.
     *
     * @return {@code true} if the message was buffered, {@code false} if the
     *         key was already present
     * @throws InterruptedException if interrupted while waiting for space in
     *                              the key queue; the map entry is rolled back
     */
    private boolean put(String key, Message message) throws InterruptedException {
        if (this.msgMap.putIfAbsent(key, message) != null) {
            return false;
        }
        try {
            // Blocks while the queue is full.
            this.msgKeyQueue.put(key);
        }
        catch (InterruptedException e) {
            // Without a queue entry nothing would ever remove this key from
            // the map, and every later message with the key would be
            // reported as a duplicate. Undo the insertion before propagating.
            this.msgMap.remove(key, message);
            throw e;
        }
        this.totalMessageNum.incrementAndGet();
        return true;
    }

    /**
     * Periodic task that releases the messages that were already enqueued
     * when the previous run finished, spending at most {@link #TIMER_PERIOD}
     * ms per run.
     */
    private class MsgScheduleTask
    extends TimerTask {
        @Override
        public void run() {
            long beginTime = System.currentTimeMillis();
            long target = DuplicateRemoverTmcHandler.this.curProduceNum.get();
            while (DuplicateRemoverTmcHandler.this.hasConsumeNum.get() < target
                    && System.currentTimeMillis() - beginTime <= TIMER_PERIOD) {
                String key = null;
                boolean taken = false;
                try {
                    key = DuplicateRemoverTmcHandler.this.msgKeyQueue.take();
                    taken = true;
                    DuplicateRemoverTmcHandler.this.handleMessage(DuplicateRemoverTmcHandler.this.msgMap.remove(key), false);
                }
                catch (InterruptedException e) {
                    // Restore the flag and stop this run instead of spinning
                    // on an interrupted thread.
                    Thread.currentThread().interrupt();
                    log.error("handle message interrupted: " + key, e);
                    break;
                }
                catch (Exception e) {
                    log.error("handle message fail: " + key, e);
                }
                finally {
                    // Count every key removed from the queue, even when the
                    // handler failed. Otherwise the counter lags behind the
                    // queue and a later run blocks in take() on an empty
                    // queue, releasing the next message with no buffering.
                    if (taken) {
                        DuplicateRemoverTmcHandler.this.hasConsumeNum.incrementAndGet();
                    }
                }
            }
            // Messages enqueued up to now become eligible on the next run.
            DuplicateRemoverTmcHandler.this.curProduceNum.set(DuplicateRemoverTmcHandler.this.totalMessageNum.get());
        }
    }

    /**
     * Default key selector: builds a key from the topic family and an id
     * field read from the message's JSON content.
     */
    private static class MsgKeySelector
    implements KeySelector {
        /**
         * @return {@code "trade_<tid>"}, {@code "item_<num_iid>"} or
         *         {@code "refund_<refund_id>"} depending on the topic, or
         *         {@code null} when the topic is missing or unrecognised or
         *         the id cannot be read from the content
         */
        public String selectKey(Message message) {
            String topic = message.getTopic();
            if (topic == null) {
                return null;
            }
            if (topic.startsWith("taobao_trade") || topic.equals("taobao_datapush_SynTrade")) {
                return this.buildKey("trade_", message, "tid");
            }
            if (topic.startsWith("taobao_item") || topic.equals("taobao_datapush_SynItem")) {
                return this.buildKey("item_", message, "num_iid");
            }
            if (topic.startsWith("taobao_refund")) {
                return this.buildKey("refund_", message, "refund_id");
            }
            return null;
        }

        /**
         * Returns {@code prefix + id}, or {@code null} when the id is absent.
         * Returning {@code null} avoids a shared key such as
         * {@code "trade_null"}, which would make unrelated messages look like
         * duplicates of each other.
         */
        private String buildKey(String prefix, Message message, String field) {
            Object id = this.getId(message, field);
            return id == null ? null : prefix + id;
        }

        /**
         * Parses the message content as JSON, stores the parsed map on the
         * message via {@code setContentMap}, and returns the value of
         * {@code field}.
         *
         * @return the field value, or {@code null} when the content is
         *         {@code null}, does not parse to a {@link Map}, or lacks the
         *         field
         */
        private Object getId(Message message, String field) {
            String content = message.getContent();
            if (content == null) {
                return null;
            }
            JSONValidatingReader reader = new JSONValidatingReader();
            Object parsed = reader.read(content);
            if (!(parsed instanceof Map)) {
                return null;
            }
            message.setContentMap((Map)parsed);
            return message.getContentMap().get(field);
        }
    }
}

