长文预警,今天介绍一个时间序列管理系统的论文:《ModelarDB: Modular Model-Based Time Series Management with Spark and Cassandra》,三个作者都来自丹麦奥尔堡大学,这三个人在 2017 年 TKDE 有一篇很全面的时序数据库 Survey《Time Series Management Systems: A Survey》。
正文 4616 字,预计阅读时间 12 分钟。
问题背景
工业系统(如风机)产生的数据量太大,无法存储所有原始数据,现在普遍只存储了聚合信息。但是这样会丢失原始数据中的波动和异常值,但是通常这些信息是很宝贵的,可以用来做故障诊断。
时序数据库需要具有的重要性质:分布式,流处理(写入即可见),高压缩,高效检索,模糊查询处理AQP(Approximate Query Processing),可扩展性(不需要修改代码就能增加领域知识)。
于是,这篇文章针对这些问题,做了一个时序数据库 ModelarDB。
源代码在 https://github.com/skejserjensen/ModelarDB
时间序列
时间序列(Time Series):一系列有时间和值的二元组,并且时间维度递增。
比如:(100,28.3)(200,30.7)(300,28.3)(400,28.3)(500,15.2)...
一个有有限个数据点的时间序列叫有界时间序列。
定频时间序列(Regular Time Series):相邻两个时间点的时间间隔相等。
上边那个就是定频的。
采样间隔(Sampling Interval):定频时间序列中两个相邻时间点的时间间隔。
上边那个间隔就是 100。
模型
上边的概念没啥新奇的,重点在模型,这篇文章主要要理解模型是什么:
模型(model):是一个时间序列的表示,包括两个函数(Mest,Merr),第一个函数输入一个时间点,给出一个估计的值。第二个函数输入一个时间序列和第一个函数,给出一个正实数,作为误差估计。
以上边那个包含 5 个点时间序列为例,可以给一个模型:
Mest = -0.0024 * ti 29.5, 1 ≤ i ≤ 5
Merr = max( |vi - Mest(ti)| ), 1 ≤ i ≤ 5
这里 vi 和 ti 就是从原始的时间序列得到的。其实就是用一个一次函数用来估计值,计算每个点的绝对误差,保留最大的那个。
这个模型没问题,但是起码在计算 Merr 时还需要原始时间序列。
间断(GAP):就是一个时间段(ts,te),用来表示一个数据源产生的两段相同采样间隔的定频时间序列中间的间断大小,其中 te = ts m*采样间隔,m大于等于2,也就是至少需要缺一个点,因为一个都不缺时 m 就为1。
像(100,x)(200,x)(400,x)中间就有间断,就是不定频的时间序列。
将不定频的时间序列的GAP用空值填上,就变成了带间断的定频时间序列。
段(Segment):一个段就是一个有界的带间断的定频时间序列,包括几个元素:起始时间,终止时间,采样间隔,空值时间点的集合,模型,误差。
这个 segment 就是最终 boss 了,前边推了那么多就是为了引出 segment,之后系统存储的也是 segment。ModelarDB 只适用于定频时间序列,这是硬伤。
一个有5个点的时间序列,假如第5个点不符合用户定义的错误率,就把前四个用 segment 表示,第五个点等接下来的数据来了之后再创建 segment,如下图示例:
系统架构
说是一个系统,其实是一个 jar 包,这个 jar 包依赖了 Spark 、Spark-Cassandra-Connector 和 Cassandra,实现了他们的接口。
ModelarDB 的架构图如下图,基本包括数据导入模块(生成segment),查询接口,存储接口,还有一个元数据 cache 模块。
这张图说每个 ModelarDB 节点上都有一个 Spark 节点和 Cassandra,保证数据本地性,其实任意一个使用 Spark-Cassandra-Connector 的客户端都能做到这个。
数据流动:通过 segment 生成器给时间序列数据做个转换,选择合适的模型,生成一堆 segment,然后 cache 在内存里,并把旧的 segment 持久化到 Cassandra 里。内存里的和 Cassandra 里的都可以查询。
为啥选 Spark 和 Cassandra?因为都是成熟的分布式系统,天生自带高可用的特性,而且好集成,有现成的扩展接口。这里还提到了一个 Simba 系统,也是基于 Spark 做的一个用来管理时空数据的,跟 ModelarDB 的原理差不多。
使用方式
查询:只需要把 ModelarDB 的 jar 包提交成一个 Spark 作业,Spark 会自动分发 jar 包并行执行,看起来就是分布式时序数据查询。
导入:可以直接 java -jar 启动主函数,里边会自动启动 SparkSession,用 spark local 模式往 Cassandra 里写数据。
容错
作者讨论了一下容错机制,因为集成的现有分布式系统,所以只在系统架构层面考虑,不会考虑细节的东西,比如 Cassandra 里一个节点挂了会怎样。
出错只有三种情况:(1)数据导入时(2)内存中的数据(3)磁盘上的数据。这三种情况分别有不同的解决策略。
(1)第一种是将数据缓存在 kafka 中,这样导入时候 ModelarDB 挂了,数据在 kafka 里还有。虽然解法很鸡贼,跟 ModelarDB 没啥关系,但是确实很实用,在实际场景我也会这么选。
另一种是在多个节点并行导入(作者没有细说,我觉得是将一份数据交给多个节点同时解析,由于 key 相同,最后只会留一份),但是这种会很费资源,没必要。
(2)(3)利用 Spark 和 Cassandra 自带的副本保证安全。Cassandra 的副本可以理解,毕竟是个数据库,Spark 有啥副本?个人觉得是 Spark 的 RDD 的容错机制,一个 RDD 坏了重新从源头算出来。
并且为了保证导入速度,最后作者采用了单节点导入数据,允许丢失一部分。也没用 kafka。容错机制直接用的 Spark 和 Cassandra 的,也没做修改。
其实只是在架构层面讨论了一下容错,实际没额外做工作。这也是利用现有系统的好处,虽然自己没做,但是也是系统的一部分特性。
模型压缩示例
数据导入时候会根据时间序列的特点自动分段,生成多个 segment。论文的重点就是这部分,剩下的都是比较工程化的东西。
ModelarDB 提出的压缩方法在高压缩率和低延迟之间做了平衡。这里的延迟就是流处理中的时间窗口,在本文指代最大不可查点数。
举个例子:
系统分三层,最上层是 segment 生成器,里边有数据点的 buffer,用来接收数据点,实线是缓存的,虚线是被删除的。这里最大延迟设置为 3 个点,也就是最多只能有最近的 2 个点不可见,当第三个点到达时,就需要创建一个临时段(ST)放在内存里,支持查询。
T表示内存中 segment 的最后一个点,上图t3时候产生了一个 segment,复制到 cache 里,这时 t3 之前的点都可见了,当接收到 t4 点的时候还可以继续加到上一个 segment 中,但是还不着急对用户可见,所以先放着,如果当前 segment 又攒够了 3 个点,就再更新到 cache 一次。
如果遇到了一个用户设置的阈值外的离群点,就关闭当前 segment,更新到 cache 中,并且把 buffer 中的删除。segment 的最后这个点为 F。
ye 是 buffer 中还没被 segment 包含的点数。当 cache 中的 segment 达到一定大小就刷到等存储 storage 中。
model-agnostic copression:模型无关的压缩算法
上面的示例只有一个模型。实际算法里支持用多个模型去压缩一个时间序列。多模型的时候,每次产生一个最终 segment 的过程如下:
每次从 TS 里拿一个点先放到 buffer里。尝试加到第一个模型里,当新的点不能被当前模型表示时,就去尝试用下一个模型表示 buffer 里的所有点。如果所有模型都试过了,就选择一个压缩率最高的模型作为最终的segment(SF)放到 cache 中。
看个示例吧,假如 buffer 里有这么几个点,并且三个模型都试了。这里压缩率最高的不一定是表示的点数最多的,可能 model2 压缩率最高,于是就被刷出去了。主要是看谁吃的好,而不是看谁吃的多。
比如第一次 model2 胜出,segment1 被刷到 cache 中了,然后三个模型继续从第四个点开始吃,这次 model3 压缩最好,于是 segment2 又被刷出去了。这里 segment 的编号只是从 1 开始而已,跟 model id 没关系。
这个压缩算法是模型 agnostic 的,其实就是动态选择最佳模型。
模型也是可扩展的,任何人都可以实现 ModelarDB 中模型的接口去扩展模型,比较灵活。
查询模式
ModelarDB 提供两种视图支持查询,第一种是段视图(段ID, 起始时间, 终止时间, 采样间隔, 模型ID, 模型参数),第二种是点视图(段ID, 时间戳, 值)。这两种视图就是两种表结构。sql 也得针对这两种表结构去写。
单点的接口最后也是实现在 segment 之上的。所以可以只考虑 segment 查询。
优化行重组
这是个很工程的东西,用来加速行的重组的。
SparkSQL 中的查询会选择视图中的一些列,交给 ModelarDB 去执行,执行完结果后还需要拼成一行一行的格式返回给 SparkSQL,这基本就是 SparkSQL 的接口。
在每次拼一行数据时,都需要根据 SparkSQL 给我的列名去一个一个找对应的值,这样比较费劲。作者在这里提供了一个函数,这个函数接收一个数据点,直接返回一行。
如何生成这个函数呢?用点视图举例:(段ID, 时间戳, 值),各列下标分别是1,2,3。
首先根据点视图和查询的列名拿到各个列的 index 的拼接,比如我查询的是(时间戳,值),拼接出来就是 23,(值,段ID)= 31。
针对每种组合,手动写这个函数。因为每种视图都不超过 10 列,而且表结构是固定的,所以这个优化方案可行,工作量也还能接受。如果表结构不固定或者行数太多这种方法就不适用了。
底层存储
Cassandra 中表结构是这样的,有三张表,Time Series 存储 segment id 和 采样间隔,Segment 表存储 segment 的信息,model 表存储模型信息。
一个 Time Series 可以对应很多个 segment,一个 model 也可以对应很多个 segment。可以做谓词下推,也是利用了 Spark-Cassandra-Connector 的功能。
对比
压缩率:用模型代替原始数据肯定能压的很好,跟其他流行的时间序列数据库和大数据文件格式做了对比。
写入速度:吊打了其他系统和文件格式,这也没的说的,毕竟 ModelarDB 没存原始点,I/O上的优势比较大。
局限
只支持定频数据,感觉这一点就可以宣布死刑了。
文章开头介绍场景时说工业场景复杂,数据可能缺失、乱序,但是后来没有提乱序的解决方案。
针对一个时间序列,每一段都会尝试所有的模型。也就是写入速度和模型数成正比,候选模型多了会拖慢写入速度,不过作者没提这个事。
个人感觉有损压缩是无法接受的,也没见过实用的数据库是有损的。
总结
这篇文章主要解决空间问题,因为数据量太大,无法存储所有原始数据,这个假设感觉比较鸡肋。确实没有人把有史以来所有数据都存下来,大家都是存近期数据,几天到几年都有。但是这些近期数据基本也够用了。