/*
 * Decompiled with CFR 0.152.
 */
package org.unidal.cat.message.storage.internals;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.statistic.ServerStatisticManager;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.cat.message.storage.BlockDumperManager;
import org.unidal.cat.message.storage.BucketManager;
import org.unidal.cat.message.storage.MessageDumper;
import org.unidal.cat.message.storage.MessageProcessor;
import org.unidal.cat.message.storage.exception.MessageQueueFullException;
import org.unidal.helper.Threads;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named(type=MessageDumper.class, instantiationStrategy="per-lookup")
public class DefaultMessageDumper
extends ContainerHolder
implements MessageDumper,
LogEnabled {
    @Inject
    private BlockDumperManager m_blockDumperManager;
    @Inject(value={"local"})
    private BucketManager m_bucketManager;
    @Inject
    private ServerStatisticManager m_statisticManager;
    @Inject
    private ServerConfigManager m_configManager;
    private List<BlockingQueue<MessageTree>> m_queues = new ArrayList<BlockingQueue<MessageTree>>();
    private List<MessageProcessor> m_processors = new ArrayList<MessageProcessor>();
    private AtomicInteger m_failCount = new AtomicInteger(-1);
    private Logger m_logger;
    private long m_total;
    private int m_processThreads;

    @Override
    public void awaitTermination(int hour) throws InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
        String date = sdf.format(new Date((long)hour * 3600000L));
        this.m_logger.info("starting close message processor " + date);
        this.closeMessageProcessor();
        this.m_logger.info("end close dumper processor " + date);
        this.m_logger.info("starting close dumper manager " + date);
        this.m_blockDumperManager.close(hour);
        this.m_logger.info("end close dumper manager " + date);
        this.m_logger.info("starting close bucket manager " + date);
        this.m_bucketManager.closeBuckets(hour);
        this.m_logger.info("end close bucket manager " + date);
    }

    private void closeMessageProcessor() throws InterruptedException {
        while (true) {
            boolean allEmpty = true;
            for (BlockingQueue<MessageTree> queue : this.m_queues) {
                if (queue.isEmpty()) continue;
                allEmpty = false;
                break;
            }
            if (allEmpty) break;
            TimeUnit.MILLISECONDS.sleep(1L);
        }
        for (MessageProcessor processor : this.m_processors) {
            processor.shutdown();
            super.release((Object)processor);
        }
    }

    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    private int getIndex(String key) {
        return Math.abs(key.hashCode()) % this.m_processThreads;
    }

    @Override
    public void initialize(int hour) {
        int processThreads;
        this.m_processThreads = processThreads = this.m_configManager.getMessageProcessorThreads();
        for (int i = 0; i < processThreads; ++i) {
            ArrayBlockingQueue<MessageTree> queue = new ArrayBlockingQueue<MessageTree>(10000);
            MessageProcessor processor = (MessageProcessor)this.lookup(MessageProcessor.class);
            this.m_queues.add(queue);
            this.m_processors.add(processor);
            processor.initialize(hour, i, queue);
            Threads.forGroup((String)"Cat").start((Runnable)((Object)processor));
        }
    }

    @Override
    public void process(MessageTree tree) {
        MessageId id = tree.getFormatMessageId();
        String domain = id.getDomain();
        int index = this.getIndex(id.getIpAddressInHex());
        BlockingQueue<MessageTree> queue = this.m_queues.get(index);
        boolean success = queue.offer(tree);
        if (!success) {
            this.m_statisticManager.addMessageDumpLoss(1L);
            if (this.m_failCount.incrementAndGet() % 100 == 0) {
                Cat.logError((Throwable)new MessageQueueFullException("Error when adding message to queue, fails: " + this.m_failCount));
                this.m_logger.info("message tree queue is full " + this.m_failCount + " index " + index);
            }
        } else {
            this.m_statisticManager.addMessageSize(domain, tree.getBuffer().readableBytes());
            if (++this.m_total % 1000L == 0L) {
                this.m_statisticManager.addMessageDump(1000L);
            }
        }
    }
}

