CAP能力模型
在CAP能力模型表现方面,hbase主要是面向CP的应用系统,针对数据写入可以满足强一致性需求,从客户端视角来看写入成功之后的数据是即时可见的。然而hbase的CP模型目前还存在很大的短板,比如当有服务节点出现宕机事件时,需要经历很长时间的MTTR过程,耗时主要体现在以下两个方面:
- 首先需要对宕机节点做到及时发现。 目前hbase主要通过监听ZK来做服务感知,因此能够发现目标节点宕机的时间主要跟ZK的session超时时间有关。
- 其次需要基于LSM树来做日志回放处理。 回放的数据总量跟当时的memstore数据量大小相关,数据量越大恢复时延将越明显。
通过观察线上hbase集群发现当有RS节点出现宕机事件时,相关Region的服务恢复时间基本都维持在分钟级别,也即服务访问需要有分钟级的响应延迟,这是所有在线服务都无法接受的。
而从另一个角度来看,目前很多线上业务其实对数据的强一致性要求并不严苛,数据写入成功后不要求立刻可见,只要能够在一定的时间buffer之后访问到数据即可,但是对服务的可用性要求非常高,对服务的响应时延要求非常敏感,更多是对AP场景的能力需求。
原生AP能力支持
针对AP场景的能力需求,hbase原生提供了Replica特性来提供支持,通过该特性可为每个Region引入额外的副本Region,这样当主Region所在机器宕机时,可通过副本Region来接管相应的读请求,主Region与副本Region主要通过WAL的异步push机制来实现memstore的状态同步。
但是Replica特性目前同样也还存在着一些短板,比如:
- 会增加集群的IO使用消耗(需要读取WAL来做数据同步);
- 其次只能针对读操作做容灾处理,针对写操作依然需要经历长时间的MTTR等待过程,因为副本Region是不能接管写服务的;
- 最后Replica只能处理集群内部的容灾,针对跨机房的容灾需求起不到太好的帮助作用。
基于以上这些问题我们对原生架构做了如下调整。
IO分散解耦
HBase的IO占比可以按照如下比例来进行划分,假设原始数据占据一份IO,则记录WAL会将写IO放大一倍,Replica/Replication特性会将读IO放大一倍,而整理操作会将读写IO放大到一倍以上。因此hbase集群普遍是一个IO密集型的系统,系统的物理资源通常是磁盘IO先达到饱和。如何有效控制IO的使用将会对集群的吞吐能力起到至关重要的提升。为此我们首先想到能否把WAL所占据的IO从hbase集群中解耦出去,通过其他更适合做日志存储的系统来进行处理(比如kafka)。这样有关WAL的写IO以及Replica/Replication的同步IO便可以分散到kafka系统中去完成。
另一方面,通过对hbase业务接入场景的了解,发现很多业务在接入hbase的时候都是先将数据写入到kafka,在通过实时流计算消费把kafka中的数据转存到hbase,以起到流量消峰的作用,而如果我们能够把业务原始数据与WAL数据进行统一,那么便可以缩减出一份IO资源。
MTTR优化
如之前所描述,影响hbase的MTTR时间主要涉及两个方面,分别是服务宕机的发现时间和WAL日志的回放时间。由于主Region与副本Region的数据同步是异步进行的,在主Region宕机时有可能部分数据还没有向副本Region做同步,因此此时的副本Region是不能接管写服务的,否则将会产生数据的写入乱序,进而影响到数据的读写一致性。所以客户端需要等待主Region再次上线之后才能继续做之前的写入,而在此期间需要做不断的retry重试。
相比于hbase集群,kafka集群的MTTR时间则要更快一些,因为没有WAL回放的过程,只需要对宕机的服务节点进行感知,然后在ISR列表中随机选取一个健康状态的副本进行切换即可。基于此我们考虑将hbase的整体写链路做一下相应的调整,客户端不在直连hbase进行写入,而是先记录WAL到kafka,再通过实时流计算消费,把kafka中的WAL数据同步到hbase集群。
这样客户端通过调用我们的SDK,便可实现kafka原始数据与WAL数据的统一,从而缩减出部分IO资源。另外客户端视角的写容错时间也只跟kafka的故障恢复时间有关,而不受到hbase长时间MTTR过程的影响。
多机房容灾能力
受zkQurom需要满足多数可用的限制要求,hbase是没有办法通过单集群来满足跨机房容灾需求的。需要在每个机房部署独立的hbase集群,然后通过原生的Replication机制去做数据同步处理(与Replica一样通过异步的消费WAL)。
在原生架构实现里,集群的容灾模式主要是基于Active/Standby方式进行部署的,其中Active端保有最新的数据写入,然后通过异步消费WAL把数据push到Standby集群端。如果Active集群所在机房宕掉那么便有可能出现部分数据还没有向Standby同步的情况发生。为此社区在2.0之后的版本提供了同步备份功能,但是在IO使用上放大效果将更为明显。而将WAL转存到KAFKA之后,我们可以针对部署模式做一些相应的调整,将Active/Standby模式切换成Active/Active模式,如图所示:
这样不同的集群可开启不同的流计算作业去消费kafka中的WAL以便将数据同步到自己的hbase集群,而hbase的机房容灾功能也可转嫁到kafka的数据容灾处理上。与此同时我们还可借助KafkaWAL把HBase集群中数据同步到其他系统中去,比如可以把数据同步到SOLR来实现全文检索,也可以把数据同步到HIVE去做离线的分析,或者将数据同步到DaltaLake来构建实时数仓应用。
客户端双写
集群基于ActiveToActive模式部署之后,为了提升整体服务的查询效率,客户端可采用双集群并发访问的方式来优先获取执行成功的返回结果,这样即使单集群的访问出现了故障,也不会影响到最终的整体时延。针对数据访问只需满足最终一致性的业务场景,该方式可有效提升SLA诉求,为此我们对客户端SDK做了如下调整。
首先我们基于组合模式全新设计了HBase的Connection实现(即这里的CompositeConnection),其内部会封装两个单独的Connection用于访问不同的集群。针对每个RPC请求(put或者get),首先通过CompositeConnection构建出一个MTable实例,然后通过该MTable提交两个不同的ConnectionCallable线程到线程池中去运行处理,每个ConnectionCallable会对应不同的集群访问,如果是put请求需要执行线程池的invokeAll操作,待所有集群全部保存数据成功之后在做返回;而如果是get或者multiget请求只需要执行invokeAny,返回先执行结束的运行结果即可,而未运行结束的线程便可以cancel掉。
基于Kafka的日志回放
hbase的数据结构主要基于LSM树的方式进行组织,数据写入memstore之前需要先记录WAL,以便RS宕机时可通过回放WAL来恢复memstore中的数据。原生的WAL实现里,每行日志记录是通过sequenceId来进行唯一标识的,其和MVCC的事物ID采用的是同一套ID计数器。由于WAL的写入和memstore的写入处在同一个事务里,采用相同的计数ID可以让应用变得更加简洁。但是在将WAL解耦到kafka之后,sequenceId的概念转变成了kafka的offset,RS端需要把已持久化完成的offset保存下来,并且以心跳通信的方式将其汇报给HMaster,以便HMaster在处理LogSplit操作时能够通过offset将已被持久化的无用记录过滤掉。
为此我们需要把kafka的消息偏移量从Consumer端传递到RS端,使其能够汇总到RS端去进行保存,同时利用已有的心跳汇报流程,在与HMaster心跳通信过程中将kafka的偏移量也一并汇报上去,整个OFFSET的传递流程如图所示。
Consumer在执行kafka消费过程中,可以拿到每条WAL日志对应的offset以及partition信息,然后通过执行Mutation#setAttribute将其序列化到Mutation对象实例中进行保存,这样HBaseClient在执行RPC远程调用时便可将其发送到RegionServer端进行处理。RegionServer端收到请求之后,可通过Mutation#getAttribute方法将offset和partition信息反序列化出来,在通过自定义的KafkaClientWAL将其传入KafkaOffsetAccounting中进行汇总。KafkaClientWAL与原生WAL最大的区别便是这里只需记录每条日志的offset而无需对日志内容进行保存,因为保存操作已在客户端完成。
LogSplit改造
HBase的LogSplit逻辑是通过HMaster与RS的共同参与来完成的,HMaster端主要负责生成每一个 SplitLogTask任务,RS端则会对具体的任务进行抢占和处理,其中的协调过程主要是通过ZK来控制的,大体的处理流程如下。HBase的LogSplit逻辑是通过HMaster与RS的共同参与来完成的,HMaster端主要负责生成每一个 SplitLogTask任务,RS端则会对具体的任务进行抢占和处理,其中的协调过程主要是通过ZK来控制的,大体的处理流程如下。
- 首先SplitLogManager负责生成每一个SplitLogTask任务,并将任务内容序列化写入ZK,然后通过TimeoutMonitor来监听每个SplitLogTask任务的执行情况,如果执行出错会将其重新提交处理。
- SplitLogTask提交之后,RS端会开启SplitLogWorker线程来进行任务抢占,抢占到任务之后会通过TaskExecutor来对目标WAL数据进行读取并将无用记录进行过滤。最终将处理结果通过OutputSink输出到HDFS的每个Region目录下,以便Region在启动过程中对这些数据完成加载回放。
- 最终SplitLogTask任务运行结束的事件会通过ZK通知到HMaster端,以便其通过TaskFinisher来执行相关的任务清理操作。
而基于kafka的日志回放操作,我们只需对如下3个组件进行相应的定制和重构便可满足回放需求。
- SplitLogManager 在原生实现里主要是针对每个HLog开启一个SplitLogTask,而基于KAFKA的实现方式可考虑针对partition纬度进行开启。
- TaskFinisher 在原生实现里主要是对拆分完成之后的HLog进行归档,并对临时目录进行清理,而基于kafka则不需要做相关的归档操作。
- TaskExecutor 在原生实现里主要是开启ProtobufLogReader对HLog进行读取,并对无用日志记录进行过滤,而基于kafka则需要根据region来定位partition,并通过offset来对partition数据进行过滤和读取。