0. 背景
需求:
- 支持事务
- 吞吐量大,实时统计查询,统计粒度大约在10分钟或者以内
- 有多个数据源,需要先将所有数据源进行聚合成宽表再进行统计查询
- 查询结果可能会集中在同一时间
- 尽量黑盒,业务无感知
1. 整体数仓解决方案
腾讯云数据仓库PostgreSql TDSQL,PingCAP的TiDB,阿里的OceanBase,华为云DWS,都是HTAP的业内常用数仓,可以一站式解决需求。
1.1 云数仓PostgreSQL
https://cloud.tencent.com/document/product/878/18778
云数据仓库 PostgreSQL(Cloud Data Warehouse PostgreSQL)(原 Snova 数据仓库)为您提供简单、快速、经济高效的 PB 级云端数据仓库解决方案。云数据仓库兼容 Greenplum 开源数据仓库,是一种基于 MPP(大规模并行处理)架构的数仓服务。借助于该产品,可以使用丰富的 PostgreSQL 开源生态工具,实现对云数据仓库中海量数据的即席查询分析、ETL 处理及可视化探索,对标华为云DWS;
1.1.1 数据接入
数据接入可使用DataX工具将其他数据源如MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase数据增量更新或者离线更新至云数仓PGSQL。并支持通过SQL方式将需要的数据导入至云数仓PGSQL。若有多个数据源可配置多个DataX任务进行数据接入。
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
NoSQL数据存储 | Hbase0.94 | √ | √ | 读 、写 |
Hbase1.1 | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 |
https://cloud.tencent.com/document/product/878/47391
https://github.com/HashDataInc/DataX
或者使用COS数据作为外表进行加载与查询,存储时间较长,查询需求较少的冷数据也可迁移至COS以减少成本。
加载COS数据 https://cloud.tencent.com/document/product/878/34875
冷数据迁移 https://cloud.tencent.com/document/product/878/45908
1.1.2 ETL
数仓内进行数据聚合ETL管理可以使用开源组件AirFlow或者azkaban,安装配置完成后可以对数仓内多个表进行实时/离线聚合,这里以AirFlow为例。
https://cloud.tencent.com/document/product/878/47355
1.2 TiDB
TiDB是PingCAP公司自主设计、研发的开源分布式关系型数据库,是一款同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP) 的融合型分布式数据库产品,具备水平扩容或者缩容、金融级高可用、实时 HTAP、云原生的分布式数据库、兼容 MySQL 5.7 协议和 MySQL 生态等重要特性。目标是为用户提供一站式 OLTP (Online Transactional Processing)、OLAP (Online Analytical Processing)、HTAP 解决方案。TiDB 适合高可用、强一致要求较高、数据规模较大等各种应用场景。
数据迁移:https://docs.pingcap.com/zh/tidb/stable/migration-overview
2. 开源大数据组件组合方案
2.1 常用框架简介
常用OLAP MPP框架优劣势
业界常用组合方案 Hbase Phoenix 、Kudu impala、 clickhouse 对比如下
2.1 HBASE Phoenix
HBASE在实时大批量查询与写入表现都很优秀,在引入Phoenix后查询方便许多,也能解决一些rowkey设计问题。不过后期运维成本可能会较高。
业务聚合处理: 简单的可以使用Phoenix写SQL直接进行,支持跨多表聚合,复杂的聚合操作可使用spark进行处理;
事务性:HBASE支持对数据进行修改;
扩展与运维:EMR支持一键扩容,可提供运维;
2.2 Kudu impala
Kudu和Impala均是Cloudera贡献给Apache基金会的顶级项目。Kudu作为底层存储,在支持高并发低延迟kv查询的同时,还保持良好的Scan性能,该特性使得其理论上能够同时兼顾OLTP类和OLAP类查询。Impala作为老牌的SQL解析引擎,其面对即席查询(Ad-Hoc Query)类请求的稳定性和速度在工业界得到过广泛的验证,Impala并没有自己的存储引擎,其负责解析SQL,并连接其底层的存储引擎。在发布之初Impala主要支持HDFS,Kudu发布之后,Impala和Kudu更是做了深度集成。
在众多大数据框架中,Impala定位类似Hive,不过Impala更关注即席查询SQL的快速解析,对于执行时间过长的SQL,仍旧是Hive更合适。
对于GroupBy等SQL查询,Impala进行的是内存计算,因而Impala对机器配置要求较高,官方建议内存128G以上,此类问题Hive底层对应的是传统的MapReduce计算框架,虽然执行效率低,但是稳定性好,对机器配置要求也低。
执行效率是Impala的最大优势,对于存储在HDFS中的数据,Impala的解析速度本来就远快于Hive,有了Kudu加成之后,更是如虎添翼,部分查询执行速度差别可达百倍。
2.2.1 对比
区别于Hbase等存储引擎,Kudu有如下优势:
- 快速的OLAP类查询处理速度
- 与MapReduce、Spark等Hadoop生态圈常见系统高度兼容,其连接驱动由官方支持维护
- 与Impala深度集成,相比HDFS Parquet Impala的传统架构,Kudu Impala在绝大多数场景下拥有更好的性能。
- 强大而灵活的一致性模型,允许用户对每个请求单独定义一致性模型,甚至包括强序列一致性。
- 能够同时支持OLTP和OLAP请求,并且拥有良好的性能。
- Kudu集成在ClouderaManager之中,对运维友好。
- 高可用。采用Raft Consensus算法来作为master失败后选举模型,即使选举失败,数据仍然是可读的。
- 支持结构化的数据,纯粹的列式存储,省空间的同时,提供更高效的查询速度。
- Kudu是一个纯粹的列式存储引擎,相比Hbase只是按列存放数据,Kudu的列式存储更接近于Parquet,在支持更高效Scan操作的同时,还占用更小的存储空间。
列式存储有如此优势,主要因为两点:
- 通常意义下的OLAP查询只访问部分列数据,列存储引擎在这种情况下支持按需访问,而索一行中的所有数据。
- 数据按列放一起一般意义来讲会拥有更高的压缩比,这是因为列相同的数据往往拥有更高的相似
Kudu和Hbase有如下两点本质不同
- Kudu的数据模型更像是传统的关系型数据库,Hbase是完全的no-sql设计,一切皆是字节。
- Kudu的磁盘存储模型是真正的列式存储,Kudu的存储结构设计和Hbase区别很大。 综合而言,纯粹的OLTP请求比较适合Hbase,OLTP与OLAP结合的请求适合Kudu。
2.2.2 事务性
Kudu可以保证单行操作的原子性
Kudu不支持多行的事务操作,不支持回滚事务
2.2.3 ETL
在多表聚合ETL可使用impala view创建不同数据源的临时表,再使用实时与离线任务加载不同数据源聚合的宽表,供业务方进行不同的聚合查询统计。
2.3 Apache Cassandra
单看性能,Cassandra还是很强大的,不过和其他数据库不太一样的地方,Cassandra 是一种无主的,反言之即 Cassandra 是一种多主的。多主的意思就是多个节点都可以操作,并不是都转发到一个节点上。在一个节点上很容易加锁,只要对某一行加锁,对所有的请求保持串行就可以了。所以对于独立行写其实是有冲突的,在 Cassandra 里面解决冲突的办法是很暴力的,就是 last write win ( 最后写入者获胜 ),因此导致 Cassandra 不适合做先读后写的操作。对于这种场景,Cassandra 建议使用 cas 的语法,但 cas 的性能比较差,因此使用 cassandra 时要避免冲突很多的场景。什么是冲突很多呢?比如多个手机用户同时更新一条数据,就是强冲突的。
3. Flink方案
在数仓写入性能有瓶颈,写入压力较大的或者对实时性要求较高情况下,可以引入实时计算框架进行实时聚合,减少下游的计算压力。
3.1 上下游兼容
可参见Oceanus,基于Flink,可支持传统关系型数据库racle、缓存Redis、或者Hadoop生态HBASE、hdfs、消息队列Kafka以及clickhouse、ES、hippo、HTTP等等进行建source与sink表。
需要说明的是,source表、sink表并不代表在oceanus中真的创建了类似数据库的真实物理表,实际上source表、sink表均是逻辑表,它只是通过业务填写的配置项映射到真实的数据源、目的地。
3.2 吞吐能力
根据美团Flink与Storm对比数据(spark streaming为秒级 storm与flink为毫秒级)
- Storm 单线程吞吐约为 8.7 万条/秒,Flink 单线程吞吐可达 35 万条/秒。Flink 吞吐约为 Storm 的 3-5 倍。
- Storm QPS 接近吞吐时延迟(含 Kafka 读写时间)中位数约 100 毫秒,99 线约 700 毫秒,Flink 中位数约 50 毫秒,99 线约 300 毫秒。Flink 在满吞吐时的延迟约为 Storm 的一半,且随着 QPS 逐渐增大,Flink 在延迟上的优势开始体现出来。
- 综上可得,Flink 框架本身性能优于 Storm。
3.3 聚合处理
Flink可以通过创建view即临时表,实现对多个业务表进行聚合,且结果不会存储,并可以按需聚合。业务可以按需写SQL进行查询view,且不需要写spark程序,不需要每次使用spark在hive建立宽表再进行查询,流程会简单许多。
若有复杂运算支持UDF。
3.4 事务性
部分事务可以使用Flink的时间窗口解决,如统计订单数时有取消订单可以使用时间窗口或者。传统数据库的ACID目前不支持。
flink提供了两种构建模块来实现事务性sink连接器:write-ahead-log(WAL,预写式日志)sink和两阶段提交sink。WAL式sink将会把所有计算结果写入到应用程序的状态中,等接到检查点完成的通知,才会将计算结果发送到sink系统。因为sink操作会把数据都缓存在状态后段,所以WAL可以使用在任何外部sink系统上。尽管如此,WAL还是无法提供刀枪不入的恰好处理一次语义的保证,再加上由于要缓存数据带来的状态后段的状态大小的问题,WAL模型并不十分完美。与之形成对比的,2PC sink需要sink系统提供事务的支持或者可以模拟出事务特性的模块。对于每一个检查点,sink开始一个事务,然后将所有的接收到的数据都添加到事务中,并将这些数据写入到sink系统,但并没有提交(commit)它们。当事务接收到检查点完成的通知时,事务将被commit,数据将被真正的写入sink系统。这项机制主要依赖于一次sink可以在检查点完成之前开始事务,并在应用程序从一次故障中恢复以后再commit的能力。2PC协议依赖于Flink的检查点机制。检查点屏障是开始一个新的事务的通知,所有操作符自己的检查点成功的通知是它们可以commit的投票,而作业管理器通知一个检查点成功的消息是commit事务的指令。于WAL sink形成对比的是,2PC sinks依赖于sink系统和sink本身的实现可以实现恰好处理一次语义。更多的,2PC sink不断的将数据写入到sink系统中,而WAL写模型就会有之前所述的问题。
3.5 附:其他流计算框架对比参考
各个实时计算引擎对比表如下
项目/引擎 | Storm | Flink | spark-treaming |
---|---|---|---|
API | 灵活的底层 API 和具有事务保证的 Trident API | 流 API 和更加适合数据开发的 Table API 和 Flink SQL 支持 | 流 API 和 Structured-Streaming API 同时也可以使用更适合数据开发的 Spark SQL |
容错机制 | ACK 机制 | State 分布式快照保存点 | RDD 保存点 |
状态管理 | Trident State状态管理 | Key State 和 Operator State两种 State 可以使用,支持多种持久化方案 | 有 UpdateStateByKey 等 API 进行带状态的变更,支持多种持久化方案 |
处理模式 | 单条流式处理 | 单条流式处理 | Mic batch处理 |
延迟 | 毫秒级 | 毫秒级 | 秒级 |
语义保障 | At Least Once,Exactly Once | Exactly Once,At Least Once | At Least Once |
相比于Storm和其他一些流计算框架,Flink具有以下几点优势:
- 更友好的编程接口。Storm提供的API偏底层且过于简单,用户需要大量的开发工作来完成业务需求。另外,用户在开发Storm程序时的学习成本也较高,需要熟悉框架原理和在分布式环境下的执行细节。Flink除了提供Table API和SQL这些高级的声明式编程语言之外,还对window这些流计算中常见的算子进行了封装,帮助用户处理流计算中数据乱序到达等问题,极大的降低了流计算应用的开发成本并减少了不必要的重复开发。
- 有效的状态管理支持。大部分的计算程序都是有状态的,即计算结果不仅仅决定于输入,还依赖于计算程序当前的状态。但Storm对程序状态的支持十分有限。一般情况下,用户常常需要将状态数据保存在MySQL和HBase这样的外部存储中,自己负责这些状态数据的访问。这些对外部存储的访问常常成为Storm程序的性能瓶颈。大多数情况下,用户只能设计复杂的本地cache来提升性能。Spark Streaming直到最近才提供了有限的状态管理支持,但受限于其实现机制需要一定的远程访问和数据迁移工作,因此状态数据的访问效率并不高。Flink则对计算程序的状态存储提供了有效支持。用户可以通过提供的接口方便地存储和访问程序状态。由于这些状态数据存放在本地,因此用户可以得到较高的访问性能。在发生故障时,Flink的状态管理会配合容错机制进行状态数据的重建,保证用户程序的正确性。而当用户需要修改程序并发度时,Flink也可以自动地将状态数据分发到新的计算节点上。
- 丰富的容错语义。由于Storm缺少对程序状态的有效支持,其对容错的支持也较弱,很难保证在发生故障的情况下,每条输入数据恰好被处理一次。而Flink则依靠分布式系统中经典的Chandy-Lamport算法,能够对用户程序的输入和状态生成满足一致性的程序快照。在发生异常的情况下通过快照回滚,Flink可以保证EXACTLY-ONCE的容错语义。而利用异步checkpoint和增量checkpoint技术,Flink能够在以较低的成本对用户程序进行快照。在开启快照时,用户程序的性能几乎不受影响。
- 出色的执行性能。Flink基于事件触发的执行模式对数据流进行处理,相比于Spark Streaming采取mini batch的执行模式,能够大量减少程序执行时的调度开销。此外,Flink对网络层进行了大量优化,通过细粒度封锁和高效内存访问提高数据传输性能,并通过反压机制和流量控制有效降低流量拥塞导致的性能下降。加上Flink能够避免状态数据的远程访问,Flink在实践中表现出比其他流计算系统更出色的执行性能,具有更低的处理延迟和更高的吞吐能力。
总结:Flink 和 Spark Streaming 的 API 、容错机制与状态持久化机制都可以解决一部分使用 Storm 中遇到的问题。但 Flink 在数据延迟上和 Storm 更接近,所以业界偏向使用Flink作实时计算与聚合。
4. Ref
- https://cloud.tencent.com/developer/news/707234 滴滴
- https://tech.meituan.com/2018/10/18/meishi-data-flink.html 美团flink
- http://www.imcdo.com/blogs/10907.html flink clickhouse
- https://cloud.tencent.com/document/product/849 腾讯云oceanus
- https://tech.meituan.com/2017/11/17/flink-benchmark.html flink与storm对比
- https://www.jianshu.com/p/aa0f0fdd234c hive事务性
- https://cloud.tencent.com/developer/article/1583233 https://confucianzuoyuan.github.io/flink-tutorial/book/chapter08-01-02-事务性写入.html flink事务
- https://cloud.tencent.com/product/cdwch/details 腾讯云clickhouse
- https://cloud.tencent.com/developer/article/1758562 hbase kudu ch 对比
- https://zhuanlan.zhihu.com/p/118592173 HTAP
- https://www.infoq.cn/article/etcdovq3lck7eiyjdlhp Apache Cassandra
- https://juejin.cn/post/6844903558429540366 https://zhuanlan.zhihu.com/p/65593795 kudu impala
- https://xie.infoq.cn/article/c98432267e7b7038718d199e1 OceanBase实现