RocketMQ 大脑 NameServer 赏析

2023-11-17 10:43:40 浏览数 (1)

1. 为什么是 RocketMQ

2. 使用场景

MQ 的基本用途

  • 异步处理(如:订单支付后,扣减库存、增加积分,引入 MQ 异步去做,降低响应时间)
  • 系统解耦(如:订单完成后,发送 MQ,下游依赖系统(库存、营销)自行调用)
  • 削峰填谷(如:秒杀活动,请求进入MQ,服务器慢慢处理,多余请求抛弃)
  • 日志收集(如:分布式系统调用日志记录)

MQ 本质:异步通信

3. 工作原理

RocketMQ 四大核心组件

  • NameServer:主要负责对于元数据的管理,包括了对于 Topic 和路由信息的管理。(类比:快递中心
  • Broker:消息中转角色,负责存储消息,转发消息。(类比:快递柜
  • Producer:负责生产消息。(类比:寄件人发货
  • Consumer:负责消费消息。(类比:收件人收货

【来源于RocketMQ官网】【来源于RocketMQ官网】

4. 演进过程

  • 在 RocketMQ 的早期版本其实不叫 RocketMQ,而是叫 MetaQ,在 MetaQ1.0 和 MetaQ2.0 实际上是依赖的是 Zookeeper,但是从 MetaQ3.0 的时候更名为 RocketMQ,同时也去掉了 Zookeeper。
  • RocketMQ 整体设计思想是来自于 kafka,而 kafka 是使用 Zookeeper 作为注册中心,所以早期版本也是使用的 Zookeeper,到了后期发展,摈弃了 Zookeeper,自己写了一套 NameServer,保证独立性。
  • 采用 Zookeeper 需要保持强一致性,还会导致整体架构就会变得复杂,在维护、搭建成本上都会上升,为了保证高性能,低维护成本,那么就开发了 NameServer。

5. 从 NameServer 起点

5.1 RocketMQ 大脑 —— NameServer

NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现,主要功能如下:

Broker管理:接收 Broker 集群的注册信息并且保存下来作为路由信息的基本数据,提供心跳监测机制,检查 Broker 是否还存活。

路由信息管理:每个 NameServer中都保存 Broker 集群的整个路由信息可用于客户端查询队列信息,Producer 和 Consumer 通过 NameServer可以获取整个 Broker 集群路由信息从而进行消息投递和消费。

5.2 本地调试

日志文件存储路径配置和启动

日志文件存储路径配置和启动日志文件存储路径配置和启动

5.3 NamesrvStartup 启动类

启动类: org.apache.rocketmq.namesrv.NamesrvStartup

代码片段如下:

代码语言:java复制
public static NamesrvController main0(String[] args) {

        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType="   RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

通过 main 方法启动 NameServer,分为两大步,先创建 NamesrvController,而后再初始化并启动 NamesrvController。

5.4 NameServer 启动时序图

NameServer启动时序图NameServer启动时序图

6. 路由管理

6.1 路由注册

6.1.1 心跳请求

在 RocketMQ 中,默认情况下,Broker 服务器会每间隔 30 秒向集群中所有的 NameServer 发送心跳信息。

org.apache.rocketmq.broker.BrokerController#start

代码语言:java复制
// 每隔30s发送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

NameServer 会接收到 Broker 发来的心跳包,DefaultRequestProcessor(默认请求处理器) 根据请求头中的code 码,执行处理心跳包的操作。

具体会调用 RouteInfoManager.registerBroker() 操作。其实 broker 注册路由过程,主要还是操作RouteInfoManager 中的多个映射表

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

路由注册代码片段及时序图路由注册代码片段及时序图

6.1.2 RouteInfoManager

RouteInfoManagerRouteInfoManager

RouteInfoManager:用于管理心跳信息以及路由管理

RouteInfoManager 管理的路由元数据:(元数据是什么?

① topicQueueTable:topic路由信息,broker名称,读队列数,写队列数等

② brokerAddrTable:broker信息,broker集群,broker名称,broker地址等

③ clusterAddrTable:broker集群信息,broker名称列表

④ brokerLiveTable:broker状态信息,broker存活最新上报时间

⑤ filterServerTable:filterServer列表,消息过滤信息

路由元信息Map结构示例如下:

6.2 路由删除

org.apache.rocketmq.namesrv.NamesrvController#initialize

代码语言:javascript复制
// 每隔 10s 进行扫描判断是否存在失效的Broker,如果存在则移除失效broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker

代码语言:javascript复制
public int scanNotActiveBroker() {
        int removeCount = 0;
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last   BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
                removeCount  ;
            }
        }
        return removeCount;
    }

备注:Broker 视角触发路由删除,即 Broker 在正常关闭的情况下,会执行 unregisterBroker 指令这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该 broker 相关的信息。

6.3 路由发现

RocketMQ 的路由发现采用的是 Pull 模型,当 Topic 路由信息发生变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由,默认让客户端每 30 秒会拉取一次最新的路由。

Push、Pull、Long Polling 三种模型的优缺点?

路由发现源码片段路由发现源码片段

7. 小结

7.1 回顾

这次源码剖析学到了什么?

本次分享很多不足的地方还请指教。

如下图所示:

框架示意图框架示意图

7.2 源码学习的方法

本地调试法(启动类,Debug,QuickStart等)

7.3 NameServer 设计的亮点

  • 读写锁使用,Map 结构存储路由信息;
  • NameServer 之间不通信(AP,最终一致性);
  • NameServer 只管理 Broker 集群等信息。

8. 本地源码调试 —— Step by Step【附录】

(1)从 GitHub 上 clone 代码到本地

https://github.com/apache/rocketmq.git

(2)导入 IDEA

(3)执行 mvn clean install -DskipTests

(4)启动 NameServer

(5)启动 Broker

(6)启动 Producer

(7)启动 Consumer

Step By StepStep By Step

8.1 启动 NameServer

org.apache.rocketmq.namesrv.NamesrvStartup#main

启动报错:请设置 ROCKET_HOME 环境变量启动报错:请设置 ROCKET_HOME 环境变量

打开 【Edit Configuration】配置启动参数:

【Edit Configuration】配置【Edit Configuration】配置
ROCKET_HOME 环境变量设置ROCKET_HOME 环境变量设置

ROCKET_HOME 环境变量设置完成如下:

ROCKET_HOME 环境变量设置完成ROCKET_HOME 环境变量设置完成

再次启动 NameServer

启动报错:在如图路径下未找到 logback_namesrv.xml 文件启动报错:在如图路径下未找到 logback_namesrv.xml 文件

添加 logback_namesrv.xml 文件到指定文件路径下:

打开 logback_namesrv.xml 文件并替换 RocketMQ 运行路径

再次启动,出现“The Name Server boot success …”字样,表示启动成功。

8.2 启动 Broker

org.apache.rocketmq.broker.BrokerStartup#main

启动报错:请设置 ROCKET_HOME 环境变量启动报错:请设置 ROCKET_HOME 环境变量

ROCKET_HOME 环境变量设置如下:

ROCKET_HOME 环境变量设置ROCKET_HOME 环境变量设置

再次启动 NameServer

启动报错:在如图路径下未找到 logback_broker.xml 文件启动报错:在如图路径下未找到 logback_broker.xml 文件

添加 logback_namesrv.xml 文件到指定文件路径下,报错:未找到 broker.conf 文件

打开 broker.conf 文件并添加配置信息

代码语言:javascript复制
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

#追加配置如下:
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

#nameServer 地址,分号分割
namesrvAddr=127.0.0.1:9876

#broker 地址
brokerIP1=10.2.67.197

#存储路径
storePathRootDir=D:/xxx/rocketmqserver/store
#commitLog 存储路径
storePathCommitLog=D:/xxx/rocketmqserver/store/commitlog
#消费队列存储路径
storePathConsumeQueue=D:/xxx/rocketmqserver/store/consumequeue
#消息索引存储路径
storePathIndex=D:/xxx/rocketmqserver/store/index
#checkpoint 文件存储路径
storeCheckpoint=D:/xxx/rocketmqserver/store/checkpoint
#abort 文件存储路径
abortFile=D:/xxx/rocketmqserver/store/abort

设置【Programma arguments】项目启动参数如下:

【Programma arguments】项目启动参数【Programma arguments】项目启动参数

再次启动,出现“The broker[…] boot success …”字样,表示启动成功。

8.3 启动 Producer

org.apache.rocketmq.example.quickstart.Producer#main

8.4 启动 Consumer

org.apache.rocketmq.example.quickstart.Consumer#main

原文链接:https://mp.weixin.qq.com/s/94DaQVGvZwFQlSUvQmrL1A

0 人点赞