四万字硬刚Kudu | Kudu基础原理实践小总结

2021-09-22 11:48:17 浏览数 (1)

Kudu简介

Hadoop生态系统发展到现在,存储层主要由HDFS和HBase两个系统把持着,一直没有太大突破。在追求高吞吐的批处理场景下,我们选用HDFS,在追求低延迟,有随机读写需求的场景下,我们选用HBase,那么是否存在一种系统,能结合两个系统优点,同时支持高吞吐率和低延迟呢?

有人尝试修改HBase内核构造这样的系统,即保留HBase的数据模型,而将其底层存储部分改为纯列式存储(目前HBase只能算是列簇式存储引擎),但这种修改难度较大。Kudu的出现解决了这一难题。

Kudu是Cloudera开源的列式存储引擎,具有以下几个特点:

  • C 语言开发,Kudu 的 API 可以使用 Java 和 C
  • 高效处理类OLAP负载
  • 与MapReduce,Spark以及Hadoop生态系统中其他组件进行友好集成
  • 可与Cloudera Impala集成,替代目前Impala常用的HDFS Parquet组合
  • 灵活的一致性模型
  • 顺序写和随机写并存的场景下,仍能达到良好的性能
  • 高可用,使用Raft协议保证数据高可靠存储
  • 结构化数据模型

Kudu的出现,有望解决目前Hadoop生态系统难以解决的一大类问题,比如:流式实时计算结果的更新。

时间序列相关应用,具体要求有:

  • 查询海量历史数据
  • 查询个体数据,并要求快速返回
  • 预测模型中,周期性更新模型,并根据历史数据快速做出决策

使用场景

  1. 实时数据更新
  2. 时间序列相关的应用(例如APM),海量历史数据查询(数据顺序扫描),必须非常快地返回关于单个实体的细粒度查询(随机读)。
  3. 实时预测模型的应用(机器学习),支持根据所有历史数据周期地更新模型。

Kudu基本架构

Kudu是典型的主从架构。一个Kudu集群由主节点即Master和若干个从节点即Tablet Server组成。Master负责管理集群的元数据(类似于HBase Master),Tablet Server负责数据存储(类似HBase的RegionServer)。在生产环境,一般部署多个Master实现高可用(奇数个、典型的是3个),Tablet Server一般也是奇数个。

基础概念:

开发语言:C

Columnar Data Store(列式数据存储)

Read Efficiency(高效读取)

对于分析查询,允许读取单个列或该列的一部分同时忽略其他列

  • Data Compression(数据压缩)

由于给定的列只包含一种类型的数据,基于模式的压缩比压缩混合数据类型(在基于行的解决案中使用)时更有效几个数量级。结合从列读取数据的效率,压缩允许您在从磁盘读取更少的块时完成查询

  • Table(表)

一张table是数据存储在 Kudu 的位置。表具有schema和全局有序的primary key(主键)。table被分成很多段,也就是称为tablets。

  • Tablet(段)

一个tablet是一张table连续的segment,与其它数据存储引擎或关系型数据库的partition(分区)相似。给定的tablet冗余到多个tablet服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为tablet服务的一组tablet server之间达成一致性。

一张表分成多个tablet,分布在不同的tablet server中,最大并行化操作Tablet在Kudu中被切分为更小的单元,叫做RowSets,RowSets分为两种MemRowSets和DiskRowSet,MemRowSets每生成32M,就溢写到磁盘中,也就是DiskRowSet

  • Tablet Server

一个tablet server存储tablet和为tablet向client提供服务。对于给定的tablet,一个tablet server充当 leader,其他tablet server充当该 tablet 的follower副本。只有leader服务写请求,然而leader或followers为每个服务提供读请求。leader使用Raft Consensus Algorithm来进行选举 。一个tablet server可以服务多个tablets,并且一个 tablet 可以被多个tablet servers服务着。

  • Master

该master保持跟踪所有的tablets,tablet servers,Catalog Table 和其它与集群相关的metadata。在给定的时间点,只能有一个起作用的master(也就是 leader)。如果当前的 leader 消失,则选举出一个新的master,使用 Raft Consensus Algorithm来进行选举。

master还协调客户端的metadata operations(元数据操作)。例如,当创建新表时,客户端内部将请求发送给master。master将新表的元数据写入catalog table,并协调在tablet server上创建 tablet 的过程。

所有master的数据都存储在一个 tablet 中,可以复制到所有其他候选的 master。tablet server以设定的间隔向master发出心跳(默认值为每秒一次)。master是以文件的形式存储在磁盘中,所以说,第一次初始化集群。需要设定好

  • Raft Consensus Algorithm

Kudu 使用 Raft consensus algorithm 作为确保常规 tablet 和 master 数据的容错性和一致性的手段。通过 Raft,tablet 的多个副本选举出 leader,它负责接受以及复制到 follower 副本的写入。一旦写入的数据在大多数副本中持久化后,就会向客户确认。给定的一组 N 副本(通常为 3 或 5 个)能够接受最多(N - 1)/2 错误的副本的写入。

  • Catalog Table(目录表)

catalog table是Kudu 的 metadata(元数据中)的中心位置。它存储有关tables和tablets的信息。该catalog table(目录表)可能不会被直接读取或写入。相反,它只能通过客户端 API中公开的元数据操作访问。catalog table 存储两类元数据。

  • Tables

table schemas, locations, and states(表结构,位置和状态)

  • Tablets

现有tablet 的列表,每个 tablet 的副本所在哪些tablet server,tablet的当前状态以及开始和结束的keys(键)。

注意:

  1. 建表的时候要求所有的tserver节点都活着
  2. 根据raft机制,允许(replication的副本数-)/ 2宕掉,集群还会正常运行,否则会报错找不到ip:7050(7050是rpc的通信端口号),需要注意一个问题,第一次运行的时候要保证集群处于正常状态下,也就是所有的服务都启动,如果运行过程中,允许(replication的副本数-)/ 2宕掉
  3. 读操作,只要有一台活着的情况下,就可以运行

上图显示了一个具有三个 master 和多个tablet server的Kudu集群,每个服务器都支持多个tablet。它说明了如何使用 Raft 共识来允许master和tablet server的leader和follow。此外,tablet server 可以成为某些 tablet 的 leader,也可以是其他 tablet follower。leader以金色显示,而 follower 则显示为蓝色。

代码语言:javascript复制
测试:
7个tablet server
ssd硬盘,5分钟manul flush到kudu 1000万数据

总结:

  1. KUDU分区数必须预先预定。
  2. 在内存中对每个Tablet分区维护一个MemRowSet来管理最新更新的数据,默认是1G刷新一次或者是2分钟。后Flush到磁盘上形成DiskRowSet,多个DiskRowSet在适当的时候进行归并处理。
  3. 和HBase采用的LSM(LogStructured Merge,很难对数据进行特殊编码,所以处理效率不高)方案不同的是,Kudu对同一行的数据更新记录的合并工作,不是在查询的时候发生的(HBase会将多条更新记录先后Flush到不同的Storefile中,所以读取时需要扫描多个文件,比较rowkey,比较版本等,然后进行更新操作),而是在更新的时候进行,在Kudu中一行数据只会存在于一个DiskRowSet中,避免读操作时的比较合并工作。那Kudu是怎么做到的呢?对于列式存储的数据文件,要原地变更一行数据是很困难的,所以在Kudu中,对于Flush到磁盘上的DiskRowSet(DRS)数据,实际上是分两种形式存在的,一种是Base的数据,按列式存储格式存在,一旦生成,就不再修改,另一种是Delta文件,存储Base数据中有变更的数据,一个Base文件可以对应多个Delta文件,这种方式意味着,插入数据时相比HBase,需要额外走一次检索流程来判定对应主键的数据是否已经存在。因此,Kudu是牺牲了写性能来换取读取性能的提升。

更新、删除操作需要记录到特殊的数据结构里,保存在内存中的DeltaMemStore或磁盘上的DeltaFIle里面。DeltaMemStore是B-Tree实现的,因此速度快,而且可修改。磁盘上的DeltaFIle是二进制的列式的块,和base数据一样都是不可修改的。因此当数据频繁删改的时候,磁盘上会有大量的DeltaFiles文件,Kudu借鉴了Hbase的方式,会定期对这些文件进行合并。

  1. 既然存在Delta数据,也就意味着数据查询时需要同时检索Base文件和Delta文件,这看起来和HBase的方案似乎又走到一起去了,不同的地方在于,Kudu的Delta文件与Base文件不同,不是按Key排序的,而是按被更新的行在Base文件中的位移来检索的,号称这样做,在定位Delta内容的时候,不需要进行字符串比较工作,因此能大大加快定位速度,但是无论如何,Delta文件的存在对检索速度的影响巨大。因此Delta文件的数量会需要控制,需要及时的和Base数据进行合并。由于Base文件是列式存储的,所以Delta文件合并时,可以有选择性的进行,比如只把变化频繁的列进行合并,变化很少的列保留在Delta文件中暂不合并,这样做也能减少不必要的IO开销。
  2. 除了Delta文件合并,DRS自身也会需要合并,为了保障检索延迟的可预测性(这一点是HBase的痛点之一,比如分区发生Major Compaction时,读写性能会受到很大影响),Kudu的compaction策略和HBase相比,有很大不同,kudu的DRS数据文件的compaction,本质上不是为了减少文件数量,实际上Kudu DRS默认是以32MB为单位进行拆分的,DRS的compaction并不减少文件数量,而是对内容进行排序重组,减少不同DRS之间key的overlap(重复),进而在检索的时候减少需要参与检索的DRS的数量。

原理

table与schema

