/*
 * Decompiled with CFR 0.152.
 */
package org.unidal.cat.message.storage.hdfs;

import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.message.internal.MessageId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.unidal.cat.message.storage.FileType;
import org.unidal.cat.message.storage.Index;
import org.unidal.cat.message.storage.PathBuilder;
import org.unidal.cat.message.storage.TokenMapping;
import org.unidal.cat.message.storage.TokenMappingManager;
import org.unidal.cat.message.storage.hdfs.HdfsSystemManager;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

/**
 * Read-only {@link Index} implementation that resolves message-id mappings from a
 * mapping file opened through the Hadoop {@link FileSystem} API.
 *
 * <p>The file is addressed in fixed-size segments of {@link #SEGMENT_SIZE} bytes. Header
 * segments list {@code (ip, index)} pairs; the order of those pairs determines which data
 * segment holds the 8-byte entries for that pair. All mutating operations
 * ({@link #flush()}, {@link #map}, {@link #maps}) are unsupported.
 *
 * <p>An instance must be {@link #initialize initialized} before {@link #find} can return
 * results, and {@link #close() closed} afterwards to release the underlying stream.
 */
@Named(type=Index.class, value="hdfs", instantiationStrategy="per-lookup")
public class HdfsIndex implements Index {
    public static final String ID = "hdfs";
    /** Size in bytes of one segment of the index file. */
    private static final int SEGMENT_SIZE = 32768;
    /** Sentinel used internally when no offset/entry exists for a message id. */
    private static final long NOT_FOUND = -1L;
    @Inject
    protected HdfsSystemManager m_manager;
    @Inject
    private ServerConfigManager m_serverConfigManager;
    @Inject(value={"hdfs"})
    private PathBuilder m_bulider;
    @Inject(value={"hdfs"})
    private TokenMappingManager m_hdfsTokenManager;
    private TokenMapping m_mapping;
    private MessageIdCodec m_codec = new MessageIdCodec();
    private IndexHelper m_index = new IndexHelper();
    private long m_lastAccessTime;

    /**
     * Releases the index stream if one is open. Safe to call more than once.
     */
    @Override
    public void close() {
        if (this.m_index.isOpen()) {
            this.m_index.close();
        }
    }

    /**
     * Looks up the message id that {@code id} is mapped to.
     *
     * @param id the message id to look up
     * @return the mapped message id, or {@code null} when the index has no entry for
     *         {@code id} or the stored tokens cannot be resolved
     * @throws IOException if reading the index fails or the index has been closed
     */
    @Override
    public MessageId find(MessageId id) throws IOException {
        long value = this.m_index.read(id);
        // A stored 0 is treated as "no mapping"; NOT_FOUND means no segment covers the id.
        // Previously NOT_FOUND fell through and was decoded as if it were a real entry.
        if (value == 0L || value == NOT_FOUND) {
            return null;
        }
        return this.m_codec.decode(this.getBytes(value), id.getHour());
    }

    /**
     * Not supported: this index is read-only.
     *
     * @throws UnsupportedOperationException always
     */
    public void flush() {
        throw new UnsupportedOperationException("unsupport operation");
    }

    /**
     * Splits {@code data} into 8 bytes, least significant byte first.
     */
    private byte[] getBytes(long data) {
        byte[] bytes = new byte[8];
        for (int i = 0; i < bytes.length; ++i) {
            bytes[i] = (byte)(data >> (8 * i) & 0xFFL);
        }
        return bytes;
    }

    /**
     * @return the last-access timestamp field; nothing in this class updates it
     */
    public long getLastAccessTime() {
        return this.m_lastAccessTime;
    }

    /**
     * Opens the mapping file for the given domain/ip/hour and loads its header segments.
     *
     * @param domain domain used to build the file path
     * @param ip     ip used to build the file path and to select the token mapping
     * @param hour   hour count since the epoch; converted to milliseconds for the path
     * @throws IOException if the file cannot be opened or its header is invalid
     */
    @Override
    public void initialize(String domain, String ip, int hour) throws IOException {
        // Multiply in long arithmetic: hour * 3600 overflows int for hours beyond early 2038.
        long timestamp = hour * 3600L * 1000L;
        Date startTime = new Date(timestamp);
        FileSystem fs = this.m_manager.getFileSystem();
        String dataPath = this.m_bulider.getPath(domain, startTime, ip, FileType.MAPPING);
        FSDataInputStream indexStream = fs.open(new Path(dataPath));
        boolean loaded = false;
        try {
            this.m_index.init(indexStream);
            loaded = true;
        } finally {
            if (!loaded) {
                // Header loading failed; do not leak the freshly opened stream.
                this.m_index.close();
            }
        }
        this.m_mapping = this.m_hdfsTokenManager.getTokenMapping(hour, ip);
    }

