浅读TiKV源码:Coprocessor

2024-08-06 08:19:54 浏览数 (2)

本文的的源码分析全部基于TiDB6.5来做分析。

前言

前阵子在看TiKV统计信息收集实现的时候,看到了Coprocessor有两个版本的实现:

激起了我的好奇,所以有了这篇文章。

正文

在这里先科普一下:TiKV上的Coprocessor 主要是负责是执行物理算子,为 SQL 计算出中间结果,从而减少 TiDB 的计算和网络开销。这个是绝大多数场景下 Coprocessor 执行的任务。其次则是Analyze(分析表数据,统计、采样表数据信息,持久化后被 TiDB 的优化器采用)和CheckSum(对表数据进行校验,用于导入数据后一致性校验)。

接着我随手一搜社区,社区人员是这么解答的:

TiKV 的 Coprocessor V2 是对 Coprocessor 的升级和改进版本。下面是 Coprocessor V2 和 Coprocessor 的主要区别:

  1. 功能和性能:Coprocessor V2 在功能和性能上进行了优化和改进。它引入了新的执行引擎,提供了更高的执行效率和更低的延迟。Coprocessor V2 还支持更多的操作,如索引扫描、聚合计算等,可以更好地满足复杂查询的需求。
  2. 接口和编程模型:Coprocessor V2 引入了新的接口和编程模型,使得开发者可以更方便地编写和管理 Coprocessor。它提供了更灵活的接口,支持自定义的计算逻辑和数据处理流程。
  3. 兼容性:Coprocessor V2 是对 Coprocessor 的升级,因此在兼容性方面,Coprocessor V2 与 Coprocessor 是不兼容的。如果

作为一名开发者,读完这个答案,好奇心瞬间上来了:

  • 更高的执行效率和更低的延迟是怎么做到的?
  • Coprocessor V2 引入了新的接口和编程模型,使得开发者可以更方便地编写和管理 Coprocessor。它提供了更灵活的接口,支持自定义的计算逻辑和数据处理流程灵活在哪里?

于是我看了社区的issue:github.com/tikv/tikv/i…以及merge request:github.com/tikv/rfcs/p…

心里有了些许b数——这个是用来扩展TiKV生态的,K-V这种存储不止可以用在TiDB的场景,还可以用在别的场景,但在别的场景中如何使用TiKV来取数,可以通过Coprocessor V2来扩展。

接下来,看下两者之间的方法:

代码语言:javascript复制

rust
 代码解读
复制代码
#[async_trait(?Send)]
pub trait RawStorage {
    /// Retrieves the value for a given key from the storage on the current
    /// node. Returns [`Option::None`] if the key is not present in the
    /// database.
    async fn get(&self, key: Key) -> PluginResult<Option<Value>>;

    /// Same as [`RawStorage::get()`], but retrieves values for multiple keys at
    /// once.
    async fn batch_get(&self, keys: Vec<Key>) -> PluginResult<Vec<KvPair>>;

    /// Same as [`RawStorage::get()`], but accepts a `key_range` such that
    /// values for keys in `[key_range.start, key_range.end)` are retrieved.
    /// The upper bound of the `key_range` is exclusive.
    async fn scan(&self, key_range: Range<Key>) -> PluginResult<Vec<Value>>;

    /// Inserts a new key-value pair into the storage on the current node.
    async fn put(&self, key: Key, value: Value) -> PluginResult<()>;

    /// Same as [`RawStorage::put()`], but inserts multiple key-value pairs at
    /// once.
    async fn batch_put(&self, kv_pairs: Vec<KvPair>) -> PluginResult<()>;

    /// Deletes a key-value pair from the storage on the current node given a
    /// `key`. Returns [`Result::Ok]` if the key was successfully deleted.
    async fn delete(&self, key: Key) -> PluginResult<()>;

    /// Same as [`RawStorage::delete()`], but deletes multiple key-value pairs
    /// at once.
    async fn batch_delete(&self, keys: Vec<Key>) -> PluginResult<()>;

