package com.dianping.cat.consumer.dump;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.PathBuilder;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.message.storage.LocalMessageBucket;
import com.dianping.cat.message.storage.MessageBlock;
import com.dianping.cat.message.storage.MessageBucket;
import com.dianping.cat.message.storage.MessageBucketManager;
import com.dianping.cat.statistic.ServerStatisticManager;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.helper.Scanners;
import org.unidal.helper.Threads;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;

/* loaded from: input_file:WEB-INF/lib/cat-consumer-3.0.3.jar:com/dianping/cat/consumer/dump/LocalMessageBucketManager.class */
public class LocalMessageBucketManager extends ContainerHolder implements MessageBucketManager, Initializable, LogEnabled {
    public static final String ID = "local";
    protected Logger m_logger;

    @Inject
    private ServerConfigManager m_configManager;

    @Inject
    private ServerStatisticManager m_serverStateManager;

    @Inject
    private PathBuilder m_pathBuilder;
    private File m_baseDir;
    private long m_total;
    private BlockingQueue<MessageItem> m_last;
    private ConcurrentHashMap<String, LocalMessageBucket> m_buckets = new ConcurrentHashMap<>();
    private String m_localIp = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
    private int m_gzipThreads = 20;
    private int m_gzipMessageSize = 5000;
    private int m_messageBlockSize = 5000;
    private BlockingQueue<MessageBlock> m_messageBlocks = new LinkedBlockingQueue(this.m_messageBlockSize);
    private List<BlockingQueue<MessageItem>> m_messageQueues = new ArrayList();

    /* loaded from: input_file:WEB-INF/lib/cat-consumer-3.0.3.jar:com/dianping/cat/consumer/dump/LocalMessageBucketManager$CloseBucketChecker.class */
    public class CloseBucketChecker implements Threads.Task {
        public CloseBucketChecker() {
        }

        private void closeBuckets(List<String> list) {
            String localHostAddress = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
            for (String str : list) {
                LocalMessageBucket localMessageBucket = (LocalMessageBucket) LocalMessageBucketManager.this.m_buckets.remove(str);
                if (localMessageBucket != null) {
                    try {
                        try {
                            localMessageBucket.close();
                            Cat.logEvent("CloseBucket", localHostAddress);
                            LocalMessageBucketManager.this.m_buckets.remove(str);
                            LocalMessageBucketManager.this.release(localMessageBucket);
                        } catch (Exception e) {
                            Cat.logError(e);
                            LocalMessageBucketManager.this.m_buckets.remove(str);
                            LocalMessageBucketManager.this.release(localMessageBucket);
                        }
                    } catch (Throwable th) {
                        LocalMessageBucketManager.this.m_buckets.remove(str);
                        LocalMessageBucketManager.this.release(localMessageBucket);
                        throw th;
                    }
                }
            }
        }