kudu设计是面向结构化存储,因此kudu需要用户在建表时定义它的schema信息,这些schema信息包含:列定义(含类型),Primary Key定义(用户指定的若干个列的有序组合)数据的唯一性,依赖于用户所提供的Primary Key中的Column组合的值的唯一性。Kudu提供了Alter命令来增删列,但位于Primary Key中的列是不允许删除的。

从用户角度来看,kudu是一种存储结构化数据表的存储系统,一个kudu集群中可以定义任意数量table,每个table都需要定义好schema,每个table的列数是确定的,每一列都需要名字和类型,表中可以把一列或者多列定义为主键,kudu更像关系型数据库,但是不支持二级索引。

Kudu存储模型

Kudu的底层数据文件的存储,未采用HDFS这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于Table/Tablet/Replica视图级别的底层存储系统主要是

1.快速的列式查询 2.快速的随机更新 3.更为稳定的查询性能保障

一张table会分成若干个tablet,每个tablet包括MetaData元信息及若干个RowSet。RowSet包含一个MemRowSet及若干个DiskRowSet,DiskRowSet中包含一个BloomFile、AdhocIndex、BaseData、DeltaMem及若干个RedoFile和UndoFile( UndoFile一般情况下只有一个 )

RowSet组成:

MemRowSet

代码语言:javascript复制
用于新数据insert及已在MemRowSet中的数据的更新,一个MemRowSet写满后会将数据刷到磁盘形成若干个DiskRowSet。默认是1G或者或者120S

DiskRowSet

代码语言:javascript复制
用于老数据的变更,后台定期对DiskRowSet做compaction,以删除没用的数据及合并历史数据,减少查询过程中的IO开销。

BloomFile

代码语言:javascript复制
根据一个DiskRowSet中的key生成一个bloom filter,用于快速模糊定位某个key是否在DiskRowSet中。

AdhocIndex

代码语言:javascript复制
是主键的索引,用于定位key在DiskRowSet中的具体哪个偏移位置

BaseData

代码语言:javascript复制
是MemRowSet flush下来的数据,按列存储,按主键有序。

UndoFile

代码语言:javascript复制
是基于BaseData之前时间的历史数据,通过在BaseData上apply UndoFile中的记录,可以获得历史数据。

RedoFile

代码语言:javascript复制
是基于BaseData之后时间的变更记录,通过在BaseData上apply RedoFile中的记录,可获得较新的数据。

DeltaMem

代码语言:javascript复制
用于DiskRowSet中数据的变更,先写到内存中,写满后flush到磁盘形成
RedoFile

MemRowSets与DiskRowSets的区别:

Kudu

HBase

对比可知,MemRowSets中数据Flush磁盘后,形成DiskRowSets,DiskRowSets中数据32M大小为单位,按序划分一个个DiskRowSet,DiskRowSet中的数据按照Column进行组织,类比Parquet,这是Kudu可支持一些分析性查询的基础,每一个Column存储在一个相邻的数据区域,而这个数据区域进一步细分为一个个小Page单元,与hbase的File中Block类似,对于每个Column Page可以采用一些Encoding算法,以及通用的Compression算法.

对于数据的更新和删除,Kudu与hbase蕾西,通过增加一条新记录来描述数据更新和删除,虽然对于DiskRowSet不可修改,Kudu将DiskRowSet划分两个部分,BaseData,DeltaStores,BaseData负责存储基础数据,DeltaStore负责存储BaseData中变更数据

数据从 MemRowSet 刷到磁盘后就形成了一份 DiskRowSet(只包含 base data),每份DiskRowSet 在内存中都会有一个对应的 DeltaMemStore,负责记录此 DiskRowSet 后续的数据变更映射到每个 row_offset 对应的数据变更。

DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着base data 对应数据的不断变更,DeltaFile 逐渐增长。下图是DeltaFile生成过程的示意图(更新、删除)。DeltaMemStore 内部维护一个 B-树索引.

Delta数据部分包含REDO与UNDO两部分:这里的REDO与UNDO与关系型数据库中的REDO与UNDO日志类似(在关系型数据库中,REDO日志记录了更新后的数据,可以用来恢复尚未写入DataFile的已成功事务更新的数据。而UNDO日志用来记录事务更新之前的数据,可以用来在事务失败时进行回滚),但也存在一些细节上的差异:

  • REDO Delta Files包含了Base Data自上一次被Flush/Compaction之后的变更值。REDO Delta Files按照Timestamp顺序排列。
  • UNDO Delta Files包含了Base Data自上一次Flush/Compaction之前的变更值。这样才可以保障基于一个旧Timestamp的查询能够看到一个一致性视图。UNDO按照Timestamp倒序排列。

tablet发现过程

Kudu客户端无论在执行写入还是读操作,先从master获取tablet位置信息,这个过程为tablet发现。当创建Kudu客户端时,其会从主服务器上获取tablet位置信息,然后直接与服务于该tablet的服务器进行交谈,为了优化读取和写入路径,客户顿将保留该信息的本地缓存,防止每一个请求都要查询tablet位置信息,随着时间推移,并且当写入被发送不再是tablet的leader服务器时,被拒绝,然后客户顿通过查询主服务器发现新领导者位置来更新缓存。

Kudu目标:

按照cloudera的想法,kudu的出现是为了解决,hbase,parquet不能兼顾分析和更新的需求,所以需要一个新的存储引擎可以同时支持高吞吐的分析应用以及少量更新的应用。cloudera 的设计目标是:

  • Strong performance for both scan and random access to help customers simplify complex hybrid architectures

在扫描和随机访问两种场景下都有很强的性能,帮助客户简化混合架构。

  • High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors

高cpu利用率

  • High IO efficiency in order to leverage modern persistent storage

高io效率充分利用现代存储

  • The ability to update data in place, to avoid extraneous processing and data movement

支持数据原地更新

  • The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations

支持双活复制集群

kudu核心机制:

代码语言:javascript复制
模仿数据库,以二维表的形式组织数据,创建表的时候需要指定schema。所以只支持结构化数据。

每个表指定一个或多个主键。

支持insert/update/delete,这些修改操作全部要指定主键。

read操作,只支持scan原语。

一致性模型,默认支持snapshot ,这个可以保证scan和单个客户端 read-you-writes一致性保证。更强的一致性保证,提供manually propagate timestamps between clients或者commit-wait。

cluster类似hbase简单的M-S结构,master支持备份。

单个表支持水平分割,partitions叫tablets,单行一定在一个tablets里面,支持范围,以及list等更灵活的分区键。

使用Raft 协议,可以根据SLA指定备份块数量。

列式存储

delta flushes,数据先更新到内存中,最后在合并到最终存储中,有专门到后台进程负责。

Lazy Materialization ,对一些选择性谓词,可以帮助跳过很多不必要的数据。

支持和MR/SPARK/IMPALA等集成,支持Locality ,Columnar Projection ,Predicate pushdown 等。

kudu数据读写、更新

先根据要扫描数据的主键范围,定位到目标的Tablets,然后读取Tablets 中的RowSets,在读取每个RowSet时,先根据主键过滤要scan范围,然后加载范围内的BaseData,再找到对应的DeltaMemStores,应用所有变更,最后union上MemRowSet中的内容,返回数据给Client。

当CLient请求写数据时,先根据主键从Master获取要访问的目标Tablets,然后依次到对应的Tablet获取数据。

因为kudu表存在主键约束,所以需要进行主键是否已经存在的判断,这里涉及到之前说的索引结构对读写的优化,一个Tablet中存在多个RowSets,为了提升性能,尽可能减少扫描RowSets数量,首先,我们先通过每个 RowSet 中记录的主键的(最大最小)范围,过滤掉一批不存在目标主键的RowSets,然后在根据RowSet中的布隆过滤器,过滤掉确定不存在目标主键的 RowSets,最后再通过RowSets中的 B-树索引,精确定位目标主键是否存在。

如果主键已经存在,则报错(主键重复),否则就进行写数据(写MemRowSet)。

更新

数据更新的核心是定位到待更新数据的位置,这块与写入的时候类似,就不展开了,等定位到具体位置后,然后将变更写到对应的DeltaMemStore 中。

kudu的模式设计

基于HTAP方式

kudu是基于hbase-hdfs之间,满足高并发的随机读写,兼顾大规模分析处理,具有OLTP以及OLAP特征,因此是典型的HTAP(在线事务处理/在线分析处理混合模式) 早期

由于将OLTP以及OLAP拆分,事务性应用和分析型应用分开,但是分析型应用无法获取最新数据,OLTP横向扩展性不足,维护一套系统复杂度很高

Lambda架构

Lambda架构将工作负载分为实时层和批处理层,我们是用实施层检索和分析最新的数据,使用批处理层分析历史数据。这样会带来两个特别的问题,两套系统、两份代码,开发、运维、测试都很复杂,整个处理链条中有一处出现问题就需要重跑数据

Kudu设计模式

非常易于跟其他组件整合以支持SQL或者进行分布式计算,非常利于从其他关系型数据库迁移数据,数据的读写均匀分散到每个Tablet Server,以充分挖掘集群的潜力(受分区设计影响),扫描时读取查询所需的最少数据量(主要受主键设计影响,但分区设计也会起到重要作用)

好的shema设计取决于要处理数据的特征、对数据的操作以及集群的拓扑结构。Schema设计对于kudu集群性能最大化来说是最重要的事情。shema设计包含三大块:

列设计

每个列选择合适的类型、编码和压缩方式 Kudu的每个列都必须指定明确的数据类型的,非主键可以为null,目前支持的数据类型如下:

Kudu利用强类型列和列式存储格式来提供高效的编码和序列化。为了充分利用这些功能,应将列指定为适当的类型,而不是使用字符串或二进制列来模拟“无模式”表。除了编码之外,Kudu还允许在每列的基础上指定压缩

同HBase不同,kudu没有提供version和timestamp来跟踪行的变化,如果需要的话,需要自行设计一列
Decimal类型

