时序数据库Influx-IOx源码学习九(查询主流程)

2021-04-29 15:23:01 浏览数 (1)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一篇粗略的总结了写入的基本流程,详情见:

https://my.oschina.net/u/3374539/blog/5033469

这一篇记录一下查询的主要流程。


在第六章中,写了一个查询示例,如下:

代码语言:javascript复制
 let mut query = flight::Client::new(connection)
        .perform_query("databaseName", "select * from myMeasurement")
        .await
        .expect("query request should work");

其中connection,代表的建立了一个Grpc的连接。perform_query代表执行查询,其中第一个参数是数据库名字,第二个参数是要执行查询的sql语句。这个perform_query是封装了一下调用协议,然后调用了服务器端的do_get方法,do_get方法在服务器的src/influxdb_ioxd/rpc/flight.rs:139行可以找到,如下:

代码语言:javascript复制
async fn do_get(
        &self,
        //这个Ticket里就是保存的perform_query方法中封装的json数据
        request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, tonic::Status> {
        //这里就是把json还原回来
        let ticket = request.into_inner();
        let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket {
            ticket: ticket.ticket,
        })?;
        //反序列化成了ReadInfo结构
        let read_info: ReadInfo =
            serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?;
        //拿到客户端设置的数据库名字
        let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?;
        //从内存中查找是否存在这个database名字,如果不存在就会报DatabaseNotFound错误回去
        //这里就是创建数据库的时候写入到内存里的
        //同时还应该记得iox的数据库必须一个节点创建一次。。hhhhha
        let db = self.server.db(&database).context(DatabaseNotFound {
            database_name: &read_info.database_name,
        })?;
        //这个是拿到之前创建数据库时候设置的线程池,可以回去参考第五章
        let executor = db.executor();
        //这里是创建出sql语句对应的physical_plan,后面再看
        let physical_plan = Planner::new(Arc::clone(&executor))
            .sql(db, &read_info.sql_query)
            .await
            .context(Planning)?;

        //使用线程异步的执行查询
        let results = executor
             //复制一下执行时候需要用到的信息
            .new_context()
             //真正的去执行
            .collect(Arc::clone(&physical_plan))
            .await
            .map_err(|e| Box::new(e) as _)
            .context(Query {
                database_name: &read_info.database_name,
            })?;

        //在写入的章节里应该知道了在RBChunk里面存储的是Arrow格式的。
        //在这个方法中就是调用arrow_flight工具包的方法,先把schema序列化到flight_buffer中
        let options = arrow::ipc::writer::IpcWriteOptions::default();
        let schema = physical_plan.schema();
        let schema_flight_data =
            arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options);

        let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
        //上面得到的结果集,这里进行遍历,封装为要返回的数据结构
        let mut batches: Vec<Result<FlightData, tonic::Status>> = results
            .iter()
            //这个是为了给下面flight_data_from_arrow_batch这个方法打补丁用的
            //因为这个方法即便对于切片类型的batch也是盲目的序列化所有数据
            .map(optimize_record_batch)
            .collect::<Result<Vec<_>, Error>>()?
            .iter()
            //这里就是一条一条的把数据序列化到缓冲区里
            .flat_map(|batch| {
                let (flight_dictionaries, flight_batch) =
                    arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
                //把数据包装在Result中
                flight_dictionaries
                    .into_iter()
                    .chain(std::iter::once(flight_batch))
                    .map(Ok)
            })
            .collect();

        //前面是schema,后面是数据
        flights.append(&mut batches);
        //返回一个数据的异步stream,有可能调用一次next就会释放一次cpu?
        let output = futures::stream::iter(flights);
        //数据以flight形式发送到了客户端,客户端先读取schema再读取数据。
        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
    }

这里基本上是整个查询的主逻辑:

  • 异步的将sql转换为plan。
  • 异步的去执行plan并返回结果和结果所对应的schema信息。
  • 将返回的arrow数据封装到flights格式中。
  • 通过Grpc返回

这一篇就到这里吧,下几章准备记录一下:

  1. sql是怎么被执行的
  2. 查询中都经历了什么
  3. 等等。。。

祝玩儿的开心。

0 人点赞