- 消息格式为 应用名-IP-小时正点数-消息递增号 MessageId
- 每个
应用 IP 整点小时
对应: 一个索引文件 和 一个数据文件 - 消息经过编码后,首4字节为该消息的大小,从文件中读消息的时候会用到这个特性
写消息过程
- 获取
MessageBlock
中的MessageTree
个数,进行遍历 - 获得每个
MessageTree
的index(索引递增号) 和 每个MessageTree
的size(数据大小) - 设置索引文件的起始位置
索引递增号*6
- 将该该消息所对应block在数据文件中的起始地址写到索引文件(4字节)
- 将该该消息在block中的偏移量写入索引文件(2字节)
- 将block的内容长度写入数据文件
- 将block的内容写入dataFile
// MessageBlockWriter.java
public synchronized void writeBlock(MessageBlock block) throws IOException {
// block中消息条数
int len = block.getBlockSize();
// block大小
byte[] data = block.getData();
// 用于在遍历过程中记录每条消息的偏移量,遍历完成之后,blockSize等于block的大小
int blockSize = 0;
ByteBuffer buffer = ByteBuffer.allocate(4 data.length);
buffer.order(ByteOrder.BIG_ENDIAN);
for (int i = 0; i < len; i ) {
// 消息的递增号
int seq = block.getIndex(i);
// 消息的大小
int size = block.getSize(i);
// m_indexFile.seek(seq * 6L);
// 该消息在索引文件的起始位置 递增号*6 ,表示每条消息在索引文件中占6个字节大小
m_indexChannel.position(seq * 6L);
// m_indexFile.writeInt(m_blockAddress);
// m_indexFile.writeShort(blockSize);
// 用于记录该消息所对应block在数据文件中的起始地址
buffer.putInt(m_blockAddress);
// 用于记录该消息在block中的偏移量
buffer.putShort((short) blockSize);
buffer.flip();
// 写入索引文件
m_indexChannel.write(buffer);
// 计算下一条消息在该block中的偏移量
blockSize = size;
buffer.clear();
}
// m_dataFile.writeInt(data.length);
// m_dataFile.write(data);
buffer = ByteBuffer.allocate(4 data.length);
buffer.order(ByteOrder.BIG_ENDIAN);
// 先在数据文件中用4个字节记录 block 的大小
buffer.putInt(data.length);
// 再将block的内容写入数据文件
buffer.put(data);
buffer.flip();
m_dataChannel.write(buffer);
// 更新 m_blockAddress 的值,即数据文件下一次写入时的起始位置
m_blockAddress = data.length 4;
}
即数据文件中的存储结构为: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】
索引文件的存储结构为: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】
读消息过程
对于真正的文件存储,block在这里其实是一个抽象的概念; 如果是直接以Message为单位进行写文件,那这个 block 和 索引文件中的block偏移量 就没有什么意义了。但实际上消息是以block为单位进行写文件,一个block最大为64K,而一个block中又存在多条消息,所以每条消息在它所属的block中有一个偏移量
- 根据 索引递增号从索引文件读前4个字节 找到block的地址
- 该地址为起始地址,从数据文件中读取一个int类型数据(4个字节)作为该block的长度
- 根据该长度读取整个block的内容到byte数组
- 根据 索引递增号从索引文件读后2个字节 找到该消息在该block中的偏移地址
- 以偏移地址为起始地址,读取一个int类型数据(4个字节)作为该消息的大小(为什么读4字节?这是在对消息编码时决定的,首4字节表示该消息的大小)
- 根据偏移地址 和 上一步获取的int类型数据大小 读取Message
// MessageBlockReader.java
private DataInputStream createDataInputStream(byte[] buf) {
DataInputStream in = null;
try {
in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
} catch (IOException e) {
try {
in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
} catch (IOException ioe) {
Cat.logError(ioe);
}
}
return in;
}
public byte[] readMessage(int index) throws IOException {
int blockAddress = 0;
int blockOffset = 0;
// 索引 在索引文件的起始位置
m_indexFile.seek(index * 6L);
// 读出4字节,该值代表block在数据文件的起始位置
blockAddress = m_indexFile.readInt();
// 读出2字节 该值代表Message在block中的偏移量
blockOffset = m_indexFile.readShort() & 0xFFFF;
// 从数据文件的 blockAddress 地址开始访问数据
m_dataFile.seek(blockAddress);
// 4字节里面存的是block块的长度
byte[] buf = new byte[m_dataFile.readInt()];
// 从数据文件中读取整个block到buf数组
m_dataFile.readFully(buf);
DataInputStream in = createDataInputStream(buf);
if (in != null) {
try {
// 跳到block中的偏移量
in.skip(blockOffset);
// 该值代表消息长度
int len = in.readInt();
byte[] data = new byte[len];
// 从block中读取Message
in.readFully(data);
return data;
} finally {
try {
in.close();
} catch (Exception e) {
// ignore it
}
}
} else {
return null;
}
}
听说还有V2版本,分 以一级索引和二级索引,可我拉代码没看到呀