/*
 * Decompiled with CFR 0.152.
 */
package com.dianping.cat.hadoop.hdfs;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.hadoop.hdfs.FileSystemManager;
import com.dianping.cat.hadoop.hdfs.HdfsMessageBucket;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.PathBuilder;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.storage.MessageBucket;
import com.dianping.cat.message.storage.MessageBucketManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.unidal.helper.Threads;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.extension.Initializable;
import org.unidal.lookup.extension.InitializationException;

/**
 * {@link MessageBucketManager} that looks message trees up in dump files on HDFS.
 *
 * <p>Buckets opened while serving {@link #loadMessage(String)} are cached per data file in
 * {@code m_buckets}. When HDFS is switched on in the server configuration, {@link #initialize()}
 * starts an {@link IdleChecker} task that periodically closes and releases buckets that have not
 * been accessed for {@link #IDLE_TIMEOUT_MILLIS}.
 *
 * <p>Storing is not supported: {@link #storeMessage(MessageTree, MessageId)} always throws.
 */
public class HdfsMessageBucketManager
extends ContainerHolder
implements MessageBucketManager,
Initializable {
    public static final String ID = "hdfs";

    /** A cached bucket is closed once it has not been accessed for this long (one hour). */
    private static final long IDLE_TIMEOUT_MILLIS = 3600000L;

    /** Pause between two idle sweeps of the bucket cache (one minute). */
    private static final long IDLE_CHECK_INTERVAL_MILLIS = 60000L;

    @Inject
    private FileSystemManager m_manager;
    @Inject
    private PathBuilder m_pathBuilder;
    @Inject
    private ServerConfigManager m_serverConfigManager;

    /** Open buckets keyed by the data file path they were initialized with. */
    private final ConcurrentHashMap<String, HdfsMessageBucket> m_buckets = new ConcurrentHashMap<String, HdfsMessageBucket>();

    /**
     * Closes every cached bucket whose last access is at least {@link #IDLE_TIMEOUT_MILLIS} old,
     * then removes the successfully closed ones from the cache and releases them.
     *
     * <p>A bucket whose {@code close()} fails is logged and left in the cache, so it is retried on
     * the next sweep.
     *
     * @throws IOException kept in the signature for callers that already handle it
     */
    private void closeIdleBuckets() throws IOException {
        long now = System.currentTimeMillis();
        HashSet<String> closedKeys = new HashSet<String>();

        for (Map.Entry<String, HdfsMessageBucket> entry : this.m_buckets.entrySet()) {
            HdfsMessageBucket bucket = entry.getValue();

            if (now - bucket.getLastAccessTime() < IDLE_TIMEOUT_MILLIS) {
                continue;
            }
            try {
                bucket.close();
                closedKeys.add(entry.getKey());
            }
            catch (Exception e) {
                Cat.logError(e);
            }
        }

        for (String key : closedKeys) {
            HdfsMessageBucket bucket = this.m_buckets.remove(key);

            // Guard against the entry having disappeared in the meantime.
            if (bucket != null) {
                this.release(bucket);
            }
        }
    }

    /**
     * Returns the cached bucket for {@code dataFile}, opening and caching a new one if none exists.
     *
     * <p>The cache insert is atomic: if another thread cached a bucket for the same file first,
     * the bucket created here is closed and released and the cached one is returned. If
     * {@code initialize} fails, the freshly looked-up bucket is released before the exception is
     * rethrown.
     *
     * @param dataFile data file path used both as cache key and to initialize the bucket
     * @return the bucket to use for {@code dataFile}
     * @throws Exception whatever bucket lookup or initialization throws
     */
    private HdfsMessageBucket getOrOpenBucket(String dataFile) throws Exception {
        HdfsMessageBucket cached = this.m_buckets.get(dataFile);

        if (cached != null) {
            return cached;
        }

        HdfsMessageBucket created = (HdfsMessageBucket)this.lookup(MessageBucket.class, ID);

        try {
            created.initialize(dataFile);
        }
        catch (Exception e) {
            this.release(created);
            throw e;
        }

        HdfsMessageBucket existing = this.m_buckets.putIfAbsent(dataFile, created);

        if (existing == null) {
            return created;
        }

        // Another thread won the race; drop our duplicate instead of leaking it.
        try {
            created.close();
        }
        catch (Exception e) {
            Cat.logError(e);
        }
        this.release(created);
        return existing;
    }

    /**
     * Starts the {@link IdleChecker} task in the {@code "cat"} thread group when HDFS is enabled.
     *
     * @throws InitializationException declared by {@link Initializable}
     */
    public void initialize() throws InitializationException {
        if (this.m_serverConfigManager.isHdfsOn()) {
            Threads.forGroup("cat").start(new IdleChecker());
        }
    }

    /**
     * Searches the HDFS dump files matching the message id's domain and IP address for the
     * message tree with the given id.
     *
     * <p>The work is wrapped in a {@code BucketService} transaction. Failures on an individual
     * data file are logged and recorded on the transaction, and the search continues with the
     * next file. An {@link IOException} while listing files is logged and results in
     * {@code null}; a {@link RuntimeException} outside the per-file loop is logged and rethrown.
     *
     * @param messageId textual message id to look up
     * @return the matching message tree, or {@code null} if HDFS is off or no file contains it
     */
    public MessageTree loadMessage(final String messageId) {
        if (!this.m_serverConfigManager.isHdfsOn()) {
            return null;
        }
        MessageProducer cat = Cat.getProducer();
        Transaction t = cat.newTransaction("BucketService", this.getClass().getSimpleName());
        t.setStatus("0");
        try {
            MessageId id = MessageId.parse(messageId);
            final String path = this.m_pathBuilder.getLogviewPath(new Date(id.getTimestamp()), "");
            StringBuilder sb = new StringBuilder();
            FileSystem fs = this.m_manager.getFileSystem("dump", sb);
            sb.append('/').append(path);
            final String key = id.getDomain() + '-' + id.getIpAddress();
            Path basePath = new Path(sb.toString());
            final ArrayList<String> paths = new ArrayList<String>();

            // The filter is used purely as a visitor: it collects matching data file names and
            // always rejects, so the listing result itself is ignored.
            fs.listStatus(basePath, new PathFilter(){

                public boolean accept(Path p) {
                    String name = p.getName();
                    if (name.contains(key) && !name.endsWith(".idx")) {
                        paths.add(path + name);
                    }
                    return false;
                }
            });
            t.addData(paths.toString());

            for (String dataFile : paths) {
                try {
                    Cat.getProducer().logEvent("HDFSBucket", dataFile);
                    HdfsMessageBucket bucket = this.getOrOpenBucket(dataFile);
                    MessageTree tree = bucket.findById(messageId);

                    if (tree != null && tree.getMessageId().equals(messageId)) {
                        t.addData("path", dataFile);
                        return tree;
                    }
                }
                catch (Exception e) {
                    t.setStatus(e);
                    Cat.logError(e);
                }
            }
        }
        catch (IOException e) {
            t.setStatus(e);
            cat.logError(e);
        }
        catch (RuntimeException e) {
            t.setStatus(e);
            cat.logError(e);
            throw e;
        }
        finally {
            t.complete();
        }
        return null;
    }

    /**
     * Always fails: this manager only reads from HDFS.
     *
     * @throws UnsupportedOperationException always
     */
    public void storeMessage(MessageTree tree, MessageId id) {
        throw new UnsupportedOperationException("Not supported by HDFS!");
    }

    /**
     * Background task that sweeps the bucket cache every {@link #IDLE_CHECK_INTERVAL_MILLIS}
     * until its thread is interrupted.
     */
    class IdleChecker
    implements Threads.Task {
        IdleChecker() {
        }

        public String getName() {
            return "HdfsMessageBucketManager-IdleChecker";
        }

        public void run() {
            try {
                while (true) {
                    Thread.sleep(IDLE_CHECK_INTERVAL_MILLIS);
                    try {
                        HdfsMessageBucketManager.this.closeIdleBuckets();
                    }
                    catch (Exception e) {
                        // Log and keep sweeping; one failed pass must not stop the checker.
                        Cat.logError(e);
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag for whoever owns this thread, then exit.
                Thread.currentThread().interrupt();
            }
        }

        public void shutdown() {
        }
    }
}

