如何用关系数据库实现 watchable mvcc:Kine 学习笔记

2022-04-16 13:37:18 浏览数 (1)

背景

Kine 目前使用在 k3s: Lightweight Kubernetes 项目中, k3s 的目标是建立一个轻量级的 k8s(完整的 k8s 实在是太重了),以便于部署在 小型机器/边缘机器/物联网机器 上。当然自己学习也可以部署一个。k3s 和 k8s 是兼容的,k3s 主要做的事情还是简化一些插件,合并一些工具到一个 bin 文件等等。

既然要部署到边缘环境,那么 k8s 的核心存储 etcd 也必须要有轻量级的替代方案,比如 sqlite,Kine 就是为这个目的而生。对于 k8s 来说,完全不感知 kine 的不同,因为 kine 做了一层 adapter,实现了 k8s 使用的大部分 etcd 接口(不是完整的 etcd 接口,只是 k8s 用的重要的一部分接口),再将请求改变逻辑,落地到 sqlite 等类似的关系存储中,这样部署 k8s 集群就不用再部署一个沉重而且对于小型集群来说完全没必要的 etcd 集群了。

运行逻辑如下图。

除了 sqlite,kine 目前也支持如 mysql, dqlite(sqlite 高可用版) , postgreSql 等等,具体可参考下图(图片来自:k3s高可用部署实践及其原理 - 极术社区 - 连接开发者与智能计算生态)在这里插入图片描述在这里插入图片描述

目标

上面的背景中讲到,Kine 实际就是用对象存储实现了 etcd 的一些重要接口,如果我们从简单的 kv 存储角度来看,这个实现可能并不复杂。比如:我们只要实现 kv 就行了,使用 mysql 的一个索引作为 key,value 存在另一个字段,这个实现并不复杂。实际上 kine 的实现并不是这样,比这个要复杂很多,为什么会这样呢。要理解这点,我们必须先要理解 etcd 是个什么样的 kv(和 redis、memcache 等 kv 有什么不同),已经这些能力为什么对于 k8s 来说很重要。