        @Override // org.unidal.helper.Threads.Task
        public String getName() {
            return "LocalMessageBucketManager-CloseBucketChecker";
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Thread.sleep(60000L);
                    try {
                        closeBuckets(LocalMessageBucketManager.this.findCloseBuckets());
                    } catch (Exception e) {
                        Cat.logError(e);
                    }
                } catch (InterruptedException e2) {
                    return;
                }
            }
        }

        @Override // org.unidal.helper.Threads.Task
        public void shutdown() {
        }
    }

    /* loaded from: input_file:WEB-INF/lib/cat-consumer-3.0.3.jar:com/dianping/cat/consumer/dump/LocalMessageBucketManager$MessageGzip.class */
    public class MessageGzip implements Threads.Task {
        public BlockingQueue<MessageItem> m_messageQueue;
        private int m_index;
        private int m_count = -1;

        public MessageGzip(BlockingQueue<MessageItem> blockingQueue, int i) {
            this.m_messageQueue = blockingQueue;
            this.m_index = i;
        }

        @Override // org.unidal.helper.Threads.Task
        public String getName() {
            return "Message-Gzip-" + this.m_index;
        }

        private void gzipMessage(MessageItem messageItem) {
            try {
                MessageId messageId = messageItem.getMessageId();
                String logviewPath = LocalMessageBucketManager.this.m_pathBuilder.getLogviewPath(new Date(messageId.getTimestamp()), messageId.getDomain() + '-' + messageId.getIpAddress() + '-' + LocalMessageBucketManager.this.m_localIp);
                LocalMessageBucket localMessageBucket = (LocalMessageBucket) LocalMessageBucketManager.this.m_buckets.get(logviewPath);
                if (localMessageBucket == null) {
                    synchronized (LocalMessageBucketManager.this.m_buckets) {
                        localMessageBucket = (LocalMessageBucket) LocalMessageBucketManager.this.m_buckets.get(logviewPath);
                        if (localMessageBucket == null) {
                            localMessageBucket = (LocalMessageBucket) LocalMessageBucketManager.this.lookup(MessageBucket.class, "local");
                            localMessageBucket.setBaseDir(LocalMessageBucketManager.this.m_baseDir);
                            localMessageBucket.initialize(logviewPath);
                            LocalMessageBucketManager.this.m_buckets.put(logviewPath, localMessageBucket);
                        }
                    }
                }
                DefaultMessageTree defaultMessageTree = (DefaultMessageTree) messageItem.getTree();
                MessageBlock storeMessage = localMessageBucket.storeMessage(defaultMessageTree.getBuffer(), messageId);
                if (storeMessage != null && !LocalMessageBucketManager.this.m_messageBlocks.offer(storeMessage)) {
                    LocalMessageBucketManager.this.m_serverStateManager.addBlockLoss(1L);
                    Cat.logEvent("DumpError", defaultMessageTree.getDomain());
                }
            } catch (Throwable th) {
                Cat.logError(th);
            }
        }

        private void gzipMessageWithMonitor(MessageItem messageItem) {
            Transaction newTransaction = Cat.newTransaction("Gzip", "Thread-" + this.m_index);
            newTransaction.setStatus("0");
            gzipMessage(messageItem);
            newTransaction.complete();
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    MessageItem poll = this.m_messageQueue.poll(5L, TimeUnit.MILLISECONDS);
                    if (poll != null) {
                        this.m_count++;
                        if (this.m_count % 10000 == 0) {
                            gzipMessageWithMonitor(poll);
                        } else {
                            gzipMessage(poll);
                        }
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        @Override // org.unidal.helper.Threads.Task
        public void shutdown() {
        }
    }

    @Override // com.dianping.cat.message.storage.MessageBucketManager
    public void archive(long j) {
        String logviewPath = this.m_pathBuilder.getLogviewPath(new Date(j), "");
        ArrayList arrayList = new ArrayList();
        Iterator it = this.m_buckets.keySet().iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            if (str.startsWith(logviewPath)) {
                arrayList.add(str);
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            try {
                MessageBlock flushBlock = this.m_buckets.get((String) it2.next()).flushBlock();
                if (flushBlock != null) {
                    this.m_messageBlocks.put(flushBlock);
                }
            } catch (Exception e) {
                Cat.logError(e);
            }
        }
    }

    @Override // org.codehaus.plexus.logging.LogEnabled
    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    public List<String> findCloseBuckets() {
        final HashSet hashSet = new HashSet();
        Scanners.forDir().scan(this.m_baseDir, new Scanners.FileMatcher() { // from class: com.dianping.cat.consumer.dump.LocalMessageBucketManager.1
            @Override // org.unidal.helper.Scanners.IMatcher
            public Scanners.IMatcher.Direction matches(File file, String str) {
                if (new File(file, str).isFile() && LocalMessageBucketManager.this.shouldUpload(str)) {
                    int indexOf = str.indexOf(".idx");
                    if (indexOf == -1) {
                        hashSet.add(str);
                    } else {
                        hashSet.add(str.substring(0, indexOf));
                    }
                }
                return Scanners.IMatcher.Direction.DOWN;
            }
        });
        return new ArrayList(hashSet);
    }

    @Override // org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable
    public void initialize() throws InitializationException {
        if (this.m_configManager.isUseNewStorage()) {
            return;
        }
        this.m_baseDir = new File(this.m_configManager.getHdfsLocalBaseDir("dump"));
        Threads.forGroup("cat").start(new BlockDumper(this.m_buckets, this.m_messageBlocks, this.m_serverStateManager, this.m_configManager));
        Threads.forGroup("cat").start(new CloseBucketChecker());
        if (this.m_configManager.isLocalMode()) {
            this.m_gzipThreads = 2;
        }
        for (int i = 0; i < this.m_gzipThreads; i++) {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.m_gzipMessageSize);
            this.m_messageQueues.add(linkedBlockingQueue);
            Threads.forGroup("cat").start(new MessageGzip(linkedBlockingQueue, i));
        }
        this.m_last = this.m_messageQueues.get(this.m_gzipThreads - 1);
    }

    /* JADX WARN: Finally extract failed */
    @Override // com.dianping.cat.message.storage.MessageBucketManager
    public MessageTree loadMessage(String str) {
        MessageProducer producer = Cat.getProducer();
        Transaction newTransaction = producer.newTransaction("BucketService", getClass().getSimpleName());
        newTransaction.setStatus("0");
        try {
            try {
                MessageId parse = MessageId.parse(str);
                final String logviewPath = this.m_pathBuilder.getLogviewPath(new Date(parse.getTimestamp()), "");
                File file = new File(this.m_baseDir, logviewPath);
                final String str2 = parse.getDomain() + '-' + parse.getIpAddress();
                final ArrayList<String> arrayList = new ArrayList();
                Scanners.forDir().scan(file, new Scanners.FileMatcher() { // from class: com.dianping.cat.consumer.dump.LocalMessageBucketManager.2
                    @Override // org.unidal.helper.Scanners.IMatcher
                    public Scanners.IMatcher.Direction matches(File file2, String str3) {
                        if (str3.contains(str2) && !str3.endsWith(".idx")) {
                            arrayList.add(logviewPath + str3);
                        }
                        return Scanners.IMatcher.Direction.NEXT;
                    }
                });
                for (String str3 : arrayList) {
                    LocalMessageBucket localMessageBucket = this.m_buckets.get(str3);
                    if (localMessageBucket != null) {
                        MessageBlock flushBlock = localMessageBucket.flushBlock();
                        if (flushBlock != null) {
                            boolean offer = this.m_messageBlocks.offer(flushBlock);
                            LockSupport.parkNanos(200000000L);
                            if (!offer) {
                                if (this.m_messageBlocks.offer(flushBlock)) {
                                    LockSupport.parkNanos(200000000L);
                                } else {
                                    Cat.logError(new RuntimeException("error flush block when read logview"));
                                }
                            }
                        }
                        MessageTree findById = localMessageBucket.findById(str);
                        if (findById != null && findById.getMessageId().equals(str)) {
                            newTransaction.addData("path", str3);
                            newTransaction.complete();
                            return findById;
                        }
                    } else if (new File(this.m_baseDir, str3).exists()) {
                        try {
                            try {
                                localMessageBucket = (LocalMessageBucket) lookup(MessageBucket.class, "local");
                                localMessageBucket.setBaseDir(this.m_baseDir);
                                localMessageBucket.initialize(str3);
                                MessageTree findById2 = localMessageBucket.findById(str);
                                if (findById2 != null && findById2.getMessageId().equals(str)) {
                                    newTransaction.addData("path", str3);
                                    localMessageBucket.close();
                                    release(localMessageBucket);
                                    newTransaction.complete();
                                    return findById2;
                                }
                                localMessageBucket.close();
                                release(localMessageBucket);
                            } catch (Throwable th) {
                                localMessageBucket.close();
                                release(localMessageBucket);
                                throw th;
                            }
                        } catch (Exception e) {
                            Cat.logError(e);
                            localMessageBucket.close();
                            release(localMessageBucket);
                        }
                    } else {
                        continue;
                    }
                }
                newTransaction.complete();
                return null;
            } catch (Throwable th2) {
                newTransaction.setStatus(th2);
                producer.logError(th2);
                newTransaction.complete();
                return null;
            }
        } catch (Throwable th3) {
            newTransaction.complete();
            throw th3;
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x0021: MOVE_MULTI, method: com.dianping.cat.consumer.dump.LocalMessageBucketManager.logStorageState(com.dianping.cat.message.spi.MessageTree):void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    private void logStorageState(com.dianping.cat.message.spi.MessageTree r7) {
        /*
            r6 = this;
            r0 = r7
            java.lang.String r0 = r0.getDomain()
            r8 = r0
            r0 = r7
            io.netty.buffer.ByteBuf r0 = r0.getBuffer()
            int r0 = r0.readableBytes()
            r9 = r0
            r0 = r6
            com.dianping.cat.statistic.ServerStatisticManager r0 = r0.m_serverStateManager
            r1 = r8
            r2 = r9
            r0.addMessageSize(r1, r2)
            r0 = r6
            r1 = r0
            long r1 = r1.m_total
            r2 = 1
            long r1 = r1 + r2
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.m_total = r1
            r0 = 1000(0x3e8, double:4.94E-321)
            long r-1 = r-1 % r0
            r0 = 0
            int r-1 = (r-1 > r0 ? 1 : (r-1 == r0 ? 0 : -1))
            if (r-1 != 0) goto L38
            r-1 = r6
            com.dianping.cat.statistic.ServerStatisticManager r-1 = r-1.m_serverStateManager
            r0 = 1000(0x3e8, double:4.94E-321)
            r-1.addMessageDump(r0)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: com.dianping.cat.consumer.dump.LocalMessageBucketManager.logStorageState(com.dianping.cat.message.spi.MessageTree):void");
    }

    public void setBaseDir(File file) {
        this.m_baseDir = file;
    }

    public void setLocalIp(String str) {
        this.m_localIp = str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean shouldUpload(String str) {
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis - (currentTimeMillis % 3600000);
        long j2 = j - 3600000;
        long j3 = j + 3600000;
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd/HH");
        return str.indexOf(simpleDateFormat.format(new Date(j))) <= -1 && str.indexOf(simpleDateFormat.format(new Date(j2))) <= -1 && str.indexOf(simpleDateFormat.format(new Date(j3))) <= -1;
    }

    @Override // com.dianping.cat.message.storage.MessageBucketManager
    public void storeMessage(MessageTree messageTree, MessageId messageId) {
        boolean z = true;
        int abs = Math.abs((messageId.getDomain() + '-' + messageId.getIpAddress()).hashCode()) % this.m_gzipThreads;
        MessageItem messageItem = new MessageItem(messageTree, messageId);
        if (this.m_messageQueues.get(abs % (this.m_gzipThreads - 1)).offer(messageItem)) {
            z = false;
        } else if (this.m_last.offer(messageItem)) {
            z = false;
        }
        if (z) {
            this.m_serverStateManager.addMessageDumpLoss(1L);
        }
        logStorageState(messageTree);
    }
}
