时序数据库Influx-IOx源码学习六-2(数据写入)

2021-04-26 10:51:35 浏览数 (1)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章说到数据写入时的分区机制及分区现有的功能。详情见:https://my.oschina.net/u/3374539/blog/5026139

这一章记录一下数据是怎样进行存储的。


上一章没有细节的介绍数据从Line protocol被解析成了什么样子,在开篇先介绍一下数据被封装后的展示。

转换过程的代码可以参见internal_types/src/entry.rs : 157行中的build_table_write_batch方法; 内部数据结构可以查看: generated_types/protos/influxdata/write/v1/entry.fbs

数据是被层层加码组装出来的:

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry

代码语言:javascript复制
sharded_entries:[{
  shard_id: None,
  entry: {
      fb: {
        operation_type: write,
        operation: {
          partition_writes:[{
            key:"2019-05-02 16:00:00",
            table_batches:[
            { 
              name:"myMeasurement",
              columns:[
              {
                name:"fieldKey",
                logical_column_type: Field,
                values_type: StringValues,
                values: { values:["123"] },
                null_mask: None 
              },
              {
                name:"tag1",
                logical_column_type: Tag,
                values_type: StringValues,
                values: { values:["value1"]) },
                null_mask: None 
              },
              {
                name:"tag2",
                logical_column_type: Tag,
                values_type: StringValues,
                values: { values:["value2"]) },
                null_mask: None
              }, 
              { 
                name:"time",
                logical_column_type: Time,
                values_type: I64Values,
                values: { values:[1556813561098000000]) },
                null_mask: None
              }]
            }]
          }]
      } 
    }
  }
}]

数据在内存中就会形成如上格式保存,但要注意,内存中使用的 flatbuffer 格式保存,上面只是为了展示内容。

继续上节里的内容,结构被拼凑完成之后,就会调用write_sharded_entry方法去进行实际写入工作:

代码语言:javascript复制
futures_util::future::try_join_all(
            sharded_entries
                .into_iter()
                 //对每个数据进行写入到shard
                .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
        )
        .await?;

然后看是怎样写入到shard的,因为shard的写入还没有完成,所以只能关注单机的写入了。具体看代码:

代码语言:javascript复制
 async fn write_sharded_entry(
        &self,
        db_name: &str,
        db: &Db,
        shards: Arc<HashMap<u32, NodeGroup>>,
        sharded_entry: ShardedEntry,
    ) -> Result<()> {
        //判断shard的id是否为null,如果是null就写入本地
        //否则就写入到具体的shard去
        match sharded_entry.shard_id {
            Some(shard_id) => {
                let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
                //还没有真正的实现,可以看下面的方法
                self.write_entry_downstream(db_name, node_group, &sharded_entry.entry)
                    .await?
            }
            None => self.write_entry_local(&db, sharded_entry.entry).await?,
        }
        Ok(())
    }


    //可以看到还没有实现远程的写入
    async fn write_entry_downstream(
        &self,
        db_name: &str,
        node_group: &[WriterId],
        _entry: &Entry,
    ) -> Result<()> {
        todo!(
            "perform API call of sharded entry {} to one of the nodes {:?}",
            db_name,
            node_group
        )
    }

  //数据对本地写入
   pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> {
        //继续往下跟踪
        db.store_entry(entry).map_err(|e| match e {
            db::Error::HardLimitReached {} => Error::HardLimitReached {},
            _ => Error::UnknownDatabaseError {
                source: Box::new(e),
            },
        })?;

        Ok(())
    }

//方法似乎什么都没做,只是增补了clock_value和write_id
//注释上解释到logical clock是一个用来在数据库内部把entry变为有序的字段
pub fn store_entry(&self, entry: Entry) -> Result<()> {
        //生成一个新的结构SequencedEntry并增补字段
        let sequenced_entry = SequencedEntry::new_from_entry_bytes(
            ClockValue::new(self.next_sequence()),
            self.server_id.get(),
            entry.data(),
        ).context(SequencedEntryError)?;
        //关于读缓存相关的配置和实现,先不用管
        if self.rules.read().wal_buffer_config.is_some() {
            todo!("route to the Write Buffer. TODO: carols10cents #1157")
        }
        //继续调用其他方法
        self.store_sequenced_entry(sequenced_entry)
    }

上面的所有方法完成之后,基本的插入数据格式就准备完成了,接下来就是写入内存存储:

代码语言:javascript复制
pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> {
        //读取出数据库对于写入相关的配置信息
        //包括是否可写、是否超过内存限制等等验证
        let rules = self.rules.read();
        let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
        if rules.lifecycle_rules.immutable {
            return DatabaseNotWriteable {}.fail();
        }
        if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard {
            if self.memory_registries.bytes() > hard_limit.get() {
                return HardLimitReached {}.fail();
            }
        }
        //rust语言中的释放变量
        std::mem::drop(rules);

        //因为是批量写入,所以需要循环
        //partition_writes的数据格式可以参见上面的json数据
        if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
            for write in partitioned_writes {
                let partition_key = write.key();
                //根据之前生成的partition_key来得到或者创建一个partition描述
                let partition = self.catalog.get_or_create_partition(partition_key);
                //这里是拿到一个写锁
                let mut partition = partition.write();
                //更新这个partition最后的插入时间
                //记录这个的目的,代码上并没写明白是做什么用的
                partition.update_last_write_at();
                //找到一个打开的chunk
                //不知道为什么每次都要在所有chunk里搜索一次
                //难道是同时可能有很多个chunk都可以写入?
                let chunk = partition.open_chunk().unwrap_or_else(|| {
                   //否则就创建一个新的chunk出来
                   partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref())
                });
                //获取一个写锁
                let mut chunk = chunk.write();
                //更新当前chunk的第一条、最后一条写入记录
                chunk.record_write();
                //得到chunk的内存区域,称为mutable_buffer
                let chunk_id = chunk.id();
                let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk");
                //真正的写入到内存中
                mb_chunk
                    .write_table_batches(
                        sequenced_entry.clock_value(),
                        sequenced_entry.writer_id(),
                        &write.table_batches(),
                    )
                    .context(WriteEntry {
                        partition_key,
                        chunk_id,
                    })?;

                //如果当前chunk写入数据的大小超过了设置的限制,就关闭
                //关闭的意思就是把状态制为Closing,并更新关闭时间
                let size = mb_chunk.size();
                if let Some(threshold) = mutable_size_threshold {
                    if size > threshold.get() {
                        chunk.set_closing().expect("cannot close open chunk")
                    }
                }
            }
        }

        Ok(())
    }

再深入的就不继续跟踪了,但是思路还是比较清晰了。

1.分区相关

client --> grpc --> 进行分区shard --> 分区partition

2.写入相关

  • 结构封装

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry

  • 内存写入空间

catalog -> partition -> table -> column

  • 达到指定大小后标记为关闭
  • 异步 - 后台线程进行内存整理

到这里基本就完成了所有的写入,并返回给客户端成功。

关于后台线程的内存整理在下一篇中继续介绍。

祝玩儿的开心

0 人点赞