Etcd 官方在 这篇文档:etcd versus other key-value stores | etcd 里面对比了 etcd 和 zookeeper, consul, newsql 有什么不同,其中我认为比较重要的几个点是:

  1. 原生支持 MVCC:当然基于 kv 或者 sql 实现 MVCC 并不困难,但是直接的支持总是更方便的【补充一点:etcd 支持获取指定版本的 kv,意味着所有的历史版本 kv 都要被存储,直到到 compact(compact 是另一个问题,即历史版本不断增加会消耗大量的存储,所以需要一种机制把一些很老的历史版本删掉,最保留最近的比如几百个版本)
  2. 支持事务:这对于 sql 存储很常见,但是对于 kv 存储来说就不一定了,etcd 通过一个 txn 函数来支持事务,实际大部分时候是配合 MVCC 使用的。举个例子, 通过 etcd 常见的一种事务操作是 if (k's version == 1) do update value = new_value
  3. ChangeNotification:即 Watch 机制,这点对于 k8s 来说很重要,k8s 是一个描述性的应用系统(区别于命令式,即我们需求一个 pod,k8s 得到了这个需求,会尽快的满足这个需求,达到和需求的状态一致),通知系统让 k8s 的状态应用变得快速方便。这点在 zk 或者 consul 等类似的(用于 coordination)存储中很常见,但是在 sql 存储中则不太方便,需要借助 trigger 或者 binlog 来实现。

了解了这些特点之后,我们的目标就比较明确:基于关系数据库实现支持 MVCC 的一个 kv 系统,这个 kv 需要保留所有的 版本,并且 还能支持 watch change notification。

实现

那么 kine 是怎么实现的呢,首先 kine 把所有的 kv 都存在一个表里面,并且在上层的调用函数里面称之为 "log", 这个不难理解,kine 实际上就是把所有的 kv 修改记录存在了这个表里面。表结构如下 (基于 sqlite):

代码语言:txt复制
CREATE TABLE IF NOT EXISTS kine
(
       id INTEGER PRIMARY KEY AUTOINCREMENT,
       name INTEGER,
       created INTEGER,
       deleted INTEGER,
       create_revision INTEGER,
       prev_revision INTEGER,
       lease INTEGER,
       value BLOB,
       old_value BLOB
)`,
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
`PRAGMA wal_checkpoint(TRUNCATE)

这个表值得关注的有几个点:

  1. id 就是 Revision,对应 etcd 里面的 Revision,始终自增 【各种 version 的概念请参考etcd 中让人头大的 version, revision, createRevision, modRevision - 墨天轮】
  2. name,value 分别死 key、value
  3. created、deleted 标记是一个创建或者删除事件
  4. create_revision 表示这个 key 的创建版本,即这个 key 第一次创建的时候的 id 号(对于创建事件本身,这个字段可不填)
  5. prev_revision 表示这个 key 的上一个版本,即这个 key 上一次被操作时候的 id 号
  6. lease 是租约,类似 ttl
  7. old_value 是更新之前的值

List 实现

Get 是 List 的特殊情况,这里我们不特别讨论

List 主要支持几种情况:

  1. 设置是 prefix,表示把 有 prefix 的 key 都查出来,否则精确匹配
  2. 设置了 startKey,表示查询同时还要过滤 key,需要 key >= startKey, 其他的就不要了
  3. 设置了 revision,表示查出 这个 revision 的key,如果没设置表示查最新的

我们先考虑 revision 没有设置,查最新的,语句如下

代码语言:txt复制
SELECT *
FROM   (SELECT (SELECT Max(rkv.id) AS id
                FROM   kine AS rkv),
               (SELECT Max(crkv.prev_revision) AS prev_revision
                FROM   kine AS crkv
                WHERE  crkv.name = 'compact_rev_key'),
               kv.id AS theid,
               kv.name,
               kv.created,
               kv.deleted,
               kv.create_revision,
               kv.prev_revision,
               kv.lease,
               kv.value,
               kv.old_value
        FROM   kine AS kv
               JOIN (SELECT Max(mkv.id) AS id
                     FROM   kine AS mkv
                     WHERE  mkv.name LIKE ?
                     GROUP  BY mkv.name) AS maxkv
                 ON maxkv.id = kv.id
        WHERE  kv.deleted = 0
                OR ?) AS lkv
ORDER  BY lkv.theid ASC 

这个语句里面有几个点值得关注:

  1. 不管查什么,当前 db 的最大 version 以及最小 version(等价于压缩 key 的 最大prev_revision)都会被返回
  2. 具体的查询部分 里面有个 join ,看上去比较复杂,可以理解为:先按照 key 做 group,然后取 每个 group 里面 id 最大的,这也就是最新的。

再看 revision 设置了情况 (同时设置了 startKey),这种情况稍微复杂一点,语句如下

代码语言:txt复制
SELECT *
FROM   (SELECT (SELECT Max(rkv.id) AS id
                FROM   kine AS rkv),
               (SELECT Max(crkv.prev_revision) AS prev_revision
                FROM   kine AS crkv
                WHERE  crkv.name = 'compact_rev_key'),
               kv.id AS theid,
               kv.name,
               kv.created,
               kv.deleted,
               kv.create_revision,
               kv.prev_revision,
               kv.lease,
               kv.value,
               kv.old_value
        FROM   kine AS kv
               JOIN (SELECT Max(mkv.id) AS id
                     FROM   kine AS mkv
                     WHERE  mkv.name LIKE ?
                            AND mkv.id <= ?
                            AND mkv.id > (SELECT Max(ikv.id) AS id
                                          FROM   kine AS ikv
                                          WHERE  ikv.name = ?
                                                 AND ikv.id <= ?)
                     GROUP  BY mkv.name) AS maxkv
                 ON maxkv.id = kv.id
        WHERE  kv.deleted = 0
                OR ?) AS lkv
ORDER  BY lkv.theid ASC 

这个语句的重点:

  1. 同时设置了 startKey 和 revision, 相当于先把 kv 做一轮筛选,筛选出 版本在 startKey 的 maxId 和 revision Id 之间的 key,然后在类似上面未设置 revision 的查询,group 找最大的。
  2. 为什么 revision 判断是 <= 呢,这里根据 etcd 语义,要找的是在这一个 历史时刻下,db 的 key 情况,所以要用 <=,即返回这个时刻 key 里面 version 最大的,并不一定是说返回的 value 版本一定是 revision
  3. 还有一种情况,查询的版本可能已经被 compact 删掉了,这时候在上层里面会比较 compact version,直接烦 ErrCompacted 错误

Create 实现

即 put,创建一个 kv

create 的实现非常简单,插入一条 log 即可,执行语句如下

代码语言:txt复制
INSERT INTO kine
            (
                NAME,
                created,
                deleted,
                create_revision,
                prev_revision,
                lease,
                value,
                old_value
            )
            VALUES
            (
                ?,
                ?,
                ?,
                ?,
                ?,
                ?,
                ?,
                ?
            )
            returning id

Update 实现

这个 Update 支持 create if not exist,支持 mvcc:对比当前 version,不一致则返回更新失败

Update 操作和 Create 操作非常类似,因为对于 log 实现来讲,这两者没有太大区别,都是插入一个 log 事件。几个小的区别:

  1. Update log 会有 create_revision
  2. Update log 会有 old_value
  3. Create log 会有 create 标识(create log 甚至也可能有 prev_revision,因为 create 这个 key 之前可能有一个 删除事件)

Compact 实现

compact 的作用是删除一些历史无用 kv log(比如有些 key 频繁更新,那么很老的那部分 revision 数据就可以删掉来释放空间了),为 db 节约一些存储空间。实现的方式也比较简单,就是在 kine 里面记录一种特殊的 key,名字叫 "compact_rev_key", 其中保存了 compact 上次操作的 revision,下次 compact 操作的时候 从这个 revision 开始继续查询所有的 kv,把不需要的版本( 已经 delete 的,版本小于待 compact 的 的 prev_revision)都删掉,然后再 更新 compact_rev_key 中的版本号即可。具体的操作如下

代码语言:txt复制
DELETE FROM kine AS kv
        WHERE
            kv.id IN (
                SELECT kp.prev_revision AS id
                FROM kine AS kp
                WHERE
                    kp.name != 'compact_rev_key' AND
                    kp.prev_revision != 0 AND
                    kp.id <= ?
                UNION
                SELECT kd.id AS id
                FROM kine AS kd
                WHERE
                    kd.deleted != 0 AND
                    kd.id <= ?
            )


UPDATE kine
SET    prev_revision = ?
WHERE  NAME = 'compact_rev_key'      

Watch 实现

watch 的实现有点像 binlog,支持从某一个 version 开始 watch(这点在 tcp 连接因为一些异常中断恢复的时候很重要),在 一个 tcp 连接里面不断返回 log 事件。

kine 的实现使用 poll,即定期 list 一组数据(>lastListVersion),然后返回到一个 chan 里面做 scatter filter 之后返回给 watcher。为什么使用 poll 呢,实际上 kine 里面有所有的事件动作,直接把 事件发送进 chan 不行吗?这种操作的问题是:如果 kine 重启,或者多实例部署,那么事件就会有问题。

尽管思路很简单,这里面仍然有一些细节:

  1. 如何尽快返回最近的事件,因为 watch 有间隔,在 kine 中为 每秒一次,kine 的做法是:如果一次查出来事件很多,立刻再查一次;如果有事件 notify 通知,并且这个 notify > last, 立刻再查一次,这里的 notify 来自实际的 Append 操作通知
  2. 当出现事件版本不一致的时候怎么处理,kine 的做法是新建一种特殊的 gap key,填补空缺

总结

Kine 提供了一种思路:使用关系型数据库如何实现一个 watchable 的 mvcc kv 存储,而这种思路并不仅仅适用于 kine 所适配的 k3 场景。

0 人点赞