上篇介绍到:InfluxDB-IOx的Run命令启动过程,详情见:https://my.oschina.net/u/3374539/blog/5021654
这章记录一下Database create
命令的执行过程。
在第三章命令行
中介绍了,所有的子命令都有一个独立的参数或配置称为subcommand
。
enum Command {
Convert { // 省略 ...},
Meta {// 省略 ...},
Database(commands::database::Config),
Run(Box<commands::run::Config>),
Stats(commands::stats::Config),
Server(commands::server::Config),
Writer(commands::writer::Config),
Operation(commands::operations::Config),
}
这章我们打开看一眼commands::database
下的config
包含了什么。
pub struct Config {
#[structopt(subcommand)]
command: Command,
}
//见名知意,基本猜测一下就行了,慢慢使用到再回来看
enum Command {
Create(Create),
List(List),
Get(Get),
Write(Write),
Query(Query),
Chunk(chunk::Config),
Partition(partition::Config),
}
先来看一下create
命令的执行。
Command::Create(command) => {
//创建一个grpc的client
let mut client = management::Client::new(connection);
//设置基本的配置项
let rules = DatabaseRules {
//数据库名字
name: command.name,
//内存的各种配置,包含缓存大小,时间等等
lifecycle_rules: Some(LifecycleRules {
//省略。。
}),
//设置分区的策略
partition_template: Some(PartitionTemplate {
//省略。。
}),
//其它都填充default
..Default::default()
};
//使用配置信息创建数据库,这里是生成了一个CreateDatabaseRequest去调用了远程服务器的方法
client.create_database(rules).await?;
println!("Ok");
}
在上一章中提到了grpc
的启动,这里就涉及到了之前提到的grpc
的框架tonic
,在tonic
中使用#[tonic::async_trait]
了标记一个服务器端的实现开始。我在ide中搜索,可以在src/influxdb_ioxd/rpc/management.rs:50行
中找到ManagementService
相关的实现。
代码语言:javascript复制有关tonic更多的资料请阅读:https://github.com/hyperium/tonic
#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
where
M: ConnectionManager Send Sync Debug 'static,
{
//省略其它方法。。。
async fn create_database(
&self,
//这里就是接收CreateDatabaseRequest的请求
request: Request<CreateDatabaseRequest>,
) -> Result<Response<CreateDatabaseResponse>, Status> {
//对数据进行一下校验,然后获得在上面配置的rules规则
let rules: DatabaseRules = request
.into_inner()
.rules
.ok_or_else(|| FieldViolation::required(""))
.and_then(TryInto::try_into)
.map_err(|e| e.scope("rules"))?;
//这里就是在第三章中提到的server_id,如果没配置就会报错了
let server_id = match self.server.require_id().ok() {
Some(id) => id,
None => return Err(NotFound::default().into()),
};
//这里就是真正的去创建,在下面继续跟踪
match self.server.create_database(rules, server_id).await {
Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
Err(Error::DatabaseAlreadyExists { db_name }) => {
return Err(AlreadyExists {
resource_type: "database".to_string(),
resource_name: db_name,
..Default::default()
}
.into())
}
Err(e) => Err(default_server_error_handler(e)),
}
}
}
接下来要继续查看数据库真正的被创建出来,我读到这里存在一个问题,文件格式是什么样子的?
代码语言:javascript复制pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> {
//检查server_id
self.require_id()?;
//把数据库名字存储到内存中,最终保存到一个btreemap中
let db_reservation = self.config.create_db(rules)?;
//对数据进行持久化保存
self.persist_database_rules(db_reservation.rules().clone())
.await?;
//启动数据库后台线程,在内存中写入数据库状态
db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec));
Ok(())
}
来解答上面的疑问,文件是怎样持久化、格式是什么样子的。
代码语言:javascript复制pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
//生成一个新的数据库路径
let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
//序列化DatabaseRules这个pb到byte流
let mut data = BytesMut::new();
rules.encode(&mut data).context(ErrorSerializing)?;
let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze());
//将pb的内容进行存储
self.store
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
.context(StoreError)?;
Ok(())
}
这里调用了rules.encode()
转换到pb
的格式,这里是rust
语言的一个方法,实现了From
特性的,就得到了一个into
的方法,如:impl From<DatabaseRules> for management::DatabaseRules
.
到这里数据库的一个描述文件rules.pb
就被写入到磁盘中了,路径是启动命令中指定的--data-dir
参数路径 --writer-id
数据库名字。
例如,我的启动和创建命令为:
代码语言:javascript复制./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/
./influxdb_iox database create test
那么得到的路径就为:~/influxtest/1/test/rules.pb
. 之后可以运行一个pb
的脚本来反查rules.pb
中的数据内容,如下:
$ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules
< ~/influxtest/1/test/rules.pb
influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused.
name: "test"
partition_template {
parts {
time: "%Y-%m-%d %H:00:00"
}
}
lifecycle_rules {
mutable_linger_seconds: 300
mutable_size_threshold: 10485760
buffer_size_soft: 52428800
buffer_size_hard: 104857600
sort_order {
order: ORDER_ASC
created_at_time {
}
}
}
看到这里已经知道整个生成过程及文件内容。
祝玩儿的开心。