精华!一张图进阶 RocketMQ

2022-05-05 20:54:05 浏览数 (2)

前 言

大家好,我是三此君,一个在自我救赎之路上的非典型程序员。

“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:

  • 三此君向来不喜欢零零散散的知识点,通过一张图将零散的知识点连接起来,能够让我们对一个板块有更深入、更系统的理解。
  • 同时本系列尽可能的精炼,希望能够让大家花 20%的时间,快速理解这个板块下 80% 的内容。

本文是“一张图”系列的第一个板块:一张图解析 RocketMQ。

  • 为了叙述的方便,绘图的时候将整个系列分为许多小的模块,讲解的时候也是按照模块循序渐进的。一张图解析 RocketMQ 原图
  • 一张图解析 RocketMQ 是会深入到源码层面,但是文中不会粘贴源码。三此君在看源码的时候写了很多备注,可以降低大家看源码的难度,需要的同学自行到三此君的仓库中 Fork:rocketmq release-4.3.0
一张图进阶 RocketMQ 一张图进阶 RocketMQ

本文是《一张图解析 RocketMQ》系列的第 1 篇,今天的内容主要分为三个部分:

  • 整体架构:会从大家熟悉的“生产者-消费者模式”逐步推出 RocketMQ 完整架构,只需要记住一张完整的架构图即可。
  • 元数据管理:我把 RocketMQ 集群的元数据整理成一张图,方便大家直观的了解都有哪些元数据,各有什么用。
  • 消息收发示例:通过 Docker 部署 RocketMQ,并用简单的示例串起 RocketMQ 消息收发流程。

整体架构

什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息。那有了消息队列就得有人往里面放,有人往里面取。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?

别急,先来回顾一下 “生产者-消费者模式” 这个老朋友。简单来说,这个模型是由两类线程和一个队列构成:

  • 生产者线程:生产产品,并把产品放到队列里。
  • 消费者线程:从队列里面获取产品,并消费。
生产者-消费者 生产者-消费者

有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。

这意味着什么呢,生产者和消费者之间实现「解藕」「异步」。这就厉害了,因为我们生活中很多都是异步的。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽。

具体 “生产者-消费者模式” 怎么实现,想必各位小学都学过了,我们来看看这个模式还有什么问题吧。最大的问题就是我们小学学的 “生产者-消费者模式” 是个单机版的,只能自嗨。这就相当于,我就是外卖骑手,我点了个外卖放到外卖队列,然后我再从外卖队列里面去取,一顿操作猛如虎呀!于是就有了进化版,我们把消费者,队列,生产者放到不同的服务器上,这就是传说中的分布式消息队列了。

生产者生产的消息通过网络传递给队列存储,消费者通过网络从队列获取消息。但是还有问题,消息可能有很多种,全都放在一起岂不是乱套了?我点的外卖和快递全都放在一起,太难找了吧。于是我们就需要区分不同类型消息,「相同类型的消息称为一个 Topic」。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了「生产者组」「消费者组」

但还是有问题呀,小区那么大,一个队列放不下。我住在小区南门,点个外卖还要跑去北门拿,那真的是 eggs hurt。于是物业在东南西北门各设了一个外卖快递放置点。也就是我们有多个队列,组成 「队列集群」

可是,问题又双叒叕来了(还有完没完),一个小区那么多个外卖快递队列,骑手怎么知道送到哪里去,我又怎么知道去哪里取?很简单,导航呀。我们把导航的信息称为「路由信息」,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取。「RocketMQ 为这些路由信息的设置了管理员 NameServer」,当然 NameServer 也可以有很多个,组成 NameServer 集群。

到这里,你就应该知道 RocketMQ 里面都穿了什么啦。包括了「生产者(Producer),消费者(Consumer),NameServer 以及队列本身(Broker)」。Broker 是代理的意思,负责队列的存取等操作,我们可以把 Broker 理解为队列本身。