    /**
     * Not supported: this index is read-only.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void map(MessageId from, MessageId to) throws IOException {
        throw new UnsupportedOperationException("unsupport operation");
    }

    /**
     * Not supported: this index is read-only.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void maps(Map<MessageId, MessageId> maps) throws IOException {
        throw new UnsupportedOperationException("unsupport operation");
    }

    /**
     * Decodes an 8-byte index entry into a {@link MessageId} using the token mapping of
     * the enclosing index.
     */
    protected class MessageIdCodec {
        protected MessageIdCodec() {
        }

        /** Reads a big-endian int from {@code src} starting at {@code offset}. */
        private int bytesToInt(byte[] src, int offset) {
            return (src[offset] & 0xFF) << 24
                | (src[offset + 1] & 0xFF) << 16
                | (src[offset + 2] & 0xFF) << 8
                | src[offset + 3] & 0xFF;
        }

        /**
         * Decodes an entry: the first int packs two 15-bit tokens and a 2-bit field, the
         * second int is the message index.
         *
         * @return the decoded id, or {@code null} if either token has no mapping
         */
        private MessageId decode(byte[] data, int currentHour) throws IOException {
            int value = this.bytesToInt(data, 0);
            int index = this.bytesToInt(data, 4);
            int s1 = value >> 17 & Short.MAX_VALUE;
            int s2 = value >> 2 & Short.MAX_VALUE;
            int s3 = value & 3;
            String domain = HdfsIndex.this.m_mapping.find(s1);
            String ipAddressInHex = HdfsIndex.this.m_mapping.find(s2);
            // NOTE(review): s3 is already masked to 2 bits, so (s3 >> 14) is always 0 and
            // the hour is never adjusted. Possibly the flag was meant to be s3 itself;
            // confirm against the writer of this file format before changing.
            int flag = s3 >> 14 & 3;
            int hour = currentHour + (flag == 3 ? -1 : flag);
            if (domain != null && ipAddressInHex != null) {
                return new MessageId(domain, ipAddressInHex, hour, index);
            }
            return null;
        }
    }

    /**
     * Owns the index stream and translates message ids into file offsets.
     */
    private class IndexHelper {
        private static final int BYTE_PER_MESSAGE = 8;
        private static final int BYTE_PER_ENTRY = 8;
        private static final int MESSAGE_PER_SEGMENT = 4096;
        private static final int ENTRY_PER_SEGMENT = 4096;
        private Header m_header = new Header();
        private FSDataInputStream m_indexSteam;

        private IndexHelper() {
        }

        /**
         * Closes the underlying stream, if any, and marks the helper as not open.
         */
        public void close() {
            FSDataInputStream stream = this.m_indexSteam;
            this.m_indexSteam = null;
            if (stream != null) {
                try {
                    stream.close();
                } catch (IOException ignored) {
                    // Best effort: the stream is only read from, so a failed close loses
                    // no data and close() has no way to report a checked exception.
                }
            }
        }

        /**
         * Takes ownership of {@code indexStream} and loads every header segment.
         *
         * @throws IOException if a header segment cannot be read or has a bad magic code
         */
        public void init(FSDataInputStream indexStream) throws IOException {
            this.m_indexSteam = indexStream;
            // NOTE(review): available() returns an int; this assumes it reflects the
            // remaining file length - confirm for files of 2 GiB or more.
            long size = indexStream.available();
            // One header segment governs ENTRY_PER_SEGMENT segments of SEGMENT_SIZE bytes.
            long bytesPerHeader = (long)ENTRY_PER_SEGMENT * SEGMENT_SIZE;
            int totalHeaders = (int)((size + bytesPerHeader - 1L) / bytesPerHeader);
            if (totalHeaders == 0) {
                totalHeaders = 1;
            }
            for (int i = 0; i < totalHeaders; ++i) {
                this.m_header.load(i);
            }
        }