decimal是具有固定刻度和精度的十进制数字类型,适合于财务等算术运算,(float与double不精确有舍入行为)。decimal类型对于大于int64的整数和主键中具有小数值的情况也很有用

精度:表示该列可以表示的总位数,与小数点的位置无关。此值必须介于1和38之间,并且没有默认值。例如,精度为4表示最大值为9999的整数值,或者表示最多99.99带有两个小数位值。您还可以表示相应的负值,而不用对精度进行任何更改。例如,-9999到9999的范围仍然只需要4的精度。

刻度:表示小数位数。该值必须介于0和精度之间。刻度为0会产生整数值,没有小数部分。如果 精度和刻度相等,则所有数字都在小数点后面。例如,精度和刻度等于3的小数可以表示介于-0.999和0.999之间的值

decimal列类型编码默认 性能考虑:Kudu将每个值存储在尽可能少的字节中,具体取决于decimal指定的精度,。因此,不建议为了方便使用最高精度。这样做可能会对性能,内存和存储产生负面影响

在编码和压缩之前: 精度为9或更小的十进制值以4个字节存储。 精度为10到18的十进制值以8个字节存储。 精度大于18的十进制值以16个字节存储。 alter命令不能修改的decimal列的精度和刻度。

列编码

数据类型-编码对照表

编码

Plain

代码语言:javascript复制
数据以其自然格式存储

Bitshuffle

代码语言:javascript复制
重新排列一个值块以存储每个值的最高有效位,然后是第二个最高有效位,依此类推。最后,结果进行LZ4压缩。如果值重复的比较多,或者按主键排序时值的变化很小,Bitshuffle编码是一个不错的选择。

run length

代码语言:javascript复制
对连续的重复值采用压缩存储,主要是通过只存储值和个数。该编码对按主键排序时具有许多连续重复值的列有效。

dictionary

代码语言:javascript复制
创建一个字典存放所有的值,每个列值使用索引进行编码存储。如果值的个数较少,这种方式比较有效。如果RowSet的列值由于唯一值的数量过多而无法
压缩,则Kudu将透明地退回到Plain编码。这在flush期间进行评估计算

prefix

代码语言:javascript复制
在连续的列值中对公共前缀进行压缩。对于有公共前缀的值或主键的第一列有效,因为tablet中的行是通过对主键排序并存储的。
列压缩

Kudu允许列使用LZ4、Snappy或zlib压缩编解码器进行压缩。如果减少存储空间比扫描性能更重要,请考虑使用压缩,每个数据集的压缩方式都不同,但一般来说LZ4是性能最佳的编解码器,而zlib空间压缩比最大。

默认情况下,使用BitLuffle编码的列固有地使用LZ4压缩进行压缩(不建议修改),其他编码默认不进行压缩。

主键设计

每个Kudu表必须声明由一列或多列组成的主键。与RDBMS主键一样,Kudu主键强制执行唯一性,约束。尝试插入具有与现有行相同的主键值的行将导致重复键错误。主键列必须是非可空的,并且不可以是boolean,float或double类型。表创建指定主键后,主键中的列集就不能更改。

与RDBMS不同,Kudu不提供列的自增,因此应用程序必须提供完整的主键,删除和更新时必须指定完整主键。Kudu本身不支持范围删除或更新。即都是通过主键完成操作。

主键值无法修改,但是可以删除后重新插入来变相实现。

主键索引

Kudu中只有主键才会被索引,没有二级索引。

扫描Kudu行时,在主键列上使用等于或范围谓词来找行性能最佳,非主键列在数据量大的情况下性能不好,建议把查询用到的列尽量设置为主键列

主键索引优化可以使扫描跳过个别Tablet,要想使扫描操作跳过很多Tablet需要借助分区设计。

主键索引是有序的,如果主键有多列则按照组合排序,即先按第一列排序,第一列一样则按第二列,排序,以此类推

时间戳主键回填问题

回填场景

kudu每次插入数据的时候会根据主键索引查找主键,判断主键是否存在来决定插入还是报错

1.实时插入

代码语言:javascript复制
数据产生立马就从数据源采集然后入库到Kudu,及时考虑有一段时间的延迟时间戳的范围也很小。这就意味着只有很小范围的主键是“热”的,它们会被频繁使用因此会被缓存在内存里,检查主键唯一性的操作会非常快,入库速度可以轻松达到百万条/秒。

2.导入历史数据(回填)

代码语言:javascript复制
有些场景下我们需要将历史数据一次性导入Kudu,这个时间跨度可能很大,每插入一行都可能命中主键索引的冷数据,该部分主键索引存储在磁盘上,磁盘寻道和IO读写将会瞬间暴增,入库速度极有可能降低到数千条/秒

3.如何解决回填性能问题

代码语言:javascript复制
使主键更具可压缩性主键压缩更小,则相同内存能够被缓存的主键索引就更多,从而减少磁盘IO
使用SSD,随机寻道要比机械旋转磁盘快几个数量级,更改主键结构,以使回填写入命中连续的主键范围

分区设计

kudu中的表被分成很多tablet分布在多个tserver上,每一行属于一个tablet,行划分到哪个tablet由分区决定,分区是在表创建期间设置的。

写入频繁时,考虑将写入动作平衡到所有tablet之间能够有效降低单个tablet的压力,对于小范围扫描操作比较多的情况,如果所扫描的数据都为一个tablet上则可以提高性能。

kudu没有默认分区,建议读写都较重的table可以设置和tserver服务器数量相同的分区数。

kudu提供两种类型的分区:范围分区和哈希分区。表可以有多级分区,组合使用范围和哈希或者多个哈希组合使用。

分区设计好坏由场景 三个维度去考量:

代码语言:javascript复制
1.是否是读热点
2,是否写热点
3.Tablet可扩展性

范围分区

Kudu允许在运行时动态添加和删除范围分区,而不会影响其他分区的可用性。

删除分区将删除属于该分区的平板电脑以及其中包含的数据,后续插入到已删除的分区中将失败。可以添加新分区,但它们不得与任何现有范围分区重叠。

Kudu允许在单个事务更改表操作中删除和添加任意数量的范围分区。

动态添加和删除范围分区对于时间序列特别有用。随着时间的推移,可以添加范围分区以覆盖即将到来的时间范围。例如,存储事件日志的表可以在每个月开始之前添加月份分区,以便保存即将发生的事件,可以删除旧范围分区,根据需要有效的删除历史数据。

哈希分区

哈希分区按哈希值将行分配到存储桶中的一个。在单级散列分区表中,每个桶只对应一个tablet,在表创建期间设置桶的数量。通常,主键列用作要散列的列,但与范围分区一样,可以使用主键列的任何子集。

当不需要对表进行有序访问时,散列分区是一种有效的策略。散列分区对于在tablet之间随机写入非常有效,这有助于缓解热点和不均匀的tablet大小。

多级分区

Kudu允许表在单个表上组合多个级别的分区。零个或多个哈希分区可以与范围分区组合。除了各个分区类型的约束之外,多级分区的唯一附加约束是多级哈希分区不能散列相同的列。

如果使用正确,多级分区可以保留各个分区类型的好处,同时减少每个分区类型的缺点。多级分区表中的tablet总数是每个级别中分区数的乘积。

修剪分区

当通过扫描条件能够完全确定分区的时候,kudu就会自动跳过整个分区的扫描要确定哈希分区,扫描条件必须包含每个哈希列的等值判定条件。多级分区表的扫描可以单独利用每一级的分区界定。

分区案例

代码语言:javascript复制
CREATE TABLE metrics (
 host STRING NOT NULL, -- 主机
 metric STRING NOT NULL, -- 度量指标
  time INT64 NOT NULL, -- 时间戳
 value DOUBLE NOT NULL, -- 值
 PRIMARY KEY (host, metric, time), -- 主键
);

1.采用范围分区

对time列进行范围分区,假如每年对应一个分区,数据包括2014,2015和2016,至少可以使用两种分区方式:有界范围分区和无界范围分区。但是如果后续时间不断增大,导致一个数据写入最后一个tablet中,导致tablet太大,无法容纳单个tablet

2.采用哈希分区

host和 metric列上的哈希分区为四个桶。与之前的范围分区示例不同,此分区策略将均匀地在表中的所有tablet上进行写入,这有助于整体写入吞吐量。扫描特定host和metric可以通过指定等式来利用分区修剪,将扫描的tablet数量减少到一个。使用纯哈希分区策略时要注意的一个问题是,随着越来越多的数据插入表中,tablet可能会无限增长。最终tablet将变得太大,无法容纳单个tablet服务器。

3.哈希 范围组合分区

哈希分区可以最大限度地提高写入吞吐量,而范围分区可以避免无限制的tablets增长问题。这两种策略都可以利用分区修剪来优化不同场景下的扫描。使用多级分区,可以将这两种策略结合起来,以获得两者的好处,同时最大限度地减少每种策略的缺点

4.双哈希组合分区

要没有共同的哈希列,Kudu就可以在同一个表中支持任意数量的散列分区级别。在上面的示例中,表被host散列为4个桶,并将散列分区metric为3个桶,产生12个tablet。尽管在使用此策略时,写入将倾向于在所有Tablet中传播,但与多个独立列上的散列分区相比,它更容易出现热点,因为单个主机或度量标准的所有值将始终属于单个tablet。扫描可以分别利用host和metric列上的等式谓词来修剪分区。

多级散列分区也可以与范围分区相结合,从逻辑上增加了分区的另一个维度。

模式变更

Kudu1.10.0能够支持的模式更改:

代码语言:javascript复制
1.重命名表
2.重命名主键列
3.重命名,添加或删除非主键列
4,添加和删除范围分区

局限性

Kudu目前有一些已知的局限性可能会影响到架构设计

列数