RocketMQ 整体架构 RocketMQ 整体架构
  • NameServer:我们可以同时部署很多台 NameServer 服务器,并且这些服务器是无状态的,节点之间无任何信息同步。 NameServer 起来后监听 端口,等待 Broker、Producer、Consumer 连上来,NameServer 是集群元数据管理中心。
  • Broker:Broker 启动,跟所有的 NameServer 保持长连接,每 30s 发送一次发送心跳包(像心跳一样持续稳定的发送请求)。心跳包中包含当前 Broker 信息 ( IP 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。 我们可以同时部署多个 Master 和多个 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 为 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。(可以暂时忽略 Broker 的主从角色)
  • Topic:收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  • Producer:Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,采用轮询策略从选择一个队列,然后与队列所在的 Broker 建立长连接,并向 Broker 发消息。
  • Consumer:Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。 我们刚刚提到骑手不止一个,取外卖快递的也不止我一个,所以会有生产者组合消费者组的概念。这里需要补充说明一下,消息分为集群消息和广播消息:
    • 集群消息:一个 Topic 的一条消息,一个消费者组「只能有一个消费者实例消费」。例如,同样是外卖 Topic,一份外卖,我们整个小区也只有一个人消费,就是集群消费。
    • 广播消息:一个 Topic 的一条消息,一个消费者组「所有消费者实例都会消费」。例如,如果是因为疫情,政府发放食品,那我们小区每个人都会消费,就是广播消费。

元数据管理

因为 Producer、Consumer 和 Broker 都需要和 NameServer 交互,负责的三此君不得不先和大家唠唠 NameServer 是何方神圣。上面有说道 NameServer 是集群的元数据管理中心,那它到底管理了哪些元数据?我们来看看 NameServer 里面又穿了什么,看完了记得关注、转发、点赞、收藏哦。

简单来说,NameServer 是我们的整个 RocketMQ 集群的元数据管理中心,负责集群元数据的增删改查。先不管这个增删改查是怎么实现的,我们甚至可以理解就是数据库的增删改查,但是我们一定要知道这些元数据都长什么样子。才能知道 Producer、Consumer 及 Broker 是如何根据这些数据进行消息收发的。

集群元数据 集群元数据

如图所示,二主二从的 Broker 集群相关的元数据信息,包括 topicQueueTable、BrokerAddrTable、ClusterAddrTable、brokerLiveInfo、FilterServer (暂时不用关注,图中未画出)。

  • HashMap<String topic, List<QueueData>> topicQueueTable:Key 是 Topic 的名称,它存储了所有 Topic 的属性信息。Value 是个 QueueData 列表,长度等于这个 Topic 数据存储的 Master Broker 的个数,QueueData 里存储着 Broker 的名称、读写 queue 的数量、同步标识等。
  • HashMap<String BrokerName, BrokerData> brokerAddrTable:这个结构存储着一个 BrokerName 对应的属性信息,包括所属的 Cluster 名称,一个 Master Broker 和多个 Slave Broker 的地址信息
  • HashMap<String ClusterName, Set<String BrokerName>> clusterAddrTable:存储的是集群中 Cluster 的信息,就是一个 Cluster 名称对应一个由 BrokerName 组成的集合。
  • HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable:Key 是 BrokerAddr 对应着一台机器,BrokerLiveTable 存储的内容是这台 Broker 机器的实时状态,包括上次更新状态的时间戳,NameServer 会定期检查这个时间戳,超时没有更新就认为这个 Broker 无效了,将其从 Broker 列表里清除。
  • HashMap<String BrokerAddr, List<String> FilterServer> filterServerTable:Key 是 Broker 的地址,Value 是和这个 Broker 关联的多个 FilterServer 的地址。Filter Server 是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一个 Broker 可以有一个或多个 Filter Server。

其他角色会主动向 NameServer 上报状态,根据上报消息里的请求码做相应的处理,更新存储的对应信息。

  • Broker 接到创建 Topic 的请求后向 NameServer 发送注册信息,NameServer 收到注册信息后首先更新 Broker 信息,然后对每个 Master 角色的 Broker,创建一个 QueueData 对象。如果是新建 Topic,就是添加 QueueData 对象;如果是修改 Topic,就是把旧的 QueueData 删除,加入新的 QueueData。
  • Broker 向 NameServer 发送的心跳会更新时间戳,NameServer 每 10 秒检查一次检查时间戳,检查到时间戳超过 2 分钟则认为 Broker 已失效,便会触发清理逻辑。
  • 连接断开的事件也会触发状态更新,当 NameServer 和 Broker 的长连接断掉以后,onChannelDestroy 函数会被调用,把这个 Broker 的信息清理出去。
  • Producer/Consumer 启动之后会和 NameServer 建立长连接,定时从 NameServer 获取路由信息保存到本地。消息的发送与拉取都会用到上面的数据。

那么多数据,相信大家看的有点晕,三此君简单总结下:NameServer 通过 brokerLiveInfo 来维护存活的 Broker。Producer 会获取上面的路由信息,发送消息的时候指定发送到哪个 Topic,根据 Topic 可以从 topicQueueTable 选择一个 Broker,根据 BrokerName 可以从 BrokerAddrTable 获取到Broker IP 地址。有了地址 Producer 就可以将消息通过网络传递给 Broker。

消息收发示例

RocketMQ 部署

刚刚我们了解 RocketMQ 整体架构,那怎么样通过 RocketMQ 收发消息呢?需要先通过 Docker 部署一套 RocketMQ:

如果你没有安装 Docker,可以根据菜鸟教程 MacOS Docker 安装/Windows Docker 安装 进行安装。然后,通过 docker-compose 部署 RocketMQ:

  • 克隆 docker-middleware 仓库,打开 RocketMQ 目录;
  • 修改broker.conf中的brokerIP1 参数为本机 IP;
  • 进入docker-compose.yml文件所在路径,执行docker-compose up命令即可;

注意:如果你现在不了解 Docker 不重要,只需要按照步骤部署好 RocketMQ 即可,并不会阻碍我们理解 RocketMQ 相关内容。

Docker 部署 RocketMQDocker 部署 RocketMQ

部署完成后我们就可以在 Docker Dashboard 中看到 RocketMQ 相关容器,包括 Broker、NameServer 及 Console(RocketMQ 控制台),到这里我们就可以使用部署的 RocketMQ 收发消息了。

RocketMQ 已经部署好了,接下来先来看一个简单的消息收发示例,可以说是 RocketMQ 的 "Hello World"。

消息发送

代码语言:javascript复制
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("Topic1","Tag", "Key",
                                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 
        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
       // 通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
  • 首先,实例化一个生产者 producer,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。
  • 然后 producer 得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先建立通信连接等。
  • producer 已经准备好了,那得准备好要发的内容,把 "Hello World" 发送到 Topic1。
  • 内容准备好,那 producer 就可以把消息发送出去了。producer 怎么知道 Broker 地址呢?他就会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。
  • 生产者发送的消息通过网络传输给 Broker,Broker 需要对消息按照一定的结构进行存储。存储完成之后,把存储结果告知生产者。

消息接收

代码语言:javascript复制
public class Consumer {

 public static void main(String[] args) throws InterruptedException, MQClientException {
     // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
     // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
     // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        erbconsumerijun.subscribe("sancijun", "*");
     // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
              List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
 }
}
  • 首先,实例化一个消费者 consumer,告诉它 NameServer 的地址,这样消费者才能从 NameServer 获取路由信息。
  • 然后这个消费者需要知道自己可以消费哪些 Topic 的消息,也就是每个消费者需要订阅一个或多个 Topic。
  • 消费者也需要做一些初始化,业务本身并没有理会怎么从 Broker 拉取消息,这些都是消费者默默无闻的奉献。所以,我们需要启动消费者,消费者会从 NameServer 拉取路由信息,并不断从 Broker 拉取消息。拉取回来的消息提供给业务定义的 MessageListener。
  • 消息拉取回来后,消费这需要怎么处理呢?每个消费者都不一样(业务本身决定),由我们业务定义的 MessageListener 处理。处理完之后,消费者也需要确认收货,就是告诉 Broker 消费成功了。

以上就是本文的全部内容,本文没有堆砌太多无意义的概念,没有讲什么削峰解耦,异步通信。这些内容网上也很多,看了和没看没什么两样。

参考文献

  • RocketMQ 官方文档
  • RocketMQ 源码
  • 丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
  • 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
  • 杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.

转载请注明出处

0 人点赞