【进阶之路】消息队列——RocketMQ原理(三)

2021-04-02 11:43:24 浏览数 (1)

.markdown-body{word-break:break-word;line-height:1.75;font-weight:400;font-size:15px;overflow-x:hidden;color:#333}.markdown-body h1,.markdown-body h2,.markdown-body h3,.markdown-body h4,.markdown-body h5,.markdown-body h6{line-height:1.5;margin-top:35px;margin-bottom:10px;padding-bottom:5px}.markdown-body h1{font-size:30px;margin-bottom:5px}.markdown-body h2{padding-bottom:12px;font-size:24px;border-bottom:1px solid #ececec}.markdown-body h3{font-size:18px;padding-bottom:0}.markdown-body h4{font-size:16px}.markdown-body h5{font-size:15px}.markdown-body h6{margin-top:5px}.markdown-body p{line-height:inherit;margin-top:22px;margin-bottom:22px}.markdown-body img{max-width:100%}.markdown-body hr{border:none;border-top:1px solid #ddd;margin-top:32px;margin-bottom:32px}.markdown-body code{word-break:break-word;border-radius:2px;overflow-x:auto;background-color:#fff5f5;color:#ff502c;font-size:.87em;padding:.065em .4em}.markdown-body code,.markdown-body pre{font-family:Menlo,Monaco,Consolas,Courier New,monospace}.markdown-body pre{overflow:auto;position:relative;line-height:1.75}.markdown-body pre>code{font-size:12px;padding:15px 12px;margin:0;word-break:normal;display:block;overflow-x:auto;color:#333;background:#f8f8f8}.markdown-body a{text-decoration:none;color:#0269c8;border-bottom:1px solid #d1e9ff}.markdown-body a:active,.markdown-body a:hover{color:#275b8c}.markdown-body table{display:inline-block!important;font-size:12px;width:auto;max-width:100%;overflow:auto;border:1px solid #f6f6f6}.markdown-body thead{background:#f6f6f6;color:#000;text-align:left}.markdown-body tr:nth-child(2n){background-color:#fcfcfc}.markdown-body td,.markdown-body th{padding:12px 7px;line-height:24px}.markdown-body td{min-width:120px}.markdown-body blockquote{color:#666;padding:1px 23px;margin:22px 0;border-left:4px solid #cbcbcb;background-color:#f8f8f8}.markdown-body blockquote:after{display:block;content:""}.markdown-body blockquote>p{margin:10px 0}.markdown-body ol,.markdown-body ul{padding-left:28px}.markdown-body ol li,.markdown-body ul li{margin-bottom:0;list-style:inherit}.markdown-body ol li .task-list-item,.markdown-body ul li .task-list-item{list-style:none}.markdown-body ol li .task-list-item ol,.markdown-body ol li .task-list-item ul,.markdown-body ul li .task-list-item ol,.markdown-body ul li .task-list-item ul{margin-top:0}.markdown-body ol ol,.markdown-body ol ul,.markdown-body ul ol,.markdown-body ul ul{margin-top:3px}.markdown-body ol li{padding-left:6px}.markdown-body .contains-task-list{padding-left:0}.markdown-body .task-list-item{list-style:none}@media (max-width:720px){.markdown-body h1{font-size:24px}.markdown-body h2{font-size:20px}.markdown-body h3{font-size:18px}}

导言

大家好,我是南橘,从接触java到现在也有差不多两年时间了,两年时间,从一名连java有几种数据结构都不懂超级小白,到现在懂了一点点的进阶小白,学到了不少的东西。知识越分享越值钱,我这段时间总结(包括从别的大佬那边学习,引用)了一些平常学习和面试中的重点(自我认为),希望给大家带来一些帮助

这是消息中间件的文章,大家没有看过的可以跟着看一下

  • 【进阶之路】消息队列——原理及选型(一)
  • 【进阶之路】消息队列——RabbitMQ原理(二)

第一件事还是把思维导图贴给大家,因为用的是免费版,所以有水印,如果需要原始版本的话,可以加我的微信:

上次讲完RabbitMQ之后,这次就来讲讲RocketMQ。

Apache RocketMQ是阿里开源的一款高性能、高吞吐量的分布式消息中间件。曾经阿里团队考虑过Kafka,但是因为性能和高可用方面最后才选择自主研发了RocketMQ。RocketMQ是一款出生在高并发分布式时代的消息中间件,所以他本身就是支持高并发和事务的。

一、特点

  • 支持事务型消息(发送消息和DB操作保证两方的最终一致性,RabbitMQ和Kafka不支持)
  • 支持多个系统之间的最终一致性
  • 支持延迟消息(Kafka不支持)
  • 支持指定次数和时间间隔的消息失败重发(Kafka不支持)
  • 支持consumer端的tag过滤,减少不必要的网络传输(RabbitMQ和Kafka不支持)
  • 支持重复消费(Rabbitmq不支持)
  • 严格保证消息的顺序
  • 亿级消息堆积能力
  • 提供丰富的消息拉取模式
  • Producer、Consumer、队列都可以分布式。

二、RocketMQ部署结构

别的不说,先把这种图片拿上来

大家能够看出,RocketMQ集群的架构部署很像是SpringCloud的架构,NameServer就类似于Nacos、Eureka或者Zookeeper之类的。生产者还是那个生产者,消费者也是消费者,Broker就是中间的邮递员。只不过,他们都通过Name Server连接在一起了。

1、NameServer

NameServer的作用是注册中心,类似于Zookeeper,但又有区别于它的地方。每个NameServer节点互相之间是独立的,没有任何信息交互,也就不存在任何的选主或者主从切换之类的问题。单台NameServer宕机不影响其他NameServer与集群。即使整个NameServer集群宕机,已经正常工作的Producer,Consumer,Broker仍然能正常工作,但新起的Producer,Consumer,Broker就无法工作。

NameServer与Zookeeper相比更轻量级。单个NameServer节点中存储了Topic-Broker的关系信息(包括master和slave),这里活跃的定义是与NameServer保持有心跳。

NameServer压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向NameServer发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致NameServer误认为NameServer心跳失败。

2、Broker

Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。

Broker中分master和slave两种角色,每个master可以对应多个slave,但一个slave只能对应一个master,master和slave通过指定相同的Brokername,不同的BrokerId (master为0)成为一个组。master和slave之间的同步方式分为同步双写和异步复制,异步复制方式master和slave之间虽然会存在少量的延迟,但性能较同步双写方式要高出10%左右。

  • Topic和Queue

RocketMQ中Topic只代表普通的消息队列,而Queue是组成Topic的更小单元,集群消费模式下一个消费者只消费该Topic中部分Queue中的消息,当一个消费者开启广播模式时则会消费该Topic下所有Queue中的消息。

  • 高并发读写服务

消息顺序写:所有Topic数据同时只会写一个文件,一个文件满1G,再写新文件,真正的顺序写盘,使得发消息TPS大幅提高。

消息随机读:RocketMQ尽可能让读命中系统pagecache(高速缓冲存储器),因为操作系统访问pagecache时,即使只访问1K的消息,系统也会提前预读出更多的数据,在下次读时就可能命中pagecache,减少IO操作。

  • Broker的负载均衡与动态伸缩

负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。

动态伸缩能力(非顺序消息):Broker的伸缩性体现在两个维度:Topic, Broker。

Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。

Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker起来后NameServer注册,Producer、Consumer通过NameServer发现新Broker,立即跟该Broker直连,收发消息。

  • 高可用和高可靠

高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。

高可靠:所有发往broker的消息,有同步刷盘和异步刷盘机制;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电。

单个Broker跟所有NameServer保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。NameServer会反查Broer的心跳信息,如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时NameServer不会主动通知Producer、Consumer有Broker宕机。

3、Producer

生产者每30秒从NameServer获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。

生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。

这里需要注意一点:假如某个Broker宕机,意味生产者最长需要30秒才能感知到。在这期间会向宕机的Broker发送消息。当一条消息发送到某个Broker失败后,会往该broker自动再重发2次,假如还是发送失败,则抛出发送失败异常。业务捕获异常,重新发送即可。客户端里会自动轮询另外一个Broker重新发送。

4、Consumer

消费者启动时需要指定NameServer地址,与其中一个NameServer建立长连接。消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。连接建立后,从NameServer中获取当前消费Topic所涉及的Broker,直连Broker。

  • 消费者端的负载均衡

一个topic可以由同一个ID下所有消费者分担消费。具体例子:假如TopicA有6个队列,某个消费者ID起了2个消费者实例,那么每个消费者负责消费3个队列。如果再增加一个消费者ID相同消费者实例,即当前共有3个消费者同时消费6个队列,那每个消费者负责2个队列的消费。消费者端的负载均衡,就是集群消费模式下,同一个ID的所有消费者实例平均消费该Topic的所有队列

三、消费模型

我们在用到消息中间件的时候,用到最多的功能就是消息的推送和拉取,但是,什么时候去推送消息,什么时候又主动去拉取消息,还是要根据不同的情况进行考虑。

1、Push模型

Push模型的优点:实时(因为服务端Broker一旦收到消息,就会发送给消费者,不管消费这准备好没有,消费者是死是活,缓存到消费端的BlockingQueue中)。

Push模型的缺点:

  • 1、消息保存在服务端broker,容易造成消息堆积。(因为服务端Broker在和消费端第一次建立通信时就明确了该消费者的消费喜好,他选择的就是Push模型,那就不管三七二十一都发给你的缓存队列中去)。
  • 2、服务端broker需要维护每次传输状态,遇到问题需要重试。
  • 3、服务端broker需要依据订阅者消费能力做流控(RabbitMQ的做法是可以在消费者新建时,设置Qos,对服务端Borker提前表明消费端的消费能力,这样服务端最多推送指定数量的消息给消费者。)

2、pull模型

Pull模型的优点:

  • 1、保存在消费端,获取消息方便。
  • 2、传输失败,不需要重试。
  • 3、消费端可以根据自身消费能力决定是否Pull。

Pull模型的缺点:

  • 默认的短轮询方式的实时性依赖于pull间隔时间,间隔越大,实时性越低,长轮询方式和push一致。( 指的当长时间没有消息时,消费端实现的间隔时间去服务端轮训消息的过程)

3、场景案例

一、当Producer 的速率大于 Consumer 的速率

出现这种场景有这几种可能: 第一种是Producer 本身的效率就要比 Consumer 高(比如说,Consumer端处理消息的业务逻辑可能很复杂,或者涉及到磁盘、网络等 I/O操作)。另一种则是 Consumer 出现故障,导致短时间内无法消费或消费不畅

这种问题采取 Pull 的方式解决问题就很简单,由于Consumer是主动到服务端拉取数据,此时只需要降低自己访问频率就好了。

二、强调消息的实时性的情况

采用 Push 的方式时,一旦消息到达,服务端即可马上将其推送给服务端,这种方式的实时性显然是非常好的;

而采用 Pull 方式时,为了不给服务端造成压力(尤其是当数据量不足时,不停的轮询显得毫无意义),需要控制好自己轮询的间隔时间,但这必然会给实时性带来一定的影响。(Pull不会频繁拉取,设置一定间隔)

三、消费方长期主动获取消息

Pull 的长轮询方式,由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次 Pull 取到消息了还可以继续去 Pull,如果没有 Pull 取到消息则需要等待一段时间再重新 Pull。

业界较成熟的做法是从短时间开始,然后指数级增长等待

总之就是消费端长时间没有消息消费的话,消费端轮训时间间隔如果太长,可能在轮训间隔中让部分消息延时消费,如果轮训时间太短,则频繁的请求在消耗服务端Broker,broker要应答消费端的请求(线程开销等)而造成服务端Broker的负担。

可以设置消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接拉起,返回最新消息。

四、部分或全部 Consumer 不在线

在采用 Pull 模型时,服务端不再关心Consumer的状态,而是采取“你来了我才服务”的方式,Consumer是否能够及时消费数据,服务端不会做任何保证(也有超时清理时间)。

4、定时消息

和RabbitMQ一样,RokcetMQ也自带了定时消息的功能。

定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。

但是如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。在RocketMQ中一共有18个级别的消息精度。

5、顺序消息

要保分布式事务消息,保证消息的顺序是很有必要的。RocketMQ可以保证消息消费者按照消息发送的顺序对消息进行消费。顺序消息分为全局有序和局部有序,一般推荐使用局部有序,即生产者通过将某一类消息按顺序发送至同一个队列来实现。

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。 注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

也可以通过实现发送消息的对列选择器方法,实现部分顺序消息。 举例:比如一个数据库通过MQ来同步,只需要保证每个表的数据是同步的就可以。解析binlog,将表名作为对列选择器的参数,这样就可以保证每个表的数据到同一个对列里面,从而保证表数据的顺序消费。

6、回溯消费

回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据。

RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

结语

在总结RocketMQ和RabbitMQ的时候,顺便总结了一下分布式事务解决方案的内容。消息中间件可以说是分布式框架中必不可缺的工具了,所以我之后也会重点介绍一下不同的分布式事务解决方案。这一章没有去讲解RocketMQ的集群,因为其实集群的搭建都差不太多~~~更多的东西还需要大家自己去发现。 同时需要思维导图的话,可以联系我,毕竟知识越分享越香!

0 人点赞