package org.unidal.cat.message.storage.internals;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.statistic.ServerStatisticManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.cat.message.storage.Block;
import org.unidal.cat.message.storage.BlockDumper;
import org.unidal.cat.message.storage.BlockWriter;
import org.unidal.cat.message.storage.exception.BlockQueueFullException;
import org.unidal.helper.Threads;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named(type = BlockDumper.class, instantiationStrategy = Named.PER_LOOKUP)
/* loaded from: input_file:WEB-INF/lib/cat-hadoop-3.0.2.jar:org/unidal/cat/message/storage/internals/DefaultBlockDumper.class */
public class DefaultBlockDumper extends ContainerHolder implements BlockDumper, LogEnabled {

    @Inject
    private ServerStatisticManager m_statisticManager;

    @Inject
    private ServerConfigManager m_configManager;
    private List<BlockingQueue<Block>> m_queues = new ArrayList();
    private List<BlockWriter> m_writers = new ArrayList();
    private int m_failCount = -1;
    private Logger m_logger;

    @Override // org.unidal.cat.message.storage.BlockDumper
    public void awaitTermination() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            boolean z = true;
            Iterator<BlockingQueue<Block>> it = this.m_queues.iterator();
            while (true) {
                if (it.hasNext()) {
                    if (!it.next().isEmpty()) {
                        z = false;
                        break;
                    }
                } else {
                    break;
                }
            }
            if (z) {
                break;
            }
            TimeUnit.MILLISECONDS.sleep(100L);
        }
        for (BlockWriter blockWriter : this.m_writers) {
            blockWriter.shutdown();
            super.release(blockWriter);
        }
    }

    @Override // org.unidal.cat.message.storage.BlockDumper
    public void dump(Block block) throws IOException {
        int abs = Math.abs(block.getDomain().hashCode()) % this.m_writers.size();
        if (this.m_queues.get(abs).offer(block)) {
            this.m_statisticManager.addBlockTotal(1L);
            return;
        }
        this.m_statisticManager.addBlockLoss(1L);
        int i = this.m_failCount + 1;
        this.m_failCount = i;
        if (i % 100 == 0) {
            Cat.logError(new BlockQueueFullException("Error when adding block to queue, fails: " + this.m_failCount));
            this.m_logger.info("block dump queue is full " + this.m_failCount + " index:" + abs);
        }
    }

    @Override // org.codehaus.plexus.logging.LogEnabled
    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    @Override // org.unidal.cat.message.storage.BlockDumper
    public void initialize(int i) {
        int messageDumpThreads = this.m_configManager.getMessageDumpThreads();
        for (int i2 = 0; i2 < messageDumpThreads; i2++) {
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10000);
            BlockWriter blockWriter = (BlockWriter) lookup(BlockWriter.class);
            this.m_queues.add(arrayBlockingQueue);
            this.m_writers.add(blockWriter);
            blockWriter.initialize(i, i2, arrayBlockingQueue);
            Threads.forGroup("Cat").start(blockWriter);
        }
    }
}
