package org.unidal.cat.message.storage.internals;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.configuration.DataConfiguration;
import org.unidal.cat.message.storage.Block;
import org.unidal.cat.message.storage.BlockDumper;
import org.unidal.cat.message.storage.BlockDumperManager;
import org.unidal.cat.message.storage.MessageFinder;
import org.unidal.cat.message.storage.MessageFinderManager;
import org.unidal.cat.message.storage.MessageProcessor;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named(type = MessageProcessor.class, instantiationStrategy = Named.PER_LOOKUP)
/* loaded from: input_file:WEB-INF/lib/cat-hadoop-3.0.4.jar:org/unidal/cat/message/storage/internals/DefaultMessageProcessor.class */
public class DefaultMessageProcessor implements MessageProcessor, MessageFinder {

    @Inject
    private BlockDumperManager m_blockDumperManager;

    @Inject
    private MessageFinderManager m_finderManager;

    @Inject
    private ServerConfigManager m_configManger;
    private BlockDumper m_dumper;
    private int m_index;
    private BlockingQueue<MessageTree> m_queue;
    private ConcurrentHashMap<String, Block> m_blocks = new ConcurrentHashMap<>();
    private int m_hour;
    private AtomicBoolean m_enabled;
    private CountDownLatch m_latch;
    private int m_count;

    @Override // org.unidal.cat.message.storage.MessageFinder
    public ByteBuf find(MessageId messageId) {
        Block block = this.m_blocks.get(messageId.getDomain());
        if (block != null) {
            return block.find(messageId);
        }
        return null;
    }

    @Override // org.unidal.helper.Threads.Task
    public String getName() {
        return getClass().getSimpleName() + " " + new SimpleDateFormat(DataConfiguration.DEFAULT_DATE_FORMAT).format(new Date(TimeUnit.HOURS.toMillis(this.m_hour))) + HelpFormatter.DEFAULT_OPT_PREFIX + this.m_index;
    }

    @Override // org.unidal.cat.message.storage.MessageProcessor
    public void initialize(int i, int i2, BlockingQueue<MessageTree> blockingQueue) {
        this.m_index = i2;
        this.m_queue = blockingQueue;
        this.m_enabled = new AtomicBoolean(true);
        this.m_dumper = this.m_blockDumperManager.findOrCreate(i);
        this.m_hour = i;
        this.m_latch = new CountDownLatch(1);
        this.m_finderManager.register(i, this);
    }

    private boolean isMonitor() {
        int i = this.m_count + 1;
        this.m_count = i;
        return i % 100000 == 0;
    }

    private MessageTree pollMessage() throws InterruptedException {
        return this.m_queue.poll(5L, TimeUnit.MILLISECONDS);
    }

    private void processMessage(MessageTree messageTree) {
        MessageId formatMessageId = messageTree.getFormatMessageId();
        String domain = formatMessageId.getDomain();
        int hour = formatMessageId.getHour();
        Block block = this.m_blocks.get(domain);
        if (block == null) {
            block = new DefaultBlock(domain, hour);
            this.m_blocks.put(domain, block);
        }
        ByteBuf buffer = messageTree.getBuffer();
        try {
            try {
                if (block.isFull()) {
                    block.finish();
                    this.m_dumper.dump(block);
                    block = new DefaultBlock(domain, hour);
                    this.m_blocks.put(domain, block);
                }
                block.pack(formatMessageId, buffer);
                ReferenceCountUtil.release(buffer);
            } catch (Exception e) {
                Cat.logError(e);
                ReferenceCountUtil.release(buffer);
            }
        } catch (Throwable th) {
            ReferenceCountUtil.release(buffer);
            throw th;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                if (!this.m_enabled.get() && this.m_queue.isEmpty()) {
                    break;
                }
                MessageTree pollMessage = pollMessage();
                if (pollMessage != null) {
                    if (isMonitor()) {
                        Transaction newTransaction = Cat.newTransaction("Processor", "index-" + this.m_index);
                        processMessage(pollMessage);
                        newTransaction.setStatus("0");
                        newTransaction.complete();
                    } else {
                        processMessage(pollMessage);
                    }
                }
            } catch (InterruptedException e) {
            }
        }
        for (Block block : this.m_blocks.values()) {
            try {
                block.finish();
                this.m_dumper.dump(block);
            } catch (IOException e2) {
            }
        }
        this.m_blocks.clear();
        this.m_latch.countDown();
    }

    @Override // org.unidal.helper.Threads.Task
    public void shutdown() {
        this.m_enabled.set(false);
        try {
            this.m_latch.await();
        } catch (InterruptedException e) {
        }
    }
}