代码语言:javascript复制
默认情况下,Kudu不允许创建超过300列的表。我们建议使用较少列的架构设计以获得最佳性能。

cell大小

代码语言:javascript复制
在编码或压缩之前,单个单元不得大于64KB。在Kudu完成内部复合密钥编码之后,构成复合密钥的单元限制为总共16KB。插入不符合这些限制的行将导致错误返回给客户端

行大小

代码语言:javascript复制
虽然单个单元可能高达64KB,而Kudu最多支持300列,但建议单行不要大于几百KB。

有效标识符

代码语言:javascript复制
表名和列名等标识符必须是有效的UTF-8序列且不超过256个字节。

主键值不可变

代码语言:javascript复制
Kudu不允许更新主键列得值。

不可更改主键列

代码语言:javascript复制
Kudu不允许您在创建表后修改主键列

不可更改的分区

代码语言:javascript复制
除了添加或删除范围分区之外,Kudu不允许您在创建后更改表的分区方式。

不可改变的列类型

代码语言:javascript复制
Kudu不允许更改列的数据类型。

分区拆分

代码语言:javascript复制
创建表后,无法拆分或合并分区.

总结

分区

代码语言:javascript复制
一般哈希 范围分区组合在一起,只有范围分区的情况极少,因为不能避免写热点,除非有哈希分区,典型的例子就是时间序列。

大对象

代码语言:javascript复制
string, binary在未压缩之前不能大于64K,虽然有配置可以调大这个值,但千万不要这么做,避免出现未知错误。
如果确实要存储超过64K的JSON、XML大对象,有两个办法:
1.先对json、XML压缩再存储,编码方式设置为Plain且关闭压缩;
2.如果远远超过64K,则可以把对象保存到HBase或者HDSF中,然后再去Kudu这边保存该对象的"外键",即HBase的Rowkey、HDFS的路径。

decimal(十进制数)

代码语言:javascript复制
Kudu1.7开始的版本推荐用decimal代替float和double,且可以出现在主键中(float和double就不可以),查询性能更佳,且更适合算数运算

不重复的字符串

代码语言:javascript复制
如果一个表的主键只有一个string列推荐采用Prefix压缩;如果是多个string列构成主键,则推荐Plain编码 LZ4压缩

压缩

代码语言:javascript复制
bitshuffle编码的列会自动使用LZ4压缩进行压缩,其他编码的列可以根据情况选择是否采用LZ4压缩,LZ4通常比Snappy快。

对象命名

代码语言:javascript复制
表名和列名都小写可以避免混乱(Impala不区分大小写,API操作区分大小写)。表名必须唯一,如果在Impala中创建内部Kudu表,则表名会默认加上前缀,如impala:default.person

列的数量

代码语言:javascript复制
列数不能超过300个,如果你在迁移数据时确实有300个以上的列,则可以拆分为多个表,每个表都要保留主键,以便可以通过视图将它们合并在一起。

kudu优化

机架感知

Kudu可以知道每个Tablet Server处于哪个数据中心的哪个机架上,副本的负载均衡策略就可以考虑更全面,避免一个tablet的多个副本负载在同一机架,防止机架故障时tablet不可用。

上图中,L0-L2是三个机架,TS0 -TS5是5台Tablet Server,有两张表: A表(副本因子=3),包含A0-A3四个tablets B表(副本因子=5),包含B0-B2三个tablets 如果Kudu配置了机架感知,它就会发现上面的tablet分布违背了相关规则: 副本A0.0和A0.1构成了大多数副本(三分之二),并且位于相同的位置/ L0中,一旦L0机架电源或者交换机故障,将只有L1上的A0.2一个tablet副本可用且无法选择出leader(根据Raft协议必须 n/2 1 个副本正常才可以选举,n=总副本数) B表的大多数副本集中在TS0-TS4,而TS5非常空闲,在即考虑机架分布式由考虑负载均衡的前提下,需要把B表的相关副本往TS5挪一挪 经过手工负载均衡,负载可能会变成如下样子

透明分层存储管理方案
  1. 存储选择方法

Kudu是为快速数据上的快速分析场景而生的,但是Kudu的成本并不低,且扩展性并没有那么好(tserver的个数不能太多) HDFS旨在以低成本实现无限的可扩展性。它针对数据不可更改的面向批处理的场景进行了优化,当使用Parquet文件格式,可以以极高的吞吐量和效率访问结构化数。

对于数据比较小且不断变化的数据(例如维表)通常全部存放到Kudu当数据不会超过Kudu的扩展范围限制,且能够从Kudu的独特功能中受益时(快速变化、快速分析),通常作为大表保存在Kudu。

当数据量非常大,面向批处理且基本不太可能变更的情况下首选以Parquet格式将数据存储在HDFS中(冷数据)

  1. 基于滑动窗口的透明存储管理方案

当需要两个存储层的优势时,滑动窗口模式是一个有用的解决方案。该方案的主要思路是: 使用Impala创建2张表:Kudu表和Parquet 格式的HDSF表这两张表都是按照时间分区的表,分区粒度取决于数据在Kudu表和HDSF表之间迁移的频率,一般是按照年或者月或者日分区,特殊情况下可以更细粒度在Impala另外创建一个统一视图,并使用where字句定义一个边界,由该边界决定哪些数据该从哪个表读取Kudu中变冷的数据分区会定期的被刷写到HDFS(Parquet )数据刷写之后,在HDFS表新增分区、使用原子的ALTER VIEW 操作把视图的边界往前推移

优点:

代码语言:javascript复制
流式数据可立即查询
可以更新迟到的数据或进行手动更正
HDFS中存储的数据具有最佳大小,可提高性能并防止小文件降低成本

2.数据从Kudu迁到HDFS的过程 数据从Kudu迁移到HDFS需要经过下面两个步骤,该过程需要定时自动调度 数据迁移在第一阶段,将现在不变的数据从Kudu复制到HDFS。即使将数据从Kudu复制到HDFS,在视图中定义的边界也将阻止向用户显示重复数据。此步骤应该包含检查机制,以确保成功完成数据的迁移和卸载。

元数据修改

在第二阶段,既然已将数据安全地复制到HDFS,则更改元数据以调整如何显示卸载的分区。这包括向前移动边界,添加下一个时间窗口的新的Kudu分区以及删除旧的Kudu分区。

索引跳跃式扫描优化
代码语言:javascript复制
CREATE TABLE metrics (
 host STRING,
 tstamp INT,
 clusterid INT,
 role STRING,
 PRIMARY KEY (host, tstamp, clusterid)
);

Kudu在内部会创建主键索引(B-tree),跟上表类似,索引数据按所有主键列的组合排序。当用户查询包含第一主键列(host)时,Kudu将使用索引(因为索引数据主要在第一个主键列上排序)

如果用户查询不包含第一个主键列而仅包含tstamp列怎么办?tstamp虽然在固定host下是有序的,但全局是无须的,所以无法使用主键索引。在关系型数据库中一般采用二级索引,但是Kudu并不支持二级索引

tstamp之前的列为“prefix column”,列的具体值为“prefix key”,在咱们的例子中host就是prefix column。在索引中首先按照prefix key排序,相同的prefix key在按照剩余列的值排序,因此可以使用索引跳转到具有不同prefix key且tstamp满足条件的行上

SELECT clusterid FROM metrics WHERE tstamp = 100,其余的是跳过的。Tablet Server使用索引( prefix key (host = helium) tstamp = 100)跳过不匹配的行直接到达第三行并逐步扫描直到不匹配tstamp = 100,就通过下一个prefix key (host = ubuntu) tstamp = 100继续跳过不匹配的行。其余prefix key采用相同的做法,这就叫做Index Skip Scan优化

性能

代码语言:javascript复制
索引跳跃式扫描优化的性能与前缀列(prefix column)的基数(prefix key去重后的数量)负相关
host的基数越低,跳跃扫描性能越高,反之则性能越差。
前缀列基数很高时,索引跳跃式扫描优化就不可取了

在每个tablet一千万行的数据规模下,当【前缀列host基数>sqrt(number_of_rows_in_tablet)】时,索引跳跃式扫描性能开始下降。为了解决这个问题: Kudu目前在【跳跃次数>sqrt(number_of_rows_in_tablet)】时自动禁用跳跃扫描优化

资源规划

在做资源规划是重点考虑的是tserver,master负载要小很多,回顾已知tserver相关的建议和限制如下

选项 最佳性能(建议值) 限制 tablet server数 不超过100 300 tablet数/tablet server(含副本) 1000 4000 tablet数/表/tablet server(含副本) 60 60 单台tablet server存储数据(含副本,压缩后) 8TB 10TB 单tablet存储数据(超过会性能下降、合并失败、启动慢) 10G 50G 单tablet对应CPU核心数(不考虑副本,不考虑小表) 1 多对1 tablet server内存 16G以上最佳 不低于4G

集群规模

代码语言:javascript复制
Master 必须是奇数,3或者5台为佳,7台就多
Tablet Server 取决于数据规模,但最多不超过1000台的规模,以300以内性能最佳

这里有一个预估tserver服务器数量的公式供参考:t=d/(k∗(1-p))∗r

t:tserver数量 d: 以Parquet格式存储的数据总量(可以将一段时间的数据以Parquet格式存储到HDFS上做预估) k: 每个Tablet Server的最大磁盘容量(建议8T) p: 余量,一般0.25 r:tablet副本因子,一般为3

示例:

代码语言:javascript复制
d=120T
K=8T
p=25%
r=3
t=(120 / (8 * (1 - 0.25)))*3 = 60

内存和CPU

磁盘

跟HDFS不一样,Kudu针对SSD做了特别优化,推荐使用SSD

WAL、metadata、data 配置目录 –fs_wal_dir –fs_metadata_dir –fs_data_dirs

网卡

Master和Tablet Server和 2块千兆网卡绑定