        public boolean isOpen() {
            return this.m_indexSteam != null;
        }

        /**
         * Reads the 8-byte entry stored for {@code id}.
         *
         * @return the stored value, or {@code NOT_FOUND} when no segment covers the id
         * @throws IOException if the read fails or the stream has been closed
         */
        public long read(MessageId id) throws IOException {
            long position = this.m_header.getOffset(id.getIpAddressValue(), id.getIndex());
            if (position <= 0L) {
                return NOT_FOUND;
            }
            if (this.m_indexSteam == null) {
                throw new IOException("Index stream is closed");
            }
            this.m_indexSteam.seek(position);
            return this.m_indexSteam.readLong();
        }

        /**
         * An in-memory copy of one segment of the index file, read sequentially.
         */
        private class Segment {
            private long m_address;
            private ByteBuffer m_buf;

            private Segment(FSDataInputStream channel, long address) throws IOException {
                this.m_address = address;
                byte[] b = new byte[SEGMENT_SIZE];
                // Positioned read: the segment must come from its own address, not from
                // wherever the stream cursor happens to be (which broke every header
                // block after the first).
                channel.readFully(address, b);
                this.m_buf = ByteBuffer.wrap(b);
            }

            public int readInt() throws IOException {
                return this.m_buf.getInt();
            }

            public long readLong() throws IOException {
                return this.m_buf.getLong();
            }

            public String toString() {
                return String.format("%s[address=%s]", this.getClass().getSimpleName(), this.m_address);
            }
        }

        /**
         * Lookup table built from the header segments:
         * ip -> (segment index within that ip -> segment number within the file).
         */
        private class Header {
            private Map<Integer, Map<Integer, Integer>> m_table = new LinkedHashMap<Integer, Map<Integer, Integer>>();
            private int m_nextSegment;

            private Header() {
            }

            /** @return the file segment number for the pair, or {@code null} if absent */
            private Integer findSegment(int ip, int index) throws IOException {
                Map<Integer, Integer> map = this.m_table.get(ip);
                if (map != null) {
                    return map.get(index);
                }
                return null;
            }

            /**
             * Computes the absolute file offset of entry {@code seq} for {@code ip}.
             *
             * @return the byte offset, or {@code NOT_FOUND} if no segment is registered
             */
            public long getOffset(int ip, int seq) throws IOException {
                int segmentIndex = seq / ENTRY_PER_SEGMENT;
                int segmentOffset = seq % ENTRY_PER_SEGMENT * BYTE_PER_ENTRY;
                Integer segmentId = this.findSegment(ip, segmentIndex);
                if (segmentId != null) {
                    // Widen before multiplying: int arithmetic wraps for offsets >= 2 GiB.
                    return segmentId.longValue() * SEGMENT_SIZE + segmentOffset;
                }
                return NOT_FOUND;
            }

            /**
             * Loads header block {@code headBlockIndex} and registers the segments it lists.
             *
             * @throws IOException if the block cannot be read or does not start with the
             *                     expected magic code
             */
            public void load(int headBlockIndex) throws IOException {
                // Widen before multiplying: the int product wraps once headBlockIndex >= 16.
                long address = (long)headBlockIndex * ENTRY_PER_SEGMENT * SEGMENT_SIZE;
                Segment segment = new Segment(IndexHelper.this.m_indexSteam, address);
                long magicCode = segment.readLong();
                if (magicCode != -1L) {
                    throw new IOException("Invalid index file: " + IndexHelper.this.m_indexSteam);
                }
                // Segment numbers are assigned in listing order, right after the header.
                this.m_nextSegment = 1 + ENTRY_PER_SEGMENT * headBlockIndex;
                int readerIndex = 1;
                while (readerIndex < ENTRY_PER_SEGMENT) {
                    int ip = segment.readInt();
                    int index = segment.readInt();
                    ++readerIndex;
                    if (ip == 0) {
                        // An ip of 0 marks the end of the used header entries.
                        break;
                    }
                    Map<Integer, Integer> map = this.m_table.get(ip);
                    if (map == null) {
                        map = new HashMap<Integer, Integer>();
                        this.m_table.put(ip, map);
                    }
                    if (map.get(index) == null) {
                        map.put(index, this.m_nextSegment++);
                    }
                }
            }
        }
    }
}

