InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一章说到数据写入时的分区机制及分区现有的功能。详情见:https://my.oschina.net/u/3374539/blog/5026139
这一章记录一下数据是怎样进行存储的。
上一章没有细节的介绍数据从Line protocol
被解析成了什么样子,在开篇先介绍一下数据被封装后的展示。
转换过程的代码可以参见
internal_types/src/entry.rs : 157行
中的build_table_write_batch
方法; 内部数据结构可以查看:generated_types/protos/influxdata/write/v1/entry.fbs
。
数据是被层层加码组装出来的:
LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry
sharded_entries:[{
shard_id: None,
entry: {
fb: {
operation_type: write,
operation: {
partition_writes:[{
key:"2019-05-02 16:00:00",
table_batches:[
{
name:"myMeasurement",
columns:[
{
name:"fieldKey",
logical_column_type: Field,
values_type: StringValues,
values: { values:["123"] },
null_mask: None
},
{
name:"tag1",
logical_column_type: Tag,
values_type: StringValues,
values: { values:["value1"]) },
null_mask: None
},
{
name:"tag2",
logical_column_type: Tag,
values_type: StringValues,
values: { values:["value2"]) },
null_mask: None
},
{
name:"time",
logical_column_type: Time,
values_type: I64Values,
values: { values:[1556813561098000000]) },
null_mask: None
}]
}]
}]
}
}
}
}]
数据在内存中就会形成如上格式保存,但要注意,内存中使用的 flatbuffer 格式保存,上面只是为了展示内容。
继续上节里的内容,结构被拼凑完成之后,就会调用write_sharded_entry
方法去进行实际写入工作:
futures_util::future::try_join_all(
sharded_entries
.into_iter()
//对每个数据进行写入到shard
.map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
)
.await?;
然后看是怎样写入到shard的,因为shard的写入还没有完成,所以只能关注单机的写入了。具体看代码:
代码语言:javascript复制 async fn write_sharded_entry(
&self,
db_name: &str,
db: &Db,
shards: Arc<HashMap<u32, NodeGroup>>,
sharded_entry: ShardedEntry,
) -> Result<()> {
//判断shard的id是否为null,如果是null就写入本地
//否则就写入到具体的shard去
match sharded_entry.shard_id {
Some(shard_id) => {
let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
//还没有真正的实现,可以看下面的方法
self.write_entry_downstream(db_name, node_group, &sharded_entry.entry)
.await?
}
None => self.write_entry_local(&db, sharded_entry.entry).await?,
}
Ok(())
}
//可以看到还没有实现远程的写入
async fn write_entry_downstream(
&self,
db_name: &str,
node_group: &[WriterId],
_entry: &Entry,
) -> Result<()> {
todo!(
"perform API call of sharded entry {} to one of the nodes {:?}",
db_name,
node_group
)
}
//数据对本地写入
pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> {
//继续往下跟踪
db.store_entry(entry).map_err(|e| match e {
db::Error::HardLimitReached {} => Error::HardLimitReached {},
_ => Error::UnknownDatabaseError {
source: Box::new(e),
},
})?;
Ok(())
}
//方法似乎什么都没做,只是增补了clock_value和write_id
//注释上解释到logical clock是一个用来在数据库内部把entry变为有序的字段
pub fn store_entry(&self, entry: Entry) -> Result<()> {
//生成一个新的结构SequencedEntry并增补字段
let sequenced_entry = SequencedEntry::new_from_entry_bytes(
ClockValue::new(self.next_sequence()),
self.server_id.get(),
entry.data(),
).context(SequencedEntryError)?;
//关于读缓存相关的配置和实现,先不用管
if self.rules.read().wal_buffer_config.is_some() {
todo!("route to the Write Buffer. TODO: carols10cents #1157")
}
//继续调用其他方法
self.store_sequenced_entry(sequenced_entry)
}
上面的所有方法完成之后,基本的插入数据格式就准备完成了,接下来就是写入内存存储:
代码语言:javascript复制pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> {
//读取出数据库对于写入相关的配置信息
//包括是否可写、是否超过内存限制等等验证
let rules = self.rules.read();
let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
if rules.lifecycle_rules.immutable {
return DatabaseNotWriteable {}.fail();
}
if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard {
if self.memory_registries.bytes() > hard_limit.get() {
return HardLimitReached {}.fail();
}
}
//rust语言中的释放变量
std::mem::drop(rules);
//因为是批量写入,所以需要循环
//partition_writes的数据格式可以参见上面的json数据
if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
for write in partitioned_writes {
let partition_key = write.key();
//根据之前生成的partition_key来得到或者创建一个partition描述
let partition = self.catalog.get_or_create_partition(partition_key);
//这里是拿到一个写锁
let mut partition = partition.write();
//更新这个partition最后的插入时间
//记录这个的目的,代码上并没写明白是做什么用的
partition.update_last_write_at();
//找到一个打开的chunk
//不知道为什么每次都要在所有chunk里搜索一次
//难道是同时可能有很多个chunk都可以写入?
let chunk = partition.open_chunk().unwrap_or_else(|| {
//否则就创建一个新的chunk出来
partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref())
});
//获取一个写锁
let mut chunk = chunk.write();
//更新当前chunk的第一条、最后一条写入记录
chunk.record_write();
//得到chunk的内存区域,称为mutable_buffer
let chunk_id = chunk.id();
let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk");
//真正的写入到内存中
mb_chunk
.write_table_batches(
sequenced_entry.clock_value(),
sequenced_entry.writer_id(),
&write.table_batches(),
)
.context(WriteEntry {
partition_key,
chunk_id,
})?;
//如果当前chunk写入数据的大小超过了设置的限制,就关闭
//关闭的意思就是把状态制为Closing,并更新关闭时间
let size = mb_chunk.size();
if let Some(threshold) = mutable_size_threshold {
if size > threshold.get() {
chunk.set_closing().expect("cannot close open chunk")
}
}
}
}
Ok(())
}
再深入的就不继续跟踪了,但是思路还是比较清晰了。
1.分区相关
client --> grpc --> 进行分区shard --> 分区partition
2.写入相关
- 结构封装
LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry
- 内存写入空间
catalog -> partition -> table -> column
- 达到指定大小后标记为关闭
- 异步 - 后台线程进行内存整理
到这里基本就完成了所有的写入,并返回给客户端成功。
关于后台线程的内存整理在下一篇中继续介绍。
祝玩儿的开心