InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一篇粗略的总结了写入的基本流程,详情见:
https://my.oschina.net/u/3374539/blog/5033469
这一篇记录一下查询的主要流程。
在第六章中,写了一个查询示例,如下:
代码语言:javascript复制 let mut query = flight::Client::new(connection)
.perform_query("databaseName", "select * from myMeasurement")
.await
.expect("query request should work");
其中connection,代表的建立了一个Grpc的连接。perform_query代表执行查询,其中第一个参数是数据库名字,第二个参数是要执行查询的sql语句。这个perform_query是封装了一下调用协议,然后调用了服务器端的do_get方法,do_get方法在服务器的src/influxdb_ioxd/rpc/flight.rs:139行
可以找到,如下:
async fn do_get(
&self,
//这个Ticket里就是保存的perform_query方法中封装的json数据
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, tonic::Status> {
//这里就是把json还原回来
let ticket = request.into_inner();
let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket {
ticket: ticket.ticket,
})?;
//反序列化成了ReadInfo结构
let read_info: ReadInfo =
serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?;
//拿到客户端设置的数据库名字
let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?;
//从内存中查找是否存在这个database名字,如果不存在就会报DatabaseNotFound错误回去
//这里就是创建数据库的时候写入到内存里的
//同时还应该记得iox的数据库必须一个节点创建一次。。hhhhha
let db = self.server.db(&database).context(DatabaseNotFound {
database_name: &read_info.database_name,
})?;
//这个是拿到之前创建数据库时候设置的线程池,可以回去参考第五章
let executor = db.executor();
//这里是创建出sql语句对应的physical_plan,后面再看
let physical_plan = Planner::new(Arc::clone(&executor))
.sql(db, &read_info.sql_query)
.await
.context(Planning)?;
//使用线程异步的执行查询
let results = executor
//复制一下执行时候需要用到的信息
.new_context()
//真正的去执行
.collect(Arc::clone(&physical_plan))
.await
.map_err(|e| Box::new(e) as _)
.context(Query {
database_name: &read_info.database_name,
})?;
//在写入的章节里应该知道了在RBChunk里面存储的是Arrow格式的。
//在这个方法中就是调用arrow_flight工具包的方法,先把schema序列化到flight_buffer中
let options = arrow::ipc::writer::IpcWriteOptions::default();
let schema = physical_plan.schema();
let schema_flight_data =
arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options);
let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
//上面得到的结果集,这里进行遍历,封装为要返回的数据结构
let mut batches: Vec<Result<FlightData, tonic::Status>> = results
.iter()
//这个是为了给下面flight_data_from_arrow_batch这个方法打补丁用的
//因为这个方法即便对于切片类型的batch也是盲目的序列化所有数据
.map(optimize_record_batch)
.collect::<Result<Vec<_>, Error>>()?
.iter()
//这里就是一条一条的把数据序列化到缓冲区里
.flat_map(|batch| {
let (flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
//把数据包装在Result中
flight_dictionaries
.into_iter()
.chain(std::iter::once(flight_batch))
.map(Ok)
})
.collect();
//前面是schema,后面是数据
flights.append(&mut batches);
//返回一个数据的异步stream,有可能调用一次next就会释放一次cpu?
let output = futures::stream::iter(flights);
//数据以flight形式发送到了客户端,客户端先读取schema再读取数据。
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
这里基本上是整个查询的主逻辑:
- 异步的将sql转换为plan。
- 异步的去执行plan并返回结果和结果所对应的schema信息。
- 将返回的arrow数据封装到flights格式中。
- 通过Grpc返回
这一篇就到这里吧,下几章准备记录一下:
- sql是怎么被执行的
- 查询中都经历了什么
- 等等。。。
祝玩儿的开心。