时序数据库Influx-IOx源码学习五(创建数据库)

2021-04-26 10:50:53 浏览数 (1)

上篇介绍到:InfluxDB-IOx的Run命令启动过程,详情见:https://my.oschina.net/u/3374539/blog/5021654

这章记录一下Database create命令的执行过程。


在第三章命令行中介绍了,所有的子命令都有一个独立的参数或配置称为subcommand

代码语言:javascript复制
enum Command {
    Convert { // 省略 ...},
    Meta {// 省略 ...},
    Database(commands::database::Config),
    Run(Box<commands::run::Config>),
    Stats(commands::stats::Config),
    Server(commands::server::Config),
    Writer(commands::writer::Config),
    Operation(commands::operations::Config),
}

这章我们打开看一眼commands::database下的config包含了什么。

代码语言:javascript复制
pub struct Config {
    #[structopt(subcommand)]
    command: Command,
}
//见名知意,基本猜测一下就行了,慢慢使用到再回来看
enum Command {
    Create(Create),
    List(List),
    Get(Get),
    Write(Write),
    Query(Query),
    Chunk(chunk::Config),
    Partition(partition::Config),
}

先来看一下create命令的执行。

代码语言:javascript复制
Command::Create(command) => {
            //创建一个grpc的client
            let mut client = management::Client::new(connection);
            //设置基本的配置项
            let rules = DatabaseRules {
                //数据库名字
                name: command.name,
                //内存的各种配置,包含缓存大小,时间等等
                lifecycle_rules: Some(LifecycleRules {
                    //省略。。
                }),
                //设置分区的策略
                partition_template: Some(PartitionTemplate {
                    //省略。。
                }),

                 //其它都填充default
                ..Default::default()
            };
            //使用配置信息创建数据库,这里是生成了一个CreateDatabaseRequest去调用了远程服务器的方法
            client.create_database(rules).await?;

            println!("Ok");
        }

在上一章中提到了grpc的启动,这里就涉及到了之前提到的grpc的框架tonic,在tonic中使用#[tonic::async_trait]了标记一个服务器端的实现开始。我在ide中搜索,可以在src/influxdb_ioxd/rpc/management.rs:50行中找到ManagementService相关的实现。

有关tonic更多的资料请阅读:https://github.com/hyperium/tonic

代码语言:javascript复制
#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
where
    M: ConnectionManager   Send   Sync   Debug   'static,
{
    //省略其它方法。。。

 async fn create_database(
        &self,
        //这里就是接收CreateDatabaseRequest的请求
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {

         //对数据进行一下校验,然后获得在上面配置的rules规则
        let rules: DatabaseRules = request
            .into_inner()
            .rules
            .ok_or_else(|| FieldViolation::required(""))
            .and_then(TryInto::try_into)
            .map_err(|e| e.scope("rules"))?;

        //这里就是在第三章中提到的server_id,如果没配置就会报错了
        let server_id = match self.server.require_id().ok() {
            Some(id) => id,
            None => return Err(NotFound::default().into()),
        };
        //这里就是真正的去创建,在下面继续跟踪
        match self.server.create_database(rules, server_id).await {
            Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
            Err(Error::DatabaseAlreadyExists { db_name }) => {
                return Err(AlreadyExists {
                    resource_type: "database".to_string(),
                    resource_name: db_name,
                    ..Default::default()
                }
                .into())
            }
            Err(e) => Err(default_server_error_handler(e)),
        }
    }
}

接下来要继续查看数据库真正的被创建出来,我读到这里存在一个问题,文件格式是什么样子的?

代码语言:javascript复制
pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> {
        //检查server_id
        self.require_id()?;
        //把数据库名字存储到内存中,最终保存到一个btreemap中
        let db_reservation = self.config.create_db(rules)?;
        //对数据进行持久化保存
        self.persist_database_rules(db_reservation.rules().clone())
            .await?;
        //启动数据库后台线程,在内存中写入数据库状态
        db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec));

        Ok(())
    }

来解答上面的疑问,文件是怎样持久化、格式是什么样子的。

代码语言:javascript复制
pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
        //生成一个新的数据库路径
        let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
        //序列化DatabaseRules这个pb到byte流
        let mut data = BytesMut::new();
        rules.encode(&mut data).context(ErrorSerializing)?;
        let len = data.len();
        let stream_data = std::io::Result::Ok(data.freeze());
        //将pb的内容进行存储
        self.store
            .put(
                &location,
                futures::stream::once(async move { stream_data }),
                Some(len),
            )
            .await
            .context(StoreError)?;
        Ok(())
    }

这里调用了rules.encode()转换到pb的格式,这里是rust语言的一个方法,实现了From特性的,就得到了一个into的方法,如:impl From<DatabaseRules> for management::DatabaseRules.

到这里数据库的一个描述文件rules.pb就被写入到磁盘中了,路径是启动命令中指定的--data-dir参数路径 --writer-id 数据库名字。

例如,我的启动和创建命令为:

代码语言:javascript复制
./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/
./influxdb_iox database create test

那么得到的路径就为:~/influxtest/1/test/rules.pb. 之后可以运行一个pb的脚本来反查rules.pb中的数据内容,如下:

代码语言:javascript复制
$ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules 
    < ~/influxtest/1/test/rules.pb

influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused.
name: "test"
partition_template {
  parts {
    time: "%Y-%m-%d %H:00:00"
  }
}
lifecycle_rules {
  mutable_linger_seconds: 300
  mutable_size_threshold: 10485760
  buffer_size_soft: 52428800
  buffer_size_hard: 104857600
  sort_order {
    order: ORDER_ASC
    created_at_time {
    }
  }
}

看到这里已经知道整个生成过程及文件内容。

祝玩儿的开心。

0 人点赞