package com.dianping.cat.message.storage;

import com.dianping.cat.message.CodecHandler;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import org.unidal.lookup.annotation.Named;
import org.xerial.snappy.SnappyOutputStream;

@Named(type = MessageBucket.class, value = "local", instantiationStrategy = Named.PER_LOOKUP)
/* loaded from: input_file:WEB-INF/lib/cat-core-3.0.3.jar:com/dianping/cat/message/storage/LocalMessageBucket.class */
public class LocalMessageBucket implements MessageBucket {
    public static final String ID = "local";
    private static final int MAX_BLOCK_SIZE = 65536;
    private MessageBlockWriter m_writer;
    private String m_dataFile;
    private long m_lastAccessTime;
    private OutputStream m_out;
    private ByteArrayOutputStream m_buf;
    private MessageBlock m_block;
    private int m_blockSize;
    private File m_baseDir = new File(".");
    private AtomicBoolean m_dirty = new AtomicBoolean();

    @Override // com.dianping.cat.message.storage.MessageBucket
    public void close() throws IOException {
        synchronized (this) {
            if (this.m_writer != null) {
                this.m_writer.close();
                this.m_out.close();
                this.m_buf.close();
                this.m_out = null;
                this.m_buf = null;
                this.m_writer = null;
            }
        }
    }

    @Override // com.dianping.cat.message.storage.MessageBucket
    public MessageTree findById(String str) throws IOException {
        return findByIndex(MessageId.parse(str).getIndex());
    }

    public MessageTree findByIndex(int i) throws IOException {
        MessageBlockReader messageBlockReader = new MessageBlockReader(new File(this.m_baseDir, this.m_dataFile));
        try {
            this.m_lastAccessTime = System.currentTimeMillis();
            byte[] readMessage = messageBlockReader.readMessage(i);
            if (readMessage == null) {
                messageBlockReader.close();
                CodecHandler.reset();
                return null;
            }
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(readMessage.length);
            buffer.writeBytes(readMessage);
            MessageTree decode = CodecHandler.decode(buffer);
            messageBlockReader.close();
            CodecHandler.reset();
            return decode;
        } catch (EOFException e) {
            messageBlockReader.close();
            CodecHandler.reset();
            return null;
        } catch (Throwable th) {
            messageBlockReader.close();
            CodecHandler.reset();
            throw th;
        }
    }

    public MessageBlock flushBlock() throws IOException {
        MessageBlock messageBlock;
        if (!this.m_dirty.get()) {
            return null;
        }
        synchronized (this) {
            this.m_out.close();
            try {
                this.m_block.setData(this.m_buf.toByteArray());
                this.m_blockSize = 0;
                this.m_buf.reset();
                this.m_out = new SnappyOutputStream(this.m_buf);
                this.m_dirty.set(false);
                messageBlock = this.m_block;
                this.m_block = new MessageBlock(this.m_dataFile);
            } catch (Throwable th) {
                this.m_block = new MessageBlock(this.m_dataFile);
                throw th;
            }
        }
        return messageBlock;
    }

    @Override // com.dianping.cat.message.storage.MessageBucket
    public long getLastAccessTime() {
        return this.m_lastAccessTime;
    }

    public MessageBlockWriter getWriter() {
        return this.m_writer;
    }

    @Override // com.dianping.cat.message.storage.MessageBucket
    public void initialize(String str) throws IOException {
        this.m_dataFile = str;
        this.m_writer = new MessageBlockWriter(new File(this.m_baseDir, str));
        this.m_block = new MessageBlock(this.m_dataFile);
        this.m_buf = new ByteArrayOutputStream(16384);
        this.m_out = new SnappyOutputStream(this.m_buf);
    }

    public void setBaseDir(File file) {
        this.m_baseDir = file;
    }

    public MessageBlock storeMessage(ByteBuf byteBuf, MessageId messageId) throws IOException {
        synchronized (this) {
            int readableBytes = byteBuf.readableBytes();
            this.m_dirty.set(true);
            this.m_lastAccessTime = System.currentTimeMillis();
            this.m_blockSize += readableBytes;
            this.m_block.addIndex(messageId.getIndex(), readableBytes);
            byteBuf.getBytes(0, this.m_out, readableBytes);
            if (this.m_blockSize < 65536) {
                return null;
            }
            return flushBlock();
        }
    }

    @Override // com.dianping.cat.message.storage.MessageBucket
    public void initialize(String str, Date date) throws IOException {
        initialize(str);
    }
}
