package com.dianping.cat.hadoop.hdfs;

import com.dianping.cat.Cat;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.message.storage.MessageBucket;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.unidal.lookup.annotation.Inject;

/* loaded from: input_file:WEB-INF/lib/cat-hadoop-2.0.0.db.jar:com/dianping/cat/hadoop/hdfs/HdfsMessageBucket.class */
/**
 * Read-only {@link MessageBucket} that looks messages up in a pair of files
 * (a data file plus its {@code .idx} index file) opened through a Hadoop
 * {@link FileSystem}.
 *
 * <p>{@link #initialize(String)} must be called before {@link #findById(String)};
 * {@link #close()} releases the underlying streams.
 */
public class HdfsMessageBucket implements MessageBucket {
    /** Identifier of this bucket implementation. */
    public static final String ID = "hdfs";

    @Inject
    private FileSystemManager m_manager;

    @Inject
    private MessageCodec m_codec;

    /** Reader over the index/data file pair; {@code null} until {@link #initialize(String)} runs. */
    private MessageBlockReader m_reader;

    /** Wall-clock time (ms) of the last successful {@link #findById(String)}; 0 if none yet. */
    private long m_lastAccessTime;

    /**
     * Random-access reader for one data file and its {@code .idx} companion.
     *
     * <p>As read by this class, each index entry is 6 bytes: a 4-byte int giving the
     * position of a block in the data file, followed by an unsigned 2-byte offset of
     * the message inside the decompressed block. A block in the data file is a 4-byte
     * length followed by that many bytes of gzip data; inside the decompressed block a
     * message is a 4-byte length followed by the message bytes.
     */
    static class MessageBlockReader {
        /** Size in bytes of one index entry; declared as long so offset math cannot overflow int. */
        private static final long INDEX_ENTRY_SIZE = 6L;

        private final FSDataInputStream m_indexFile;
        private final FSDataInputStream m_dataFile;

        /**
         * Opens {@code str + ".idx"} and {@code str} below the base directory reported
         * by the file system manager for the {@code "dump"} id.
         *
         * @param fileSystemManager source of the {@link FileSystem}; it also fills in the base path
         * @param str               name of the data file, relative to the base path
         * @throws IOException if either file cannot be opened; nothing is left open in that case
         */
        public MessageBlockReader(FileSystemManager fileSystemManager, String str) throws IOException {
            StringBuilder sb = new StringBuilder();
            FileSystem fileSystem = fileSystemManager.getFileSystem("dump", sb);
            // The builder's content after the call is used as the base directory.
            Path baseDir = new Path(sb.toString());

            FSDataInputStream indexFile = fileSystem.open(new Path(baseDir, str + ".idx"));
            FSDataInputStream dataFile = null;
            try {
                dataFile = fileSystem.open(new Path(baseDir, str));
            } finally {
                // Do not leak the index stream when the data file fails to open.
                if (dataFile == null) {
                    closeQuietly(indexFile);
                }
            }
            this.m_indexFile = indexFile;
            this.m_dataFile = dataFile;
        }

        /**
         * Closes both streams. The data file is closed even if closing the index file fails.
         *
         * @throws IOException if closing either stream fails
         */
        public void close() throws IOException {
            synchronized (this.m_indexFile) {
                try {
                    this.m_indexFile.close();
                } finally {
                    this.m_dataFile.close();
                }
            }
        }

        /**
         * Reads the raw bytes of the message stored under index entry {@code i}.
         *
         * @param i zero-based index entry number
         * @return the message bytes, without their length prefix
         * @throws EOFException if the index, the data file or the decompressed block ends early
         * @throws IOException  on any other read failure or on a negative length prefix
         */
        public byte[] readMessage(int i) throws IOException {
            int blockAddress;
            int offsetInBlock;
            byte[] compressedBlock;

            synchronized (this.m_indexFile) {
                // long arithmetic: i * 6 in int would overflow for large indexes.
                this.m_indexFile.seek(i * INDEX_ENTRY_SIZE);
                blockAddress = this.m_indexFile.readInt();
                offsetInBlock = this.m_indexFile.readShort() & 0xFFFF;
            }
            synchronized (this.m_dataFile) {
                this.m_dataFile.seek(blockAddress);
                compressedBlock = readLengthPrefixed(this.m_dataFile);
            }

            DataInputStream block = new DataInputStream(
                    new GZIPInputStream(new ByteArrayInputStream(compressedBlock)));
            try {
                skipFully(block, offsetInBlock);
                return readLengthPrefixed(block);
            } finally {
                closeQuietly(block);
            }
        }

