RocketMQ
基本概念
消息模型
- RocketMQ由Producer、Broker、Consumer组成
- Producer
- 生产消息,同步/异步发送,顺序/单向发送。 同步/异步发送均需要Broker返回确认信息。
- 主要实现
- DefaultMQProducer
- 负责生产普通、顺序、单向、批量、延迟消息
- TransactionMQProducer
- 负责生产 事务消息
- DefaultMQProducer
- 支持集群部署,通过负载均衡模块,选择相应的Broker进行投递。
- 与NameServer集群中一个节点建立长连接,定期拉取Topic路由信息,并与提供Topic服务的master建立长连接,定时发送心跳。
- Producer Group
- 生产者组,生产同一类消息且发送逻辑相同。
- Broker Server
- 存储消息(持久化),实际对应一台机器。
- Broker可存储多个Topic,每个Topic由多个队列(MessageQueue)组成。
- 为消费者拉取做准备。也保存一些元数据:消费组、消费进度。
- 支持集群部署,Master-Slave
- master可对应多个Slave,一个Slave只能对应一个Master。
- 每个Broker都与所有Name Server建立长连接,定时注册Topic信息
- BrokerId=0为Maste,非0为Slave,只有BrokerId=1才会参与消息读负载。
- Consumer
- 消费消息:主动从Broker服务器拉取消息进行消费。
- 两种消费形式:拉取式和推动式,实则是主动拉取下来的。
- 支持集群部署,支持集群消费、广播消费。
- 与NameServer集群中一个节点建立长连接,定期获取Topic路由信息,并向提供Topic服务的master、slave连接长连接,定时向两者发送心跳。
- 可从Master订阅也可从Slave订阅。
- 当向master拉取时,master会根据 拉取偏移量和最大偏移量等因素,建议下次是送master还是Slave拉取。
- Consumer Group
- 同一类Consumer的集合,消费同一类消息且消费逻辑一致。则要求订阅相同topic。
- Producer
- Topic
- 某类消息的集合;每条消息只能属于一个主题。
- 是消息订阅的基本单位
- 由若干Queue组成
- 内部由一个或多个分区,有这些分区保存数据
- Name Server
- 管理路由信息(Broker信息,Broker与Topic的关系,Topic与队列的关系)
- 通过心跳机制判断Broker是否存活
- 生产者/消费者 通过 NameServer 查找 topic路由信息(主题对应的 Broker IP列表)进行投递或消费。
- 多个Name Server可以集群,但相互独立,没有交互。所以:每个Name Server都保存一个完整的路由信息,任何一台NameServer宕机都不影响使用。
- 每个Broker与所有Name Server建立长连接,而Producer与Consumer仅与集群中一台NameServer建立长连接。
- 管理路由信息(Broker信息,Broker与Topic的关系,Topic与队列的关系)
- 消费模式
- Clustering集群消费(负载均衡消费)
- 同一Consumer Group下每个Consumer平均分配消息。
- Clustering集群消费(负载均衡消费)
- BroadCasting广播消费
- 同一Consumer Group下每个Consumer都接收到全量消息。
- 顺序消息
- 分区有序
- 同一队列的消息是有序的,不同队列消息可能是无序的
- 全局有序
- 每个topic只有一个队列
- 分区有序
特性
消息有序
- 分区有序
- 将一个Topic消息分为多个分区保存和消费,每个分区遵循FIFO原则,即分区有序
- 全局有序
- 内部将topic的分区设置1,那么该topic只有一个分区,所有消息都遵循FIFO原则
消息过滤
- 目前只能在Broker端实现过滤
消息可靠
- Broker非正常关闭、Broker异常Crash、机器掉电、磁盘设备损坏,机器无法开机
- 采用同步刷盘方式,不会丢失任何数据
- 采用异步刷盘方式,会丢失少量数据
- 单点故障,如master失败
- 采用同步复制:数据不会丢失,完全避免单点故障,只是性能差
至少一次
- 每个消息至少投递一次
- 消费者拉取并消费完成才向服务器返回ack
- 可代码控制是否返回ack。
回溯消费
- Consumer已消费成功的消息,需要重新消费一次。按时间维度回溯,精确到毫秒级别。
事务消息
- 本地事务与发送消息定义到全局事务中,要么同时成功,要么同时失败
- 分布式事务最终一致性
定时消息
消息重试
- 消息没有被Consumer成功消费,重试机制是的消息再消费一次。
- 每个topic都有 重试队列 ,以保存消费失败的消息。
消息重投
- 生产者发送消息失败,同步发送情况会重投,异步会重试。
- 可能会重复,且不可避免。
- 可设置重投、重试次数。
死信队列
- 用于处理无法被正常消费的消息。
- 消息达到重投、重试次数,就进入该队列中。只能后台重发这些消息。
介绍
为什么用MQ
- 消息队列是一种 先进先出 的数据结构
- 消息队列应用场景
- 应用解耦
- 强耦合
- 系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
- 使用MQ解耦
- 下游服务故障,不会影响上游服务;如物流系统故障,物流系统所需要的数据缓存到消息队列中,用户下单能正常完成,物流系统恢复后,到消息队列获取数据消费即可。
- 强耦合
- 流量削峰
- 什么是削峰
- 系统请求流量瞬间猛增,可能会将系统压垮,可将大量请求缓存中MQ,分散到很长一段时间处理,可提高系统稳定性和用户体验。
- 使用MQ解决削峰
- 什么是削峰
- 数据分发
- 硬编码实现数据分发
- 使用MQ数据分发
- 数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可(其实是订阅)
- 应用解耦
- 消息队列应用场景
使用MQ优缺点
- 优点
- 解耦、削峰、数据分发
- 缺点
- 系统可用性降低
- 系统稳定性降低,一旦MQ宕机,对业务造成影响
- 如何保证MQ高可用
- 系统变复杂了
- 重复消费问题
- 消息丢失问题
- 顺序消息问题
- 一致性问题
- 如通过MQ给B、C、D发送消息,B、C处理成功,D处理失败
- 如何保证消息处理的一致性?
MQ带来的问题
消息堆积
- 堆积原因分析
- producer生产速度过快
- 可能存在的问题
- dos攻击
- 业务高峰(双十一下单,12306订票)
- 可能存在的问题
- broker消息堆积
- 可能存在的问题
- broker性能瓶颈
- broker同步策略导致消息堆积
- 可能存在的问题
- 消息者拉取超过一定量消息后会暂定消息拉取
- 原因有二
- 消息者消息能力有限
- 消费端过多消息容易GC频繁
- 原因有二
- producer生产速度过快
- 消息堆积处理手段
- 首先明确堆积原因
- 通常可限流和扩容来解决
- 如何判断是否消息堆积
- producer发送消息速率监控
- producer发送消息的maxoffset与consumer消费消息的currOffset的差异值,与给定的消息堆积数值告警值对比,如差异值大于告警值则出现堆积
- consumer消息消息的速率监控
与KafKa不同点
性能
- KafKa单机写入 TPS 约在百万条/秒,消息大小 10个字节。 而RocketMQ单机单Broker写入TPS越 7万条/秒,单机部署3个Broker,可达12万条/秒。
- 原因
- KafKa得TPS单机达到百万级别,主要因为 Producer端将多个小消息合并,批量发给Broker。
- 为什么RocketMQ没有这么做
- 因为RocketMQ 是java 实现的,要是缓存过多消息,GC是很严重的问题。
- 为什么RocketMQ没有这么做
- Kafka无限消息堆积,高效的持久化速度,但KafKa主要定位于日志传输
- KafKa 消息存储过程会根据topic和partition的数量创建文件,即创建一个topic 并 指定3个partition,那就有3个物理文件对partition对应。所以多文件并发写入,性能比RocketMQ好。
- RocketMQ只有一个commitLog物理文件,单文件写入,性能比KafKa差。
- KafKa得TPS单机达到百万级别,主要因为 Producer端将多个小消息合并,批量发给Broker。
数据可靠性
- RocketMQ 支持异步/同步刷盘(持久化),异步/同步复制(主从)
- KafKa使用异步刷盘,异步复制
- 所以 RocketMQ同步刷盘同步复制在数据可靠性上 比KafKa更优
集群
- KafKa支持Slave 自动切换为Master;开源版本RocketMQ不支持Slave自动切换为Master,而阿里云版本RocketMQ支持自动切换特性。
消费失败重试
- KafKa消费失败不支持重试,而RocketMQ支持失败重试。
严格的消息顺序
- kafka支持顺序消息,当宕机后会乱序
- RocketMQ支持严格消息顺序,即使宕机后也不会乱序
定时消息
- kafka不支持定时消息
- RocketMQ支持定时消息
分布式事务消息
- kafka不支持分布式事务消息
- RocketMQ支持分布式事务
消息过滤
- kafka不支持代理端消息过滤
- RocketMQ支持代理端消息过滤
KafKa不支持延迟消息,而RocketMQ支持
重点
- ActiveMQ IO模块遇到了瓶颈
- Kafka在低延迟和高可靠性方面不能满足阿里
- kafka仅在提交消息后(即将消息复制到所有同步副本),才将消息公开给使用者。
- 实验表明,将1000个分区从一个代理复制到另一个代理会增加大约20毫秒的延迟。对某些实时应用程序来说,这可能太高
集群搭建
单机
- 启动
- 启动namesrv
- nohup sh bin/mqnamesrv &
- tail -f ~/logs/rocketmqlogs/namesrv.log
- 启动broker
- vi runbroder.sh vi runserver.sh 修改jvm参数
- nohup sh bin/mqbroker -n localhost:9876 &
- 指定了namesrv的地址
- tail -f ~/logs/rocketmqlogs/broker.log
- 启动namesrv
- 关闭
- sh bin/mqshutdown broker
- sh bin/mqshutdown namesrv
- 测试发送消息和接收消息
- 窗口一
- export NAMESRV_ADDR=localhost:9876
- sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
- 窗口二
- export NAMESRV_ADDR=localhost:9876
- sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 窗口一
集群
- 集群概念图
- 集群特点
- nameserver是无状态的,可集群部署,节点之间不通信
- broder分master和slave,一个master可对应多个slave,而一个slave只能对应一个master,master和slave的对应关系通过指定相同的brokername,主从由brokerid来控制,brokerid=0代表主 非0代表从。
master可以部署多个。
每个broker与nameserver集群的所有节点建立长连接,定时注册topic信息到所有nameserver。
- producer与nameserver集群中的其中一个节点(随机选择)建立长连接,定期从nameserver获取topic路由信息,并向提供topic服务的master建立长连接,且定时向master发送心跳。
producer完全无状态,可集群部署。
- consumer与nameserver集群中的其中一个节点(随机)建立长连接,定期从nameserver获取topic路由信息,并向提供topic服务的master、slave建立长连接,且定时向master、slave发送心跳。
consumer既可以从master订阅消息,也可以从slave订阅消息,订阅规则由broker配置决定。
双主双从同步双写搭建
- 设计图
- 集群工作流程说明
- namesrv启动,等待broker、producer、consumer连接,充当路由控制中心
- broker启动,与所有namesrv保持长连接,定时发送心跳,心跳包包含当前broker信息(ip和端口)以及存储所有的topic信息。
注册成功后,namesrv就有了topic与broker的映射关系了。
- 收发消息前,先创建topic,创建topic时需要指定topic要存储在哪些broker上,也可以在发送消息时自动创建topic
- Producer发送消息,启动时先跟namesrv集群中的其中一台建立长连接,并从namesrv中获取当前发送的topic存在哪些broker上,轮询从队列列表中选择一个队列,然后与队列所在的broker建立长连接从而向broker发消息。
- consumer与producer类似,跟其中一台namesrv建立长连接,获取当前订阅topic存在哪些broker上,然后直接跟broker建立长连接,开始消费消息。
- 搭建步骤
- 目标
- 配置
- 配置hosts文件
- 在两台机器上配置
- 配置rocketmq环境变量
- 在两台机器上配置
- 配置消息存储路径
- 在两台机器上配置 快速:mkdir /usr/local/rocketmq/store/{commitlog,consumequeue,index} -p
- 配置: broker-a.properties broker-a-s.properties broker-b.properties broker-b-s.properties
- **#所属集群名称 重点 brokerClusterName=rocket-cluster #当前broker的名称,主从关系的节点该名称必须一致,注:不同的broker配置不同的名称 重点 brokerName=broker-a #是否master 0为master 非0为slave 重点 brokerId=0 #namesrv地址 重点 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许broker自动创建topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #接受客户端连接的监听端口 重点 listenPort=10911 #在每天的什么时间删除已经超过文件保留时间的 commit log deleteWhen=04 #以小时计算的文件保留时间 fileReservedTime=48 #commitLog每个文件的大小默认1G mappedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88
- 配置hosts文件
#存储路径 重点 storePathRootDir=/user/local/rocketmq/store #commitLog 存储路径 重点 storePathCommitLog=/user/local/rocketmq/store/commitlog #消费队列存储路径存储路径 重点 storePathConsumeQueue=/user/local/rocketmq/store/consumequeue #消息索引存储路径 重点 storePathIndex=/user/local/rocketmq/store/index #checkpoint 文件存储路径 重点 storeCheckpoint=/user/local/rocketmq/store/checkpoint #abort 文件存储路径 重点 abortFile=/user/local/rocketmq/store/abort #限制的消息大小 重点 maxMessageSize=65536 #broker的角色 #SYNC_MASTER 同步双写master #ASYNC_MASTER 同步双写master #SLAVE 重点 brokerRole=SYNC_MASTER #ASYNC_FLUSH 异步刷盘 #SYNC_FLUSH 同步刷盘 重点 flushDiskType=ASYNC_FLUSH **
代码语言:javascript复制 - 修改jvm参数
- vi runbroder.sh
vi runserver.sh 修改jvm参数
代码语言:javascript复制 - 分别启动namesrv
-
- 启动broker
-
- 查看进程jps
- 查看日志
-
- [rocketmq-console管理后台搭建](https://github.com/apache/rocketmq-externals)
- 下载rocketmq-console
- 修改resource/application.properties
- rocketmq.config.namesrvAddr=ip:port;ip:port
- 打包mvn clean package -Dmaven.test.skip=true
- 运行java -jar target/rocketmq-console-ng-1.0.1.jar
- 访问locallhost:8080
集群方式介绍
namesrv集群
- 启动多个namesrv即可,namesrv之间不通信
- broker需配置所有要连接的namesrv
单Master
- 风险大、一旦Broker重启或挂掉,整个服务就不可用。仅使用本地测试
多Master
- 一个集群全是Master,无Slave。如2个Master或3个Master
- 优点
- 配置简单,单个Master宕机或重启维护 对 应用无影响。(异步刷新盘丢失少量消息,同步刷盘一条不丢),性能最优。
- 缺点
- 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会收到影响。
多Master多Slave模式 异步复制
- 每个master配一个slave,有多对master-slave,HA(高可用)采用异步复制,主备有短暂消息延迟(毫秒级);master同步刷盘
- 优点
- 即使磁盘损坏,消息丢失得非常少,且消息实时性不会受影响,同时master宕机后,消费者仍然可以从slave消费,此过程自动执行,性能同 多Master 一样。
- 缺点
- master宕机或磁盘损坏情况下丢失少量消息。(未来得及同步至slave那部分数据)
多Master多Slave模式 同步双写
- 每个master配一个slave,有多对master-slave,HA采用同步双写,即只有主备都写成功了,才向应用返回成功。
- 优点
- 数据与服务都无单点故障,master宕机,消息无延迟,服务可用性与数据可用性非常高
- 缺点
- 性能比异步复制差 10% 左右,且目前版本,在master节点宕机后,slave不能主动切换master
主从
- 主从工作原理
- 通过name确定是否同一组,通过brokerid确定是否主,0主 , 非0从
- Master负责接收消息,Slave不断发送请求到Master拉去消息
- 消费端可从Master或Slave拉去消息,如从Master拉去消息,Master 会根据当前情况和Slave同步情况,向消费者建议下一次拉去消息是从Master还Slave。(自动完成)
- Master 宕机
- 当Master宕机后,会通过 dledger算法 从Slave当中选举一个为Master。
- 为什么是dledger
- 也可借用zookeeper ,当zk依赖外部组件比较多,而且还得维护zk集群
- dledger是利用raft协议完成自动选主的,raft协议不需要外部组件,自动选主的逻辑集成到各个节点的进程中,节点之间通过通信就可以完成选主。
- dledger如何工作
- 工作原理是raft算法
- 为什么是dledger
- 当Master宕机后,会通过 dledger算法 从Slave当中选举一个为Master。