/*
 * Decompiled with CFR 0.152.
 */
package org.unidal.cat.message.storage.internals;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.unidal.cat.message.storage.Block;
import org.unidal.cat.message.storage.BlockDumper;
import org.unidal.cat.message.storage.BlockDumperManager;
import org.unidal.cat.message.storage.MessageFinder;
import org.unidal.cat.message.storage.MessageFinderManager;
import org.unidal.cat.message.storage.MessageProcessor;
import org.unidal.cat.message.storage.internals.DefaultBlock;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

/**
 * Queue-driven message processor that packs incoming message trees into per-domain
 * {@link Block}s and hands full (or remaining) blocks to a {@link BlockDumper}.
 *
 * <p>An instance is bound to one hour and one index through
 * {@link #initialize(int, int, BlockingQueue)}, which also registers it as a
 * {@link MessageFinder} for that hour so that messages still held in the in-memory
 * blocks can be looked up through {@link #find(MessageId)}.
 *
 * <p>{@link #run()} is the worker loop; {@link #shutdown()} asks the loop to stop and
 * blocks until the loop has drained the queue and flushed every block.
 */
@Named(type=MessageProcessor.class, instantiationStrategy="per-lookup")
public class DefaultMessageProcessor
implements MessageProcessor,
MessageFinder {
    @Inject
    private BlockDumperManager m_blockDumperManager;
    @Inject
    private MessageFinderManager m_finderManager;
    @Inject
    private ServerConfigManager m_configManger;
    /** Dumper obtained for the hour passed to {@code initialize}. */
    private BlockDumper m_dumper;
    private int m_index;
    private BlockingQueue<MessageTree> m_queue;
    /** Current (not yet dumped) block per domain. */
    private ConcurrentHashMap<String, Block> m_blocks = new ConcurrentHashMap<String, Block>();
    private int m_hour;
    /** {@code true} while the worker loop should keep waiting for new messages. */
    private AtomicBoolean m_enabled;
    /** Released once {@link #run()} has finished, successfully or not. */
    private CountDownLatch m_latch;
    /** Number of messages taken from the queue; used only for sampling. */
    private int m_count;

    /**
     * Looks the message up in the in-memory block of its domain.
     *
     * @param id identifier of the message to find
     * @return whatever the domain's current block returns for {@code id}, or
     *         {@code null} when no block is currently held for that domain
     */
    @Override
    public ByteBuf find(MessageId id) {
        String domain = id.getDomain();
        Block block = this.m_blocks.get(domain);
        if (block != null) {
            return block.find(id);
        }
        return null;
    }

    /**
     * Builds a display name from the simple class name, the configured hour
     * (interpreted as hours since the epoch and formatted as
     * {@code yyyy-MM-dd HH:mm:ss}) and the processor index.
     *
     * @return the display name of this processor
     */
    public String getName() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return this.getClass().getSimpleName() + " " + sdf.format(new Date(TimeUnit.HOURS.toMillis(this.m_hour))) + "-" + this.m_index;
    }

    /**
     * Binds this processor to an hour, an index and its input queue, obtains the
     * dumper for that hour and registers this instance as a finder for the hour.
     *
     * @param hour  hour this processor works for
     * @param index index of this processor, used in names and transaction labels
     * @param queue queue the worker loop takes message trees from
     */
    @Override
    public void initialize(int hour, int index, BlockingQueue<MessageTree> queue) {
        this.m_index = index;
        this.m_queue = queue;
        this.m_enabled = new AtomicBoolean(true);
        this.m_dumper = this.m_blockDumperManager.findOrCreate(hour);
        this.m_hour = hour;
        this.m_latch = new CountDownLatch(1);
        this.m_finderManager.register(hour, this);
    }

    /**
     * Counts one processed message and reports whether it should be sampled.
     *
     * @return {@code true} for every 100000th call
     */
    private boolean isMonitor() {
        return ++this.m_count % 100000 == 0;
    }

    /**
     * Waits up to 5 milliseconds for the next message tree.
     *
     * @return the next tree, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    private MessageTree pollMessage() throws InterruptedException {
        return this.m_queue.poll(5L, TimeUnit.MILLISECONDS);
    }

    /**
     * Packs one message tree into the current block of its domain, dumping and
     * replacing that block first when it is full.
     *
     * <p>Any failure is logged and the message is dropped; the tree's buffer is
     * released in every case.
     *
     * @param tree message tree to store
     */
    private void processMessage(MessageTree tree) {
        ByteBuf buffer = tree.getBuffer();
        try {
            // Id parsing and block lookup happen inside the try so that a malformed
            // message neither leaks the buffer nor terminates the worker loop.
            MessageId id = tree.getFormatMessageId();
            String domain = id.getDomain();
            int hour = id.getHour();
            Block block = this.m_blocks.get(domain);
            if (block == null) {
                block = new DefaultBlock(domain, hour);
                this.m_blocks.put(domain, block);
            }
            if (block.isFull()) {
                block.finish();
                this.m_dumper.dump(block);
                block = new DefaultBlock(domain, hour);
                this.m_blocks.put(domain, block);
            }
            block.pack(id, buffer);
        }
        catch (Exception e) {
            Cat.logError((Throwable)e);
        }
        finally {
            ReferenceCountUtil.release((Object)buffer);
        }
    }

    /**
     * Processes one message tree inside a "Processor" transaction so that the
     * sampled call is recorded; the transaction is completed even if processing
     * fails unexpectedly.
     *
     * @param tree message tree to store
     */
    private void processMonitored(MessageTree tree) {
        Transaction t = Cat.newTransaction((String)"Processor", (String)("index-" + this.m_index));
        try {
            this.processMessage(tree);
            t.setStatus("0");
        }
        finally {
            t.complete();
        }
    }

    /**
     * Finishes and dumps every block still held in memory. A failure on one block
     * is logged and does not prevent the remaining blocks from being flushed.
     */
    private void flushBlocks() {
        for (Block block : this.m_blocks.values()) {
            try {
                block.finish();
                this.m_dumper.dump(block);
            }
            catch (Exception e) {
                Cat.logError((Throwable)e);
            }
        }
    }

    /**
     * Worker loop: processes messages until the processor has been disabled and the
     * queue is empty (or the thread is interrupted), then flushes all remaining
     * blocks and releases the latch that {@link #shutdown()} waits on.
     */
    public void run() {
        boolean interrupted = false;
        try {
            try {
                while (this.m_enabled.get() || !this.m_queue.isEmpty()) {
                    MessageTree tree = this.pollMessage();
                    if (tree == null) {
                        continue;
                    }
                    if (this.isMonitor()) {
                        this.processMonitored(tree);
                    } else {
                        this.processMessage(tree);
                    }
                }
            }
            catch (InterruptedException e) {
                // Stop polling but still flush what has been collected so far.
                interrupted = true;
            }
            this.flushBlocks();
        }
        finally {
            this.m_blocks.clear();
            // Always release the latch, otherwise shutdown() would block forever
            // after an unexpected failure in the loop above.
            this.m_latch.countDown();
            if (interrupted) {
                // Restore the interrupt status only after the flush, so the flush
                // itself is not affected by it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Disables the worker loop and waits until {@link #run()} has finished. If the
     * calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt status is restored.
     */
    public void shutdown() {
        this.m_enabled.set(false);
        try {
            this.m_latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

