Cat消息存储

2019-12-12 21:39:19 浏览数 (1)

  1. 消息格式为 应用名-IP-小时正点数-消息递增号 MessageId
  2. 每个 应用 IP 整点小时 对应: 一个索引文件 和 一个数据文件
  3. 消息经过编码后,首4字节为该消息的大小,从文件中读消息的时候会用到这个特性

写消息过程

  1. 获取MessageBlock中的MessageTree个数,进行遍历
  2. 获得每个MessageTree的index(索引递增号) 和 每个MessageTree的size(数据大小)
  3. 设置索引文件的起始位置 索引递增号*6
  4. 将该该消息所对应block在数据文件中的起始地址写到索引文件(4字节)
  5. 将该该消息在block中的偏移量写入索引文件(2字节)
  6. 将block的内容长度写入数据文件
  7. 将block的内容写入dataFile
代码语言:javascript复制
// MessageBlockWriter.java
public synchronized void writeBlock(MessageBlock block) throws IOException {
    // block中消息条数
    int len = block.getBlockSize();
    // block大小
    byte[] data = block.getData();

    // 用于在遍历过程中记录每条消息的偏移量,遍历完成之后,blockSize等于block的大小
    int blockSize = 0;

    ByteBuffer buffer = ByteBuffer.allocate(4   data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);

    for (int i = 0; i < len; i  ) {
        // 消息的递增号
        int seq = block.getIndex(i);
        // 消息的大小
        int size = block.getSize(i);

        // m_indexFile.seek(seq * 6L);
        // 该消息在索引文件的起始位置 递增号*6 ,表示每条消息在索引文件中占6个字节大小
        m_indexChannel.position(seq * 6L);

        // m_indexFile.writeInt(m_blockAddress);
        // m_indexFile.writeShort(blockSize);
        // 用于记录该消息所对应block在数据文件中的起始地址
        buffer.putInt(m_blockAddress);
        // 用于记录该消息在block中的偏移量
        buffer.putShort((short) blockSize);
        buffer.flip();
        // 写入索引文件
        m_indexChannel.write(buffer);

        // 计算下一条消息在该block中的偏移量
        blockSize  = size;

        buffer.clear();
    }

    // m_dataFile.writeInt(data.length);
    // m_dataFile.write(data);
    buffer = ByteBuffer.allocate(4   data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);
    // 先在数据文件中用4个字节记录 block 的大小
    buffer.putInt(data.length);
    // 再将block的内容写入数据文件
    buffer.put(data);
    buffer.flip();
    m_dataChannel.write(buffer);

    // 更新 m_blockAddress 的值,即数据文件下一次写入时的起始位置
    m_blockAddress  = data.length   4;
}

即数据文件中的存储结构为: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】

索引文件的存储结构为: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】

读消息过程

对于真正的文件存储,block在这里其实是一个抽象的概念; 如果是直接以Message为单位进行写文件,那这个 block 和 索引文件中的block偏移量 就没有什么意义了。但实际上消息是以block为单位进行写文件,一个block最大为64K,而一个block中又存在多条消息,所以每条消息在它所属的block中有一个偏移量

  1. 根据 索引递增号从索引文件读前4个字节 找到block的地址
  2. 该地址为起始地址,从数据文件中读取一个int类型数据(4个字节)作为该block的长度
  3. 根据该长度读取整个block的内容到byte数组
  4. 根据 索引递增号从索引文件读后2个字节 找到该消息在该block中的偏移地址
  5. 以偏移地址为起始地址,读取一个int类型数据(4个字节)作为该消息的大小(为什么读4字节?这是在对消息编码时决定的,首4字节表示该消息的大小)
  6. 根据偏移地址 和 上一步获取的int类型数据大小 读取Message
代码语言:javascript复制
// MessageBlockReader.java
private DataInputStream createDataInputStream(byte[] buf) {
    DataInputStream in = null;

    try {
        in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
    } catch (IOException e) {
        try {
            in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
        } catch (IOException ioe) {
            Cat.logError(ioe);
        }
    }
    return in;
}

public byte[] readMessage(int index) throws IOException {
    int blockAddress = 0;
    int blockOffset = 0;

    // 索引 在索引文件的起始位置
    m_indexFile.seek(index * 6L);

    // 读出4字节,该值代表block在数据文件的起始位置
    blockAddress = m_indexFile.readInt();
    // 读出2字节 该值代表Message在block中的偏移量
    blockOffset = m_indexFile.readShort() & 0xFFFF;

    // 从数据文件的 blockAddress 地址开始访问数据
    m_dataFile.seek(blockAddress);
    // 4字节里面存的是block块的长度
    byte[] buf = new byte[m_dataFile.readInt()];
    // 从数据文件中读取整个block到buf数组
    m_dataFile.readFully(buf);

    DataInputStream in = createDataInputStream(buf);

    if (in != null) {
        try {
            // 跳到block中的偏移量
            in.skip(blockOffset);
            
            // 该值代表消息长度
            int len = in.readInt();

            byte[] data = new byte[len];
            
            // 从block中读取Message
            in.readFully(data);
            return data;
        } finally {
            try {
                in.close();
            } catch (Exception e) {
                // ignore it
            }
        }
    } else {
        return null;
    }
}

听说还有V2版本,分 以一级索引和二级索引,可我拉代码没看到呀

0 人点赞