性能调优

硬件层面优化

tserver的WAL采用M.2接口(NVMe协议) SSD,Kudu的每一次写入都会先写WAL,WAL是确保数据不丢失的关键,所以一般都会同步写磁盘(顺序写),为了提高性能建议tserver采用M.2接口(NVMe协议)SSD来存储WAL,至少也得是普通SD(master读写压力小,跟操作系统共享SSD即可) –fs_wal_dir=/data/kudu/tserver/wal

数据存储多SSD

tserver负责数据的读写和复制,压力比较大,建议采用多SSD分散读写IO。fs_data_dirs=/disk1/kudu/tserver/data,/disk2/kudu/tserver/data,/disk3/kudu/tserver/data

操作系统层面优化

操作系统会控制每个用户使用的文件描述符和线程数,Kudu作为数据库肯定比一般应用需要更多文件描述符和线程数

如果Kudu使用的线程数超过OS的限制,你会在日志中看到如下报错: pthread_create failed: Resource temporarily unavailable 降低或者禁用swap使用交换区会导致性能下降,建议降低swap的使用 sudo su - echo ‘vm.swappiness=10’>> /etc/sysctl.conf exit 上面参数重启才能生效,可以同时搭配如下命令避免重启: sudo sysctl vm.swappiness=10 cat /proc/sys/vm/swappiness 检查当前是否生效 cat /proc/sys/vm/swappiness

配置调优

tserver内存限制

Tablet Server能使用的最大内存量,tablet Server在批量写入数据时并非实时写入磁盘,而是先Cache在内存中,在flush到磁盘。这个值设置过小时,会造成Kudu数据写入性能显著下降。对于写入性能要求比较高的集群,建议设置更大的值 : –memory_limit_hard_bytes

还有两个软限制: Cgroup 内存软限制,这个限制并不会阻止进程使用超过限额的内存,只是在系统内存不足时,会优先回收超过限额的进程占用的内存,使之向限定值靠拢,当进程试图占用的内存超过了cgroups的限制,会触发out of memory,导致进程被kill掉

代码语言:javascript复制
–memory_limit_soft_percentage=80

tserver维护管理器线程数 Kudu后台对数据进行维护操作,如写入数据时的并发线程数,一般设置为4,建议的是数据目录的3倍

代码语言:javascript复制
–maintenance_manager_num_threads=6

调大tserver block cache容量,分配给Kudu Tablet Server块缓存的最大内存量,建议是2-4G

代码语言:javascript复制
–block_cache_capacity_mb=2048

避免磁盘耗尽,为避免磁盘空间耗尽,应该保留一部分空间:#默认-1,表示保留1%的磁盘空间,自己配置是必须大于0

代码语言:javascript复制
–fs_data_dirs_reserved_bytes

容忍磁盘故障 如果某个tablet的数据分散到更多的磁盘,则数据会更加分散,这个值越小每个tablet的数据会更加集中,不过受磁盘故障影响就越小。

#每个tablet的数据分散到几个目录

代码语言:javascript复制
fs_target_data_dirs_per_tablet=3

Kudu与HBase对比

整体结构
  • HBase的整体结构

HBase的主要组件包括Master,zookeeper服务,RegionServer,HDFS。

Master:用来管理与监控所有的HRegionServer,也是管理HBase元数据的模块。

zookeeper:作为分布式协调服务,用于保存meta表的位置,master的位置,存储RS当前的工作状态。

RegionServer:负责维护Master分配的region,region对应着表中一段区间内的内容,直接接受客户端传来的读写请求。

HDFS:负责最终将写入的数据持久化,并通过多副本复制实现数据的高可靠性。

  • Kudu的整体结构

Kudu集群中存在两种主要组件:

(1)TServer,负责管理Tablet,tablet是负责一张表中某块内容的读写,接收其他TServer中leader tablet传来的同步信息。

(2)Master,集群中的管理节点,用于管理tablet的基本信息,表的信息,并监听TServer的状态。多个Master之间通过Raft协议实现数据同步和高可用。

主要区别

Kudu结构看上去跟HBase差别并不大,主要的区别包括:

1、Kudu将HBase中zookeeper的功能放进了Master内,Kudu中Master的功能比HBase中的Master任务要多一些。

2、Hbase将数据持久化这部分的功能交给了Hadoop中的HDFS,最终组织的数据存储在HDFS上。Kudu自己将存储模块集成在自己的结构中,内部的数据存储模块通过Raft协议来保证leader Tablet和replica Tablet内数据的强一致性,和数据的高可靠性。为什么不像HBase一样,利用HDFS来实现数据存储,猜测可能是因为HDFS读小文件时的时延太大,所以Kudu自己重新完成了底层的数据存储模块,并将其集成在TServer中。

数据存储方式
  • HBase

HBase是一款Nosql数据库,典型的KV系统,没有固定的schema模式,建表时只需指定一个或多个列族名即可,一个列族下面可以增加任意个列限定名。一个列限定名代表了实际中的一列,HBase将同一个列族下面的所有列存储在一起,所以HBase是一种面向列族式的数据库。

HBase将每个列族中的数据分别存储,一个列族中的每行数据中,将rowkey、列族名、列名、timestamp组成最终存取的key值,另外为了支持修改,删除,增加了一个表征该行数据是否删除的标记。在同一个列族中的所有数据,按照rowkey:columnfamily:columnQulifier:timestamp组成的key值大小进行升序排列,其中rowkey、columnfamily、columnQulifier采用的是字典顺序,其值越大,key越大,而timestamp是值越大,key越小。HBase通过按照列族分开存储,相对于行式存储能够实现更高的压缩比,这也是其比较重要的一个特性。

HBase对一行数据进行更新时,HBase也是相当于插入一行新数据,在读数据时HBase按照timestamp的大小得到经过更新过的最新数据。

  • Kudu

Kudu是一种完全的列式存储引擎,表中的每一列数据都是存放在一起,列与列之间都是分开的。

为了能够保存一部分历史数据,并实现MVCC,Kudu将数据分为三个部分。一个部分叫做base data,是当前的数据;第二个部分叫做UNDO records,存储的是从插入数据时到形成base data所进行的所有修改操作,修改操作以一定形式进行组织,实现快速查看历史数据;第三个部分是REDO records,存储的是还未merge到当前数据中的更新操作。下图中表示的是在Kudu中插入一条数据、更新数据两个操作的做法,当然做法不唯一,不唯一的原因是Kudu可以选择先不将更新操作合并到base data中。

差异分析

(1)HBase是面向列族式的存储,每个列族都是分别存放的,HBase表设计时,很少使用设计多个列族,大多情况下是一个列族。这个时候的HBase的存储结构已经与行式存储无太大差别了。而Kudu,实现的是一个真正的面向列的存储方式,表中的每一列都是单独存放的;所以HBase与Kudu的差异主要在于类似于行式存储的列族式存储方式与典型的面向列式的存储方式的差异。

(2)HBase是一款NoSQL类型的数据库,对表的设计主要在于rowkey与列族的设计,列的类型可以不指定,因为HBase在实际存储中都会将所有的value字段转换成二进制的字节流。因为不需要指定类型,所以在插入数据的时候可以任意指定列名(列限定名),这样相当于可以在建表之后动态改变表的结构。Kudu因为选择了列式存储,为了更好的提高列式存储的效果,Kudu要求在建表时指定每一列的类型,这样的做法是为了根据每一列的类型设置合适的编码方式,实现更高的数据压缩比,进而降低数据读入时的IO压力。

(3)HBase对每一个cell数据中加入了timestamp字段,这样能够实现记录同一rowkey和列名的多版本数据,另外HBase将数据更新操作、删除操作也是作为一条数据写入,通过timestamp来标记更新时间,type来区分数据是插入、更新还是删除。HBase写入或者更新数据时可以指定timestamp,这样的设置可以完成某些特定的操作。

Kudu也在数据存储中加入了timestamp这个字段,不像HBase可以直接在插入或者更新数据时设置特殊的timestamp值,Kudu的做法是由Kudu内部来控制timestamp的写入。不过Kudu允许在scan的时候设置timestamp参数,使得客户端可以scan到历史数据。

(4)相对于HBase允许多版本的数据存在,Kudu为了提高批量读取数据时的效率,要求设计表时提供一列或者多列组成一个主键,主键唯一,不允许多个相同主键的数据存在。这样的设置下,Kudu不能像HBase一样将更新操作直接转换成插入一条新版本的数据,Kudu的选择是将写入的数据,更新操作分开存储。

(5)当然还有一些其他的行式存储与列式存储之间在不同应用场景下的性能差异。

写入和读取过程

一、HBase

HBase作为一种非常典型的LSM结构的分布式存储系统,是Google bigtable的apache开源版本。经过近10年的发展,HBase已经成为了一个成熟的项目,在处理OLTP型的应用如消息日志,历史订单等应用较适用。在HBase中真正接受客户端读写请求的RegionServer的结构如下图所示:

关于HBase的几个关键点:

(1)在HBase中,充当写入缓存的这个结构叫做Memstore,另外会将写入操作顺序写入HLOG(WAL)中以保证数据不丢失。

(2)为了提高读的性能,HBase在内存中设置了blockcache,blockcache采用LRU策略将最近使用的数据块放在内存中。

(3)作为分布式存储系统,为保证数据不因为集群中机器出现故障而导致数据丢失,HBase将实际数据存放在HDFS上,包括storefile与HLOG。HBase与HDFS低耦合,HBase作为HDFS的客户端,向HDFS读写数据。

1、HBase写过程

(1)客户端通过客户端上保存的RS信息缓存或者通过访问zk得到需要读写的region所在的RS信息;

(2)RS接受客户端写入请求,先将写入的操作写入WAL,然后写入Memstore,这时HBase向客户端确认写入成功;

