/*
 * Decompiled with CFR 0.152.
 */
package org.unidal.cat.message.storage.internals;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.statistic.ServerStatisticManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.cat.message.storage.Block;
import org.unidal.cat.message.storage.BlockDumper;
import org.unidal.cat.message.storage.BlockWriter;
import org.unidal.cat.message.storage.exception.BlockQueueFullException;
import org.unidal.helper.Threads;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named(type=BlockDumper.class, instantiationStrategy="per-lookup")
public class DefaultBlockDumper
extends ContainerHolder
implements BlockDumper,
LogEnabled {
    @Inject
    private ServerStatisticManager m_statisticManager;
    @Inject
    private ServerConfigManager m_configManager;
    private List<BlockingQueue<Block>> m_queues = new ArrayList<BlockingQueue<Block>>();
    private List<BlockWriter> m_writers = new ArrayList<BlockWriter>();
    private int m_failCount = -1;
    private Logger m_logger;

    @Override
    public void awaitTermination() throws InterruptedException {
        for (int index = 0; index < 100; ++index) {
            boolean allEmpty = true;
            for (BlockingQueue<Block> queue : this.m_queues) {
                if (queue.isEmpty()) continue;
                allEmpty = false;
                break;
            }
            if (allEmpty) break;
            TimeUnit.MILLISECONDS.sleep(100L);
        }
        for (BlockWriter writer : this.m_writers) {
            writer.shutdown();
            super.release((Object)writer);
        }
    }

    @Override
    public void dump(Block block) throws IOException {
        String domain = block.getDomain();
        int hash = Math.abs(domain.hashCode());
        int index = hash % this.m_writers.size();
        BlockingQueue<Block> queue = this.m_queues.get(index);
        boolean success = queue.offer(block);
        if (!success) {
            this.m_statisticManager.addBlockLoss(1L);
            if (++this.m_failCount % 100 == 0) {
                Cat.logError((Throwable)new BlockQueueFullException("Error when adding block to queue, fails: " + this.m_failCount));
                this.m_logger.info("block dump queue is full " + this.m_failCount + " index:" + index);
            }
        } else {
            this.m_statisticManager.addBlockTotal(1L);
        }
    }

    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    @Override
    public void initialize(int hour) {
        int threads = this.m_configManager.getMessageDumpThreads();
        for (int i = 0; i < threads; ++i) {
            ArrayBlockingQueue<Block> queue = new ArrayBlockingQueue<Block>(10000);
            BlockWriter writer = (BlockWriter)this.lookup(BlockWriter.class);
            this.m_queues.add(queue);
            this.m_writers.add(writer);
            writer.initialize(hour, i, queue);
            Threads.forGroup((String)"Cat").start((Runnable)((Object)writer));
        }
    }
}

