package org.unidal.cat.message.storage.internals;

import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.statistic.ServerStatisticManager;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.cat.message.storage.BlockDumperManager;
import org.unidal.cat.message.storage.BucketManager;
import org.unidal.cat.message.storage.MessageDumper;
import org.unidal.cat.message.storage.MessageProcessor;
import org.unidal.helper.Threads;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named(type = MessageDumper.class, instantiationStrategy = "per-lookup")
/* loaded from: input_file:org/unidal/cat/message/storage/internals/DefaultMessageDumper.class */
public class DefaultMessageDumper extends ContainerHolder implements MessageDumper, LogEnabled {

    @Inject
    private BlockDumperManager m_blockDumperManager;

    @Inject({"local"})
    private BucketManager m_bucketManager;

    @Inject
    private ServerStatisticManager m_statisticManager;

    @Inject
    private ServerConfigManager m_configManager;
    private List<BlockingQueue<MessageTree>> m_queues = new ArrayList();
    private List<MessageProcessor> m_processors = new ArrayList();
    private AtomicInteger m_failCount = new AtomicInteger(-1);
    private Logger m_logger;
    private long m_total;
    private int m_processThreads;

    @Override // org.unidal.cat.message.storage.MessageDumper
    public void awaitTermination(int i) throws InterruptedException {
        String format = new SimpleDateFormat("yyyyMMdd HH:mm:ss").format(new Date(i * 3600000));
        this.m_logger.info("starting close message processor " + format);
        closeMessageProcessor();
        this.m_logger.info("end close dumper processor " + format);
        this.m_logger.info("starting close dumper manager " + format);
        this.m_blockDumperManager.close(i);
        this.m_logger.info("end close dumper manager " + format);
        this.m_logger.info("starting close bucket manager " + format);
        this.m_bucketManager.closeBuckets(i);
        this.m_logger.info("end close bucket manager " + format);
    }

    private void closeMessageProcessor() throws InterruptedException {
        while (true) {
            boolean z = true;
            Iterator<BlockingQueue<MessageTree>> it = this.m_queues.iterator();
            while (true) {
                if (it.hasNext()) {
                    if (!it.next().isEmpty()) {
                        z = false;
                        break;
                    }
                } else {
                    break;
                }
            }
            if (z) {
                break;
            } else {
                TimeUnit.MILLISECONDS.sleep(1L);
            }
        }
        for (MessageProcessor messageProcessor : this.m_processors) {
            messageProcessor.shutdown();
            super.release(messageProcessor);
        }
    }

    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    private int getIndex(String str) {
        return Math.abs(str.hashCode()) % this.m_processThreads;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v9, types: [org.unidal.cat.message.storage.MessageProcessor, java.lang.Object, java.lang.Runnable] */
    @Override // org.unidal.cat.message.storage.MessageDumper
    public void initialize(int i) {
        int messageProcessorThreads = this.m_configManager.getMessageProcessorThreads();
        this.m_processThreads = messageProcessorThreads;
        for (int i2 = 0; i2 < messageProcessorThreads; i2++) {
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10000);
            ?? r0 = (MessageProcessor) lookup(MessageProcessor.class);
            this.m_queues.add(arrayBlockingQueue);
            this.m_processors.add(r0);
            r0.initialize(i, i2, arrayBlockingQueue);
            Threads.forGroup("Cat").start((Runnable) r0);
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x00AE: MOVE_MULTI, method: org.unidal.cat.message.storage.internals.DefaultMessageDumper.process(com.dianping.cat.message.spi.MessageTree):void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    @Override // org.unidal.cat.message.storage.MessageDumper
    public void process(com.dianping.cat.message.spi.MessageTree r7) {
        /*
            r6 = this;
            r0 = r7
            com.dianping.cat.message.internal.MessageId r0 = r0.getFormatMessageId()
            r8 = r0
            r0 = r8
            java.lang.String r0 = r0.getDomain()
            r9 = r0
            r0 = r6
            r1 = r8
            java.lang.String r1 = r1.getIpAddressInHex()
            int r0 = r0.getIndex(r1)
            r10 = r0
            r0 = r6
            java.util.List<java.util.concurrent.BlockingQueue<com.dianping.cat.message.spi.MessageTree>> r0 = r0.m_queues
            r1 = r10
            java.lang.Object r0 = r0.get(r1)
            java.util.concurrent.BlockingQueue r0 = (java.util.concurrent.BlockingQueue) r0
            r11 = r0
            r0 = r11
            r1 = r7
            boolean r0 = r0.offer(r1)
            r12 = r0
            r0 = r12
            if (r0 != 0) goto L96
            r0 = r6
            com.dianping.cat.statistic.ServerStatisticManager r0 = r0.m_statisticManager
            r1 = 1
            r0.addMessageDumpLoss(r1)
            r0 = r6
            java.util.concurrent.atomic.AtomicInteger r0 = r0.m_failCount
            int r0 = r0.incrementAndGet()
            r1 = 100
            int r0 = r0 % r1
            if (r0 != 0) goto Lc5
            org.unidal.cat.message.storage.exception.MessageQueueFullException r0 = new org.unidal.cat.message.storage.exception.MessageQueueFullException
            r1 = r0
            java.lang.StringBuilder r2 = new java.lang.StringBuilder
            r3 = r2
            r3.<init>()
            java.lang.String r3 = "Error when adding message to queue, fails: "
            java.lang.StringBuilder r2 = r2.append(r3)
            r3 = r6
            java.util.concurrent.atomic.AtomicInteger r3 = r3.m_failCount
            java.lang.StringBuilder r2 = r2.append(r3)
            java.lang.String r2 = r2.toString()
            r1.<init>(r2)
            com.dianping.cat.Cat.logError(r0)
            r0 = r6
            org.codehaus.plexus.logging.Logger r0 = r0.m_logger
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "message tree queue is full "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r6
            java.util.concurrent.atomic.AtomicInteger r2 = r2.m_failCount
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = " index "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r10
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r0.info(r1)
            goto Lc5
            r0 = r6
            com.dianping.cat.statistic.ServerStatisticManager r0 = r0.m_statisticManager
            r1 = r9
            r2 = r7
            io.netty.buffer.ByteBuf r2 = r2.getBuffer()
            int r2 = r2.readableBytes()
            r0.addMessageSize(r1, r2)
            r0 = r6
            r1 = r0
            long r1 = r1.m_total
            r2 = 1
            long r1 = r1 + r2
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.m_total = r1
            r0 = 1000(0x3e8, double:4.94E-321)
            long r-1 = r-1 % r0
            r0 = 0
            int r-1 = (r-1 > r0 ? 1 : (r-1 == r0 ? 0 : -1))
            if (r-1 != 0) goto Lc5
            r-1 = r6
            com.dianping.cat.statistic.ServerStatisticManager r-1 = r-1.m_statisticManager
            r0 = 1000(0x3e8, double:4.94E-321)
            r-1.addMessageDump(r0)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.unidal.cat.message.storage.internals.DefaultMessageDumper.process(com.dianping.cat.message.spi.MessageTree):void");
    }
}
