既然有了Kafka为什么还会出现RocketMQ?这就不得不提到RocketMQ的诞生动机了,在RocketMQ的官网上面可以找到这个问题答案,原文可以点击此处阅读。实际原因当然是kafka存在一些问题,不能满足当下的业务场景。所以需要新的技术方案来解决,也就产生了RocketMQ。那到底kafka有哪些问题呢?
从Kafka的数据存储上来说,一个topic(主题)内部逻辑上由多个partition(分区)组成
这也使得它有几个特点:
- 生产者消息并行写入受限于分区数
- 消费者并行消费同样受限于分区数
- 每个topic由多个partition构成,partition数量决定了单个broker能够承受的topic个数
为什么Kafka单个broker不能支持更多分区?
https://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/
- 每个分区存储着全部的数据。尽管每个分区局部来看都是顺序写磁盘,但随着并行写分区的增加,从操作系统全局来看。写操作将会变成随机写磁盘了。
- 每个分区的数据文件是分散的,导致无法很好的利用组提交提升性能
上面根据官网的文档对Kafka的缺陷做了分析,那接下来RocketMQ就需要解决这些问题。
本文也就以该主题来对Kafka和RocketMQ内部实现原理做一些对比。主要下面四个方面对比:
1. 系统架构对比 2. 单机数据存储模型对比 3. 集群数据同步方案对比 4. 提升性能的优化技巧对比
本章内容目录如下所示:
- Kafka和RocketMQ实现原理对比
- 1.系统架构对比
- 1.1 整体架构对比
- 1.2 数据组织结构对比
- 2.单机存储模型对比
- 2.1 Kafka存储模型设计
- 2.2 RocketMQ存储模型设计
- 3.集群数据同步对比
- 3.1 Kafka集群数据同步
- 3.2 RocketMQ集群数据同步
- 4. 提升性能的优化技巧对比
- 4.1 Kafka高性能优化技巧
- 4.2 RocketMQ高性能优化技巧
- 5.参考资料
- 1.系统架构对比
1.系统架构对比
1.1 整体架构对比
上图中左侧是Kafka系统架构,右侧是RocketMQ系统架构。从图中我们可以看到它们整体的结构是大同小异的。主要由四部分组成: 1. 生产者:数据的生产方,生产数据发送给Broker集群存储 2. Broker集群:消息队列的核心集群,存储生产者发送的数据,并分发给消费者消费 3. 消费者:消费生产者生产的消息 4. 元数据中心集群:Kafka中的zookeeper和RocketMQ中的NameServer都扮演同样的角色,内部维护集群以及topic相关的元数据
其实不仅仅是Kafka和RocketMQ的架构相似,几乎绝大部分的消息队列(pulsar、nsq等)它们的架构都大同小异。
虽然从外壳看起来二者差不多,这也就意味着对外使用的接口也是大差不差的。那它们内部在数据组织设计方面是否也相同呢?
答案当然是否定的,否则那不就是和kafka没啥太大差别了嘛,下面来针对这个问题进行分析下。
1.2 数据组织结构对比
从结构上来说,Kafka和RocketMQ就有很大的区别 1. Kafka数据层级结构: topic->partition->segment->(.log、.index、.timeindex) 2. RocketMQ数据层级结构: topic->分片topic->msgqueue->(offset)
下图是Kafka和RocketMQ数据存储结构的对比。
Kafka数据组织结构
在Kafka中一般由几台broker构成一个集群,一个topic(例如topicA)在创建时需要指定partition数和replicate副本数。这些partition会分散在每个broker上。一个topic会对应多个partition分区。每个partition都保存完整的数据,是一个逻辑上的队列结构。其中在每个分区内部数据又是按照segment分段存储的。每个segment中会包含数据文件(.log)、索引文件(.index、.timeindex)。
同时值得注意的是Kafka中数据的同步是以partition为维度进行的,这部分会在第三节进行详细介绍。
RocketMQ数据组织结构
RocketMQ集群通常是由多个内部小集群构成。同样在RocketMQ中对外访问时以topic为单位,在其内部一个topic会首先按照小集群进行分片,也就是分片topic,在每个小集群内部再将分片的topic拆分为多个msgqueue。每个msgqueue就是一个逻辑队列。在RocketMQ中,每个msgqueue也是分小段存储的,不过每个段中存储的都是每个消息的索引数据。
在RocketMQ中,数据同步是按照每个小集群进行同步的,每个小集群内部按照同步或者异步方式同步数据。
2.单机存储模型对比
本节主要介绍Kafka和RocketMQ在单机层面存储模型设计上的差异点。还记得最开始我们介绍的kafka的partition设计存在的问题吗?这正是RocketMQ需要去解决的问题。下图中以一个topic三个逻辑队列为例进行介绍,图中左侧为Kafka存储结构,右侧为RocketMQ存储结构。
下面将对它们的存储模型进行详细介绍。
2.1 Kafka存储模型设计
Kafka中每个partition为逻辑队列,底层存储时按照segment为单元分段,每条消息都存储在具体的segment中,kafka会为每条消息生成一个唯一的编号offset,编号是单调递增的。下图是一个简单的示意图。
每个segment段由以下三部分组成: 1. xxxx.log:日志数据文件,按照固定的格式存储生产者分送过来的消息数据 2. xxxx.index:索引文件,该文件主要记录了哪条消息(offset)写在对应的xxxx.log文件中的哪个位置(position),查询消息时首先查询该文件,然后再去日志数据文件中读取对应的消息数据。此外该文件也可用来重置消费的offset,实现消息的回溯消费 3. xxxx.timeindex:索引文件,该文件记录了什么时间(timestamp)写入了哪条消息(offset)。类似于关系型数据库的二级索引。该文件主要用来实现按照时间回溯消费消息
在Kafka每个segment段都是顺序写入的,也就是顺序写磁盘。同时每个segment端在命名时也很有讲究,每个segment段的命名以当前保存的第一条消息的编号(offset)按照16进制来命名,这样一来,所有的segment段是遵循有序规律的。这样的好处就是当查找某条消息时,可以通过二分查找很快定位到该条消息对应的segment段。然后读取该段的index文件,找到该消息写入数据文件的位置。再从数据文件读取消息内容。在这个过程中主要利用有序二分查找的特性。
不过此处需要注意的是,Kafka中索引文件为了节约空间提升性能,索引数据在存储时是按照稀疏索引存储的,也就是每隔几条消息数据建一条索引。所以在查找该条消息时只需要找到对应的消息编号最近的索引位置,然后再在数据文件中顺序查找。大体的查找过程如上图右侧所示。最后在Kafka中数据的清理也是以segment为单位进行清理的。
2.2 RocketMQ存储模型设计
前面提到了RocketMQ要避免kafka的partition设计中带来的问题。接下来我们就详细分析RocketMQ是如何设计它的单机存储模型的。
我们再来回顾下前面提到的kafka的问题:Kafka中单个broker分区过多,会导致有大量的segment段,同时顺序写变成随机写。
RocketMQ为了解决这个问题,在数据文件设计上做了如下处理:
- 每个broker实例上所有的topic共用一个数据文件。该文件称为commitlog,该文件的特点是固定文件大小,然后采用mmap进行内存映射读取数据。因为所有topic的数据都存储在一个数据文件中,即便随着topic数量的增加,同一时间在RocketMQ单个broker内也只有一个日志文件在执行写操作,完全是顺序写磁盘,从而彻底避免了kafka前面存在的随机写的问题,同时也能更好的利用组提交特性。
- 既然数据是存储在一起,那就需要在索引上进行分开,每个topic对应的msgqueue内部存储的就是每条消息的索引信息,该索引信息也是固定长度(20字节:offset(8字节) size(4字节) tagHashCode(8字节))。同样索引文件也是固定大小的,单个文件大小为5.72M,可以存储30w条索引,索引定长设计后,可以充分利用操作系统的磁盘预读特性,这也是为什么RocketMQ在topic数据上升时吞吐量不会有明显影响的原因。
下图是RocketMQ中数据存储的一个示意图。
在消息过滤方面,RocketMQ支持按照Tag以及SQL语法进行过滤,篇幅限制,这部分内容不再展开介绍,大家感兴趣可以点击此处进入官方文档进行详细阅读。
在消息查询方面,RocketMQ支持按照消息id、消息Key、时间区间进行查找。这些查找在RocketMQ中通过索引文件实现的。它的索引格式本质上是存储在磁盘上的HashMap结构(hash冲突时按照链表存储相同hash值的数据)。索引文件按照创建时间来命名,方便支持时间区间的查询。另外该文件也是固定大小的,主要也是通过mmap来加速读取。
当按照消息id查询时,首先从消息id中解析出来broker的ip、port、offset。然后发送请求到对应的节点获取消息。 当按照消息key查询时,会按照该key计算其hash值然后再从index文件中挨个遍历链表获取到offset后,接着去读取commitlog获取到最终的消息数据。
索引文件结构如下图所示。
这部分内容感兴趣大家可以点击此处进入官方文档进行详细阅读。
3.集群数据同步对比
本节我们来重点谈谈Kafka和RocketMQ他们数据同步的实现差异。
此处先说结论:
Kafka的数据同步时以partition为维度进行的,在创建topic指定好分区数和副本数后,Kafka内部会为每个partition从副本集中选择出一个管理该partition的broker为leader,其他的该partition副本所在的broker为follower。客户端的请求(生产者发送消息、消费者消费消息)都由leader节点来负责,数据的同步也是leader同步给follower节点。
RocketMQ中整个集群由多个小集群构成,数据的同步是以小集群维度进行的。以主从架构为例,每个集群中有一个master节点,若干个slave节点。数据由master节点同步到slave节点上。slave节点是master节点数据的备份。数据同步的方式有同步复制、异步复制两种方式。
下面分别对二者的数据同步进行详细介绍。
3.1 Kafka集群数据同步
在Kafka中partition数据同步采用的并不是主从复制的模式。它采用的是ISR(in-sync replica set)方式。同时在数据同步过程中有几个概念是必须要了解的,下面通过下图介绍 下LEO、HW这两概念。
下面以一次数据生产者发送消息的流程进行介绍数据复制过程
大概的数据复制流程如下: 1. producer向broker(leader)发送消息请求 2. leader收到消息后,自己先存储。然后再发送给follower节点 3. follower节点收到数据后存储,然后响应leader节点 4. leader节点根据业务配置的ack响应策略进行响应生产者 5. 消费者获取待消费的数据
第四步中ack的响应策略kafka支持三种: ack=0:发送给leader后,立即返回,此时有可能数据都还没来得及落盘,吞吐量最高,但数据可靠性最低 ack=1:发送给leader后,等待leader写成功后响应生产者,吞吐量很高,但如果在该数据还未同步给follower后,leader宕机会发生数据的丢失,数据可靠性偏低 ack=-1:发送给leader后,等待leader和ISR中的节点都同步成功后,再响应生产者,这种方式,吞吐量比前两者低,但数据可靠性最高,极端情况也有数据丢失的风险
上图中为方便介绍数据复制,图中流程按照leader主动发送数据进行描述。但实际上在Kafka中,follower是主动从leader拉取数据的。
简单理解了数据同步的过程后,下面来看下当leader和follower分别发生故障后,Kafka是如何处理的。
leader故障后,Kafka需要进行故障转移,也就是重新选出一个新的leader节点。选择的核心原则是选择数据最新或者最完整的节点,以使得丢失的数据最少,选择完leader节点后还需要保证集群之间数据对外访问的一致性。
相比leader故障的处理过程,follower故障后处理就简单多了。如果follower节点发生了故障,若该节点在ISR中,则会将其移除。然后等节点恢复且数据同步跟leader差距很小时。再会将其加入到ISR集合中。
最后可以看出,Kafka为了避免传统主动复制中,数据复制时延受最慢节点带来的影响,选择了以ISR方式同步数据,来保证低时延。针对大数据处理、消息不要求强一致的场景,Kafka可以说是非常优质的选择。
3.2 RocketMQ集群数据同步
RocketMQ的数据同步和它的集群部署架构有关系。在RocketMQ中有以下几种集群部署方式: 1. 单Master 2. 多Master 3. Master-Slave异步复制 4. Master-Slave同步复制
其中生产环境用的比较多的还是第3种和第4种方式。不管是同步复制还是异步复制,数据的流向都是从master节点流向slave节点。只不过同步复制方式下数据比异步复制情况下更可靠一些。
这几种模式之间的特点如下表所示。
同时上述这几种方式是RocketMQ4.5之前就支持了,而在4.5版本以后也新增加了Dledger版本。
Dledger是一个基于raft算法实现的分布式日志库,RocketMQ底层采用Dledger日志库以后,就实现了强一致性的特性。同时也解决了之前同步复制下不能自动做故障转移的问题。
4. 提升性能的优化技巧对比
4.1 Kafka高性能优化技巧
1. 数据分片/分区 Kafka中数据按照partition分区后,提升了整体系统的并行度;此外通过将topic划分成多个partition,数据可以分散在多个节点上,保证了数据的可靠性。
2. 顺序写磁盘 在单个partition中,数据物理上按照segment分段存储,每个segment段都是顺序写磁盘。从而保证了高吞吐量。
3. 网络零拷贝 消费者从集群内部获取消费数据时,如果消费的数据已经落盘,则从磁盘读取数据并发给消费者的过程中采用了sendfile()、transferTo()之类的零拷贝方法,提升系统性能。
4. 批处理 实现层面内部支持多条网络数据合并在一起,批量发送,降低网络开销
4.2 RocketMQ高性能优化技巧
1. 数据分散多集群(数据分片) 在RocketMQ中,整个集群环境由多个分散的小集群组成,所以topic首先会分片到多个小集群中,然后每个小集群内部又会分成多个msgqueue。这两层的分片很大程度上提升了系统的性能。
2. 顺序写磁盘、磁盘预读 RocketMQ存储模型设计时,设计成每个broker实例上所有的topic共用一个commitlog文件。从而保证全局顺序写。同时索引格式设计是固定长度,充分利用操作系统磁盘预读特性提升性能。
3. mmap内存映射 RocketMQ commitlog在保证顺序写的情况下,通过采用mmap方式来加速读数据过程。使其随机读也不会大幅度影响系统性能。
5.参考资料
- https://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/
- https://rocketmq.apache.org/docs/motivation/
- RocketMQ设计文档(design)
- 消息中间件—Kafka数据存储(一)
- 【kafka原理】Kafka生产者 (分区策略和ACK应答机制)
- RocketMQ运维管理
- Kafka 和 DistributedLog 技术对比
- 基于 Raft 协议的 commitlog 存储库 DLedger
- Kafka设计解析(六)- Kafka高性能架构之道