(3)HBase在一定情况下将Memstore中的数据flush成storefile(可能是Memstore大小达到一定阈值或者region占用的内存超过一定阈值或者手动flush之类的),storefile以HFile的形式存放在HDFS上;

(4)HBase会按照一定的合并策略对HDFS上的storefile进行合并操作,减少storefile的数量。

2、HBase读过程

HBase读数据的过程比较麻烦,原因包括:

(1)HBase采用了LSM-tree的多组件算法作为数据组织方式,这种算法会导致一个region中有多个storefile;

(2)HBase中采用了非原地更新的方式,将更新操作和删除操作转换成插入一条新数据的形式,虽然这样能够较快的实现更新与删除,但是将导致满足指定rowkey,列族、列名要求的数据有多个,并且可能分布在不同的storefile中;

(3)HBase中允许设置插入和删除数据行的timestamp属性,这样导致按顺序落盘的storefile内数据的timestamp可能不是递增的。

下面介绍从HBase中读取一条指定(rowkey,column family,column)

(1)读过程与HBase客户端写过程第一步一样,先尝试获取需要读的region所在的RS相关信息;

(2)RS接收读请求,因为HBase中支持多版本数据(允许存在rowkey、列族名、列名相同的数据,不同版本的数据通过timestamp进行区分),另外更新与删除数据都是通过插入一条新数据实现的。所以要准确的读到数据,需要找到所有可能存储有该条数据的位置,包括在内存中未flush的memstore,已经flush到HDFS上的storefile,所以需要在1 memstore N storefile中查找;

(3)在找到的所有数据中通过判断timestamp值得到最终的数据。

  • Kudu

(1)Kudu中的Tablet是负责表中一块内容的读写工作,Tablet由一个或多个Rowset组成。其中有一个Rowset处于内存中,叫做Memrowset,Memrowset主要负责处理新的数据写入请求。DiskRowSet是MemRowset达到一定程序刷入磁盘后生成的,实质上是由一个CFile(Base Data)、多个DeltaFile(UNDO records &REDO records)和位于内存的DeltaMemStore组成。Base data、UNDO records、和REDO records都是不可修改的,DeltaMemStore达到一定大小后会将数据刷入磁盘生成新的REDO records。Kudu后台会有一个类似HBase的compaction线程按照一定的compaction 策略对tablet进行合并处理:

a、将多个DeltaFile(REDO records)合并成一个大的DeltaFile;

b、将多个REDO reccords文件与Base data进行合并,并生成新的UNDO records;

c、将多个DiskRowset之间进行合并,减少DiskRowset的数量。

(2)Kudu将最终的数据存储在本地磁盘上,为了保证数据可靠性,Kudu为一个tablet设置了多个副本(一般为3或5个)。所以一个tablet会由多个TServer负责维护,其中有个副本称为leader tablet,写入的请求只能通过leader tablet来处理,副本之间通过Raft协议保证其他副本与leader tablet的强一致性。

  • 1、Kudu写过程

Kudu与HBase不同,Kudu将写入操作分为两种,一种是插入一条新数据,一种是对一条已插入数据的更新。

1、Kudu插入一条新数据

(1)客户端连接Master获取表的相关信息,包括分区信息,表中所有tablet的信息;

(2)客户端找到负责处理读写请求的tablet所负责维护的TServer。Kudu接受客户端的请求,检查请求是否符合要求(表结构);

(3)Kudu在Tablet中的所有rowset(memrowset,diskrowset)中进行查找,看是否存在与待插入数据相同主键的数据,如果存在就返回错误,否则继续;

(4)Kudu在MemRowset中写入一行新数据,在MemRowset数据达到一定大小时,MemRowset将数据落盘,并生成一个diskrowset用于持久化数据,还生成一个memrowset继续接收新数据的请求。

2、Kudu对原有数据的更新

(1)客户端连接Master获取表的相关信息,包括分区信息,表中所有tablet的信息;

(2)Kudu接受请求,检查请求是否符合要求;

(3)因为待更新数据可能位于memrowset中,也可能已经flush到磁盘上,形成diskrowset。因此根据待更新数据所处位置不同,kudu有不同的做法:

当待更新数据位于memrowset时

a、找到待更新数据所在行,然后将更新操作记录在所在行中一个mutation链表中;在memrowset将数据落盘时,Kudu会将更新合并到base data,并生成UNDO records用于查看历史版本的数据和MVCC,UNDO records实际上也是以DeltaFile的形式存放;

当待更新数据位于DiskRowset中

b、找到待更新数据所在的DiskRowset,每个DiskRowset都会在内存中设置一个DeltaMemStore,将更新操作记录在DeltaMemStore中,在DeltaMemStore达到一定大小时,flush在磁盘,形成Delta并存在方DeltaFile中;

实际上Kudu提交更新时会使用Raft协议将更新同步到其他replica上去,当然如果在memrowset和diskrowset中都没有找到这条数据,那么返回错误给客户端;另外当DiskRowset中的deltafile太多时,Kudu会采用一定的策略对一组deltafile进行合并。

  • 2、Kudu读过程

1、客户端连接Master获取表的相关信息,包括分区信息,表中所有tablet的信息;

2、客户端找到需要读取的数据的tablet所在的TServer,Kudu接受读请求,并记录timestamp信息,如果没有显式指定,那么表示使用当前时间;

3、Kudu找到待读数据的所有相关信息,当目标数据处于memrowset时,根据读取操作中包含的timestamp信息将该timestamp前提交的更新操作合并到base data中,这个更新操作记录在该行数据对应的mutation链表中;

4、当读取的目标数据位于diskrowset中,在所有DeltaFile中找到所有目标数据相关的UNDO record和REDO records,REDO records可能位于多个DeltaFile中,根据读操作中包含的timestamp信息判断是否需要将base data进行回滚或者利用REDO records将base data进行合并更新。

Kudu与HBase在读写上过程中的差异

  • 1、写过程

(1)HBase写的时候,不管是新插入一条数据还是更新数据,都当作插入一条新数据来进行;而Kudu将插入新数据与更新操作分别看待。

(2)Kudu表结构中必须设置一个唯一键,插入数据的时候必须判断一些该数据的主键是否唯一,所以插入的时候其实有一个读的过程;而HBase没有太多限制,待插入数据将直接写进memstore。

(3)HBase实现数据可靠性是通过将落盘的数据写入HDFS来实现,而Kudu是通过将数据写入和更新操作同步在其他副本上实现数据可靠性。

结合以上几点,可以看出Kudu在写的性能上相对HBase有一定的劣势。

  • 2、读过程

(1)在HBase中,读取的数据可能有多个版本,所以需要结合多个storefile进行查询;Kudu数据只可能存在于一个DiskRowset或者MemRowset中,但是因为可能存在还未合并进原数据的更新,所以Kudu也需要结合多个DeltaFile进行查询。

(2)HBase写入或者更新时可以指定timestamp,导致storefile之间timestamp范围的规律性降低,增加了实际查询storefile的数量;Kudu不允许人为指定写入或者更新时的timestamp值,DeltaFile之间timestamp连续,可以更快的找到需要的DeltaFile。

(3)HBase通过timestamp值可以直接取出数据;而Kudu实现多版本是通过保留UNDO records(已经合并过的操作)和REDO records(未合并过的操作)完成的,在一些情况下Kudu需要将base data结合UNDO records进行回滚或者结合REDO records进行合并然后才能得到真正所需要的数据。

结合以上三点可以得出,不管是HBase还是Kudu,在读取一条数据时都需要从多个文件中搜寻相关信息。相对于HBase,Kudu选择将插入数据和更新操作分开,一条数据只可能存在于一个DiskRowset或者memRowset中,只需要搜寻到一个rowset中存在指定数据就不用继续往下找了,用户不能设置更新和插入时的timestamp值,减少了在rowset中DeltaFile的读取数量。这样在scan的情况下可以结合列式存储的优点实现较高的读性能,特别是在更新数量较少的情况下能够有效提高scan性能。

另外,本文在描述HBase读写过程中没有考虑读写中使用的优化技术如Bloomfilter、timestamp range等。其实Kudu中也有使用类似的优化技术来提高读写性能,本文只是简单的分析,因此就不再详细讨论读写过程。

  • 其他差异

HBase:使用的java,内存的释放通过GC来完成,在内存比较紧张时可能引发full GC进而导致服务不稳定;

Kudu:核心模块用的C 来实现,没有full gc的风险。

Kudu在网易的实践

生产实践

实时数据采集场景

实时数据分析中,一些用户行为数据有更新的需求。没有引入Kudu前,用户行为数据会首先通过流式计算引擎写入HBase,但HBase不能支撑聚合分析。为了支撑分析和查询需求,还需要把HBase上的数据通过Spark读取后写入其他OLAP引擎。使用Kudu后,用户行为数据会通过流式计算引擎写入Kudu,由Kudu完成数据更新操作。Kudu可以支持单点查询,也可以配合计算引擎做数据分析。

维表数据关联应用

有些场景中,日志的事件表还需要和MySQL内维度表做关联后进行查询。使用Kudu,可以利用NDC同步工具,将MySQL中数据实时同步导入Kudu,使Kudu内数据表和MySQL中的表保持数据一致。这时Kudu配合计算引擎就可以直接对外提供结果数据,如产生报表和做在线分析等。省去了MySQL中维度表和数据合并的一步,大大提升了效率。

实时数仓ETL

Kudu作为分布式数据存储引擎,可以和Hadoop生态更好结合,因此在生产中我们采用了使用Kudu替换Oracle的做法,提升了扩展性。

ABTEST

在我们的ABTest业务中有两种日志,行为日志和用户分流日志。

架构升级前,我们采用了比较传统的模式,将用户行为日志和用户分流日志分别写入HDFS作为存储的ODS层,通过Spark做清洗、转换后导入HDFS作为存储的DWD层,再通过Spark进行一步清洗、按照时间或其他纬度做简单聚合后写入DWS层。