    /// Same as [`RawStorage::delete()`], but deletes multiple key-values pairs
    /// at once given a `key_range`. All records with keys in
    /// `[key_range.start, key_range.end)` will be deleted. The upper bound
    /// of the `key_range` is exclusive.
    async fn delete_range(&self, key_range: Range<Key>) -> PluginResult<()>;
}
代码语言:javascript复制

rust
 代码解读
复制代码
pub trait Store: Send {
    /// The scanner type returned by `scanner()`.
    type Scanner: Scanner;

    /// Fetch the provided key.
    fn get(&self, key: &Key, statistics: &mut Statistics) -> Result<Option<Value>>;

    /// Re-use last cursor to incrementally (if possible) fetch the provided
    /// key.
    fn incremental_get(&mut self, key: &Key) -> Result<Option<Value>>;

    /// Take the statistics. Currently only available for `incremental_get`.
    fn incremental_get_take_statistics(&mut self) -> Statistics;

    /// Whether there was data > ts during previous incremental gets.
    fn incremental_get_met_newer_ts_data(&self) -> NewerTsCheckState;

    /// Fetch the provided set of keys.
    fn batch_get(
        &self,
        keys: &[Key],
        statistics: &mut Vec<Statistics>,
    ) -> Result<Vec<Result<Option<Value>>>>;

    /// Retrieve a scanner over the bounds.
    fn scanner(
        &self,
        desc: bool,
        key_only: bool,
        check_has_newer_ts_data: bool,
        lower_bound: Option<Key>,
        upper_bound: Option<Key>,
    ) -> Result<Self::Scanner>;
}

从方法上来看,Coprocessor V2定义了更为全面的方法,开发者可以根据不同的场景来实现操纵TiKV的方法。并且全部支持异步,可以通过指定name来调用对应的Coprocessor实现,对于新增的Coprocessor支持热加载。这么看Coprocessor V2 引入了新的接口和编程模型,使得开发者可以更方便地编写和管理 Coprocessor。它提供了更灵活的接口,支持自定义的计算逻辑和数据处理流程基本属实。

以Get为例,v2中的版本使用了store.incremental_get的实现,这个实现会缓存一个当前的游标,避免每次查数据时在多个版本之间查找最新版本的数据。(见:github.com/tikv/tikv/p…)。另外,store这一层封装比起v1中的storage调用,是更加接近的底层的。无论从实现细节、注释以及模块关系上都可以印证这一点。

从这两个点上来看,更高的执行效率和更低的延迟属实。

但总的来看,整个功能迭代并不完全。在这个功能的描述中:

代码语言:javascript复制

vbnet
 代码解读
复制代码
## Future work

### Official coprocessor

Provide an official coprocessor plugin which defines common primitives like `bool`, `f32`, `UTF8String`, `Array<T>` along with codec for them, moreover, implements simple scanning expressions or even aggregations.

Two reasons for that:

1. To provide an example showing how to build a coprocessor plugin for TiKV and guaranteed to be always up-to-date.

2. To comfort simple use cases to TiKV, so as to avoid forcing the users to develop a new plugin at the beginning.

### Transaction mode

Transaction mode is way more complicated than the RawKV mode. The current picture of txn plugin is an 'interactive' component with the client, which means, the client and the coprocessor will talk to each others during the request being processed. This is especially for resolving locks and coordinating the transaction among multiple coprocessors in multiple TiKV nodes.

### Static mode

Discussed in the section above.

### Migrate `tidb_query`

When static plugin and txn plugin is implemented, we can move the coprocessor specific to TiDB out of TiKV, and the repository of TiDB might be a good new home.

两个比较重要的规划并没有落地,一个是Transaction mode,一个是Migrate tidb_query。大概这类扩展生态更多算是锦上添花的事吧,只有在团队足够空闲的时候才会考虑做这些事。

0 人点赞