        /** Reads a 4-byte length followed by that many bytes; rejects negative lengths. */
        private static byte[] readLengthPrefixed(DataInputStream in) throws IOException {
            int length = in.readInt();
            if (length < 0) {
                throw new IOException("Corrupt message file: negative length " + length);
            }
            byte[] bytes = new byte[length];
            in.readFully(bytes);
            return bytes;
        }

        /** Skips exactly {@code count} bytes, since a single skip call may skip fewer than asked. */
        private static void skipFully(DataInputStream in, int count) throws IOException {
            int remaining = count;
            while (remaining > 0) {
                int skipped = in.skipBytes(remaining);
                if (skipped <= 0) {
                    throw new EOFException("Unexpected end of block: " + remaining + " of " + count
                            + " bytes left to skip");
                }
                remaining -= skipped;
            }
        }

        /** Best-effort close used on cleanup paths, where a close failure must not mask the real outcome. */
        private static void closeQuietly(DataInputStream in) {
            try {
                in.close();
            } catch (Exception ignored) {
                // Deliberately ignored: the caller is already returning a result or propagating an error.
            }
        }
    }

    /**
     * Closes the underlying reader; does nothing if the bucket was never initialized.
     *
     * @throws IOException if closing the underlying streams fails
     */
    @Override // com.dianping.cat.message.storage.MessageBucket
    public void close() throws IOException {
        if (this.m_reader != null) {
            this.m_reader.close();
        }
    }

    /**
     * Loads and decodes the message tree whose id is {@code str}.
     *
     * @param str message id; its index component selects the index entry to read
     * @return the decoded tree, or {@code null} if the files ended prematurely
     *         (the {@link EOFException} is reported through {@link Cat#logError})
     * @throws IOException on any other read failure
     */
    @Override // com.dianping.cat.message.storage.MessageBucket
    public MessageTree findById(String str) throws IOException {
        try {
            byte[] readMessage = this.m_reader.readMessage(MessageId.parse(str).getIndex());
            // NOTE(review): this buffer comes from the default allocator and is never released
            // here. Whether the codec or the tree keeps a reference is not visible in this
            // file - confirm before adding a release() to avoid a use-after-release.
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(readMessage.length);
            DefaultMessageTree defaultMessageTree = new DefaultMessageTree();
            buffer.writeBytes(readMessage);
            this.m_codec.decode(buffer, defaultMessageTree);
            this.m_lastAccessTime = System.currentTimeMillis();
            return defaultMessageTree;
        } catch (EOFException e) {
            Cat.logError(e);
            return null;
        }
    }

    /**
     * @return the time in milliseconds of the last successful {@link #findById(String)},
     *         or 0 if there has been none
     */
    @Override // com.dianping.cat.message.storage.MessageBucket
    public long getLastAccessTime() {
        return this.m_lastAccessTime;
    }

    /**
     * Opens the data file {@code str} and its index through the injected file system manager.
     *
     * @param str name of the data file, relative to the manager's base path
     * @throws IOException if either file cannot be opened
     */
    @Override // com.dianping.cat.message.storage.MessageBucket
    public void initialize(String str) throws IOException {
        this.m_reader = new MessageBlockReader(this.m_manager, str);
    }

    /**
     * Replaces the codec used to decode message bytes.
     *
     * @param messageCodec codec to use for subsequent {@link #findById(String)} calls
     */
    public void setMessageCodec(MessageCodec messageCodec) {
        this.m_codec = messageCodec;
    }
}