这个架构的问题是数据产出时间比较长,数据延迟在天级别。业务方需要更及时地拿到ABTest结果。

架构升级后,使用Kafka作为ODS、DWD层存储。Flink在ODS层数据的基础上继续做一层整理和过滤,写入DWD形成明细表数据;DWD层在Flink中做简单聚合后写入DWS层,Kudu在DWS层作为数据存储。

Flink开窗口实时修正实验数据,这一操作在Kudu完成;超出了Flink时间窗口的数据更新则由离线补数据的操作在Kudu中完成修正。

架构升级后,数据延迟大大降低,能够让ABTest业务方更实时地拿到结果。

遇到的问题

问题1: 节点负载不均衡

一些大表场景下会有负载不均衡问题。Kudu不会把range下的哈希分片当作一张表,而是把整个表的分片当成了平等的表进行处理。而在真实使用场景中,range基本是时间字段;需要让range的hash分片更均匀地分布在各节点上,防止数据倾斜。下图是数据倾斜的情况展示:

我们的解决方案是实现了一套优化版本的负载均衡算法,这个算法能够把range表当作单独的表做负载均衡,解决了数据倾斜。下图是优化后效果:

Uploading file...

问题2: 表结构设计复杂

问题3: 没有二级索引,只能通过控制主键顺序和分区键来优化某几种查询模式

问题4: 创建表时需要根据业务场景专门设计表结构

问题2-4,对业务方要求比较高,经常需要专人介入引导业务方导入数据。为了解决问题,我们内部设计了二级索引来解决上述问题。二级索引可以满足查询性能的要求,同时减少用户设计表时候的复杂度:

  • 通过支持二级索引来优化包含非主键列过滤的查询
  • 支持二级索引能够降低业务设计表结构的复杂度 社区对二级索引的支持进度KUDU-2038:Add b-tree or inverted index on value field
Kudu功能展望

BloomFilter

BloomFilter成本较低、效率较高。Join场景下,小表动态生成BloomFilter下推到存储层,防止大表在Join层做数据过滤。最近的Kudu中已经支持了BloomFilter作为过滤条件。

灵活分区哈希

Kudu每个range的hash bucket数量是固定的。考虑到时间和业务增长,在项目实施前期阶段要给Kudu哈希桶数量设置略大,但是数据量较小的场景下过大的分片个数对资源是一种浪费,社区也不推荐hash bucket设置得比较大。期望后续Kudu可以更灵活地适配hash bucket数。

KUDU-2671:Change hash number for range partitioning

多行事务

Kudu暂时不能支持多行事务。目前更新主键需要业务自己实现逻辑检测。

KUDU-2612:Implement multi-row transactions

Flexible Schema

一些业务场景下业务没有唯一主键,但只希望利用Kudu的大批量写入、聚合分析查询的特性。接入业务时Kudu对Schema的要求比较高,一些业务场景无法支持。

KUDU-1879:Support table without a primary key

Spark Streaming Kudu Impala构建预测引擎

1.动态资源分配预测架构图

这个例子的数据通过流式API进入Kafka,然后使用Spark Streaming从Kafka加载数据到Kudu。Kafka允许数据同时进入两个独立的Spark Streaming作业:一个用来进行特征工程;一个用来使用MLlib进行流式预测。预测的结果存储在Kudu中,我们也可以使用Impala或者Spark SQL进行交互式查询,见下图。

下面是一些技术概要:

  • Kafka:Kafka可抽象数据输入,支持扩展,并耦合Spark Streaming框架。Kafka拥有每秒处理百万事件的扩展能力,并能和其他各项技术集成,比如,Spark Streaming。
  • Spark Streaming:Spark Streaming能够处理复杂的流式事件,并且采用Scala编程仅需简单的几行代码即可,也支持Java、Python或者R语言。Spark Streaming提供和Kafka、MLlib(Spark的机器学习库)的集成。Apache Kudu:Kudu支持事件的增量插入,它旨在提供一种基于HDFS(HDFS优势在于大数据存储下的快速扫描能力)和HBase(HBase优势是基于主键的快速插入/查询)之间超存储层。本项目可以采用HBase或者Cassandra,但Kudu为数据分析提供了快速的扫描能力、列式存储架构。
  • Impala:使用Impala可很容易的即席查询。它提供一个查询引擎直接查询加载到Kudu上的数据,并能理解生成模型。作为可选的方案可使用Spark SQL,但这里为了比较使用MADlib库训练的回归模型和使用Saprk MLlib训练的模型,故用Impala。

2.构建实例

现在解释下架构的选择,详细细节如下: 首先,粗略浏览一下流式数据源。通过Kafka来监测文件,tail文件变化发送到Kafka,部分代码见Github。下面给出RSVP内容样例:

代码语言:javascript复制
{"response":"yes","member":{"member_name":"Richard 
Williamson","photo":"http://photos3.meetupstatic.com/photos/member/d/a/4/0/thu
mb_231595872.jpeg","member_id":29193652},"visibility":"public","event":
{"time":1424223000000,"event_url":"http://www.meetup.com/Big-Data-
Science/events/217322312/","event_id":"fbtgdlytdbbc","event_name":"Big Data Science 
@Strata Conference, 
2015"},"guests":0,"mtime":1424020205391,"rsvp_id":1536654666,"group":{"group_name":"Big 
Data 
Science","group_state":"CA","group_city":"Fremont","group_lat":37.52,"group_urlname":"Big-
Data-Science","group_id":3168962,"group_country":"us","group_topics":
[{"urlkey":"data-visualization","topic_name":"Data Visualization"},{"urlkey":"data-
mining","topic_name":"Data Mining"},{"urlkey":"businessintell","topic_name":"Business 
Intelligence"},{"urlkey":"mapreduce","topic_name":"MapReduce"},
{"urlkey":"hadoop","topic_name":"Hadoop"},{"urlkey":"opensource","topic_name":"Open 
Source"},{"urlkey":"r-project-for-statistical-computing","topic_name":"R Project for Statistical 
Computing"},{"urlkey":"predictive-analytics","topic_name":"Predictive Analytics"},
{"urlkey":"cloud-computing","topic_name":"Cloud Computing"},{"urlkey":"big-
data","topic_name":"Big Data"},{"urlkey":"data-science","topic_name":"Data Science"},
{"urlkey":"data-analytics","topic_name":"Data Analytics"},
{"urlkey":"hbase","topic_name":"HBase"},
{"urlkey":"hive","topic_name":"Hive"}],"group_lon":-121.93},"venue":
{"lon":-121.889122,"venue_name":"San Jose Convention Center, Room 
210AE","venue_id":21805972,"lat":37.330341}}

一旦Kafka运行起来,数据从Kafka经过Spark Streaming进入Kudu,代码见这里。

流式作业在Kudu上初始化一个表,接着运行Spark Streaming加载数据到数据表。可以创建一个Impala外部表,并指向Kudu上存储的数据。

代码语言:javascript复制
CREATE EXTERNAL TABLE `kudu_meetup_rsvps` (
`event_id` STRING,
`member_id` INT,
`rsvp_id` INT,
`event_name` STRING,
`event_url` STRING,
`TIME` BIGINT,
`guests` INT,
`member_name` STRING,
`facebook_identifier` STRING,
`linkedin_identifier` STRING,
`twitter_identifier` STRING,
`photo` STRING,
`mtime` BIGINT,
`response` STRING,
`lat` DOUBLE,
`lon` DOUBLE,
`venue_id` INT,
`venue_name` STRING,
`visibility` STRING
)
TBLPROPERTIES(
  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
  'kudu.table_name' = 'kudu_meetup_rsvps',
  'kudu.master_addresses' = 'quickstart.cloudera:7051',
  'kudu.key_columns' = 'event_id, member_id, rsvp_id'
);

紧接着用Impala表查询获得小时RSVP数据:

代码语言:javascript复制
create 
table   rsvps_by_hour as
select  from_unixtime(cast(mtime/1000 as bigint), "yyyy-MM-dd") as mdate 
        ,cast(from_unixtime(cast(mtime/1000 as bigint), "HH") as int) as mhour 
        ,count(*) as rsvp_cnt
from    kudu_meetup_rsvps
group 
by      1,2

有了RSVP数据后可以画随时间的变化图,见下图:

接着可以进行特征工程,为了后续可以直接用Impala建立预测模型:

代码语言:javascript复制
create 
table rsvps_by_hr_training as
select
      case when mhour=0 then 1 else 0 end as hr0
      ,case when mhour=1 then 1 else 0 end as hr1
      ,case when mhour=2 then 1 else 0 end as hr2
      ,case when mhour=3 then 1 else 0 end as hr3
      ,case when mhour=4 then 1 else 0 end as hr4
      ,case when mhour=5 then 1 else 0 end as hr5
      ,case when mhour=6 then 1 else 0 end as hr6
      ,case when mhour=7 then 1 else 0 end as hr7
      ,case when mhour=8 then 1 else 0 end as hr8
      ,case when mhour=9 then 1 else 0 end as hr9
      ,case when mhour=10 then 1 else 0 end as hr10
      ,case when mhour=11 then 1 else 0 end as hr11
      ,case when mhour=12 then 1 else 0 end as hr12
      ,case when mhour=13 then 1 else 0 end as hr13
      ,case when mhour=14 then 1 else 0 end as hr14
      ,case when mhour=15 then 1 else 0 end as hr15
      ,case when mhour=16 then 1 else 0 end as hr16
      ,case when mhour=17 then 1 else 0 end as hr17
      ,case when mhour=18 then 1 else 0 end as hr18
      ,case when mhour=19 then 1 else 0 end as hr19
      ,case when mhour=20 then 1 else 0 end as hr20
      ,case when mhour=21 then 1 else 0 end as hr21
      ,case when mhour=22 then 1 else 0 end as hr22
      ,case when mhour=23 then 1 else 0 end as hr23
      ,case when mdate in ("2015-02-14","2015-02-15") then 1 else 0 end as weekend_day
      ,mdate
      ,mhour
      ,rsvp_cnt
from  rsvps_by_hour;

在Impala上安装MADlib,这样就可以直接在Impala上构建回归模型。

采用MADlib训练回归模型的第一步:

代码语言:javascript复制
select  printarray(linr(toarray(hr0,hr1,hr2,hr3,hr4,hr5,hr6,hr7,hr8,hr9,hr10,hr11,hr12,hr13,hr14, hr15,hr16,hr17,hr18,hr19,hr20,hr21,hr22,hr23,weekend_day), rsvp_cnt))
from    rsvps_by_hr_training;

下面展示回归系数。可看到前面的24个系数显示了一天的按小时趋势,在晚上很少的人在线;最后一个系数是周末,如果是周末的话,系数是负值。

代码语言:javascript复制
Feature Coefficient 
hr0 8037.43 
hr1 7883.93 
hr2 7007.68 
hr3 6851.91 
hr4 6307.91 
hr5 5468.24 
hr6 4792.58 
hr7 4336.91 
hr8 4330.24 
hr9 4360.91 
hr10 4373.24 
hr11 4711.58 
hr12 5649.91 
hr13 6752.24 
hr14 8056.24 
hr15 9042.58 
hr16 9761.37 
hr17 10205.9 
hr18 10365.6 
hr19 10048.6 
hr20 9946.12 
hr21 9538.87 
hr22 9984.37 
hr23 9115.12 
weekend_day -2323.73

通过上述系数进行预测:

代码语言:javascript复制
select       mdate,
             mhour,
             cast(linrpredict(toarray(8037.43, 7883.93, 7007.68, 6851.91, 6307.91, 5468.24, 4792.58, 4336.91, 4330.24, 4360.91, 4373.24, 4711.58, 5649.91, 6752.24, 8056.24, 9042.58, 9761.37, 10205.9, 10365.6, 10048.6, 9946.12, 9538.87, 9984.37, 9115.12, -2323.73), toarray(hr0, hr1, hr2, hr3, hr4, hr5, hr6, hr7, hr8, hr9, hr10, hr11, hr12, hr13, hr14, hr15, hr16, hr17, hr18, hr19, hr20, hr21, hr22, hr23, weekend_day)) as int) as rsvp_cnt_pred,
             rsvp_cnt
from         rsvps_by_hr_testing

按小时对比预测数据和RSVP真实值,由于数据有限,只列出两天的预测。

3.使用Spark MLlib训练模型

下面使用Spark MLlib建立类似的模型,在海量数据下这种方式更优吸引力。 首先,Spark加载JSON文件并使用Spark SQL注册为一张表。你也可以直接从Kudu加载数据,但此列子直接用Spark读取JSON文件。

代码语言:javascript复制
val path = "/home/demo/meetupstream1M.json"
val meetup = sqlContext.read.json(path)
meetup.registerTempTable("meetup")

可以使用Spark SQL运行一个类似在前面Impala中使用的查询语句来获取小时的RSVP数据:

代码语言:javascript复制
val meetup2 = sqlContext.sql("
   select from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd') as dy,
          case when from_unixtime(cast(mtime/1000 as bigint),'yyyy-MM-dd') in ('2015-02-14','2015-02-15') then 1 else 0 end as weekend_day,
          from_unixtime(cast(mtime/1000 as bigint), 'HH') as hr,
          count(*) as rsvp_cnt
    from  meetup
    where from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd') >= '2015-10-30'
    group
    by    from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd'),
          from_unixtime(cast(mtime/1000 as bigint), 'HH')")

接下来,创建特征向量。可以参照前面类的方法做特征工程,但这里介绍一个Andrew Ray的简便方法,使用一句话即可实现特征向量:

代码语言:javascript复制
val meetup3 = meetup2.groupBy("dy","weekend_day","hr","rsvp_cnt").pivot("hr").count().orderBy("dy")

现在有了这些数据,可以训练回归模型了:

代码语言:javascript复制
import org.apache.spark.mllib.regression.RidgeRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val trainingData = meetup3.map { row =>
      val features = Array[Double](1.0,row(1).toString().toDouble,row(4).toString().toDouble, 
                                   row(5).toString().toDouble,row(6).toString().toDouble,
                                   row(7).toString().toDouble,row(8).toString().toDouble,
                                   row(9).toString().toDouble,row(10).toString().toDouble, 
                                   row(11).toString().toDouble,row(12).toString().toDouble, 
                                   row(13).toString().toDouble,row(14).toString().toDouble, 
                                   row(15).toString().toDouble,row(16).toString().toDouble,
                                   row(17).toString().toDouble,row(18).toString().toDouble,
                                   row(19).toString().toDouble,row(20).toString().toDouble, 
                                   row(21).toString().toDouble,row(22).toString().toDouble, 
                                   row(23).toString().toDouble,row(24).toString().toDouble, 
                                   row(25).toString().toDouble,row(26).toString().toDouble, 
                                   row(27).toString().toDouble)
      LabeledPoint(row(3).toString().toDouble, Vectors.dense(features))
}
trainingData.cache()
val model = new RidgeRegressionWithSGD().run(trainingData)

得到一个新的数据集评分

代码语言:javascript复制
val scores = meetup3.map { row =>
      val features = Vectors.dense(Array[Double](1.0,row(1).toString().toDouble, 
                                                 row(4).toString().toDouble,row(5).toString().toDouble, 
                                                 row(6).toString().toDouble,row(7).toString().toDouble,
                                                 row(8).toString().toDouble,row(9).toString().toDouble,
                                                 row(10).toString().toDouble,row(11).toString().toDouble, 
                                                 row(12).toString().toDouble,row(13).toString().toDouble,
                                                 row(14).toString().toDouble,row(15).toString().toDouble,
                                                 row(16).toString().toDouble,row(17).toString().toDouble,
                                                 row(18).toString().toDouble,row(19).toString().toDouble,
                                                 row(20).toString().toDouble,row(21).toString().toDouble, 
                                                 row(22).toString().toDouble,row(23).toString().toDouble,
                                                 row(24).toString().toDouble,row(25).toString().toDouble, 
                                                 row(26).toString().toDouble,row(27).toString().toDouble))
      (row(0),row(2),row(3), model.predict(features)) 
}

scores.foreach(println)

描述Spark模型结果和真实RSVP数据的对比。

4.使用Spark Streaming建立回归模型

前面的两个例子展示了如何基于批处理数据构建模型和即席查询,现在开始建立一个Spark Streaming回归模型。使用流式的方法建立模型使得我们可以更频繁的更新模型,获取最新的数据,预测也更准确。

这里可能和批处理的方法稍有不同。为了展示使用流式回归模型,这里简单的使用每分钟的RSVP数据(替代前面批量预测中按小时处理)来生成连续的流数据来预测接下来的十分钟内的数据。

首先,使用Kafka来输入数据,代码见这里。这部分代码简单的设置Kafka为输入源,设置topic、broker list和Spark Streaming作为输入参数,它可以连接Kafka并获取数据。

代码语言:javascript复制
def loadDataFromKafka(topics: String,
                           brokerList: String,
                           ssc: StreamingContext): DStream[String] = {
            val topicsSet = topics.split(",").toSet
            val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList)
            val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
            messages.map(_._2)
     }
     val dstream = loadDataFromKafka(topics, brokerList, ssc)
对DStream进行transform操作获得RSVP值:
     val stream = dstream.transform { rdd =>
     val parsed1 = sqlContext.read.json(rdd)
     parsed1.registerTempTable("parsed1")
     val parsed2 = sqlContext.sql("
            select  m,
                    cnt,
                    mtime
            from    (select   (round(mtime/60000)-("   current_time   "/60000 ))/1000.0 as m,
                              count(*) as cnt,
                              round(mtime/60000) as mtime
                    from      (select distinct * from parsed1) a
                    group
                    by        (round(mtime/60000)-("   current_time   "/60000 ))/1000.0,
                              round(mtime/60000) ) aa
            where   cnt > 20
            ")
     parsed2.rdd
     }
     stream.print()

转换数据结构来训练模型: 一个数据流为训练数据,actl_stream;另一个数据流用来预测,pred_stream。预测数据流为当前训练数据流时刻的下一个10分钟时间间隔。

代码语言:javascript复制
val actl_stream = stream.map(x => 
           LabeledPoint(x(1).toString.toDouble,Vectors.dense(Array(1.0,x(0).toString.toDouble))) ).cache()
     actl_stream.print()
     val pred_stream = stream.map(x => 
           LabeledPoint((x(2).toString.toDouble 10)*60000,Vectors.dense(Array(1.0,x(0).toString.toDouble))) )
     pred_stream.print()

用时间间隔的数据作为特征训练流式模型,这里的场景非常简单,只是为了说明问题。实际的产品模型需要结合前面讨论的按天和周末的模型来提高预测的准确性。

代码语言:javascript复制
val numFeatures = 2
     val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures)
     model.trainOn(actl_stream)
最后,应用预测模型对下一个时间间隔的数据进行预测:
  val rslt_stream = model.predictOnValues(pred_stream.map(lp => (lp.label, lp.features)))
     rslt_stream.print()

下图为流式模型预测的结果。

假如利用最近十分钟的RSVP数据,可以更好的预测接下来的十分钟左右的数据。将来为了更好的预测需要考虑增加更多的特征来提高模型的健壮性。预测的结果流式的写入Kudu,使用API可以很容易的使用这些预测数据来自动的分配资源。

0 人点赞