RocketMQ

2022-05-20 16:51:42 浏览数 (1)

RocketMQ

基本概念

消息模型

  • RocketMQ由Producer、Broker、Consumer组成
    • Producer
      • 生产消息,同步/异步发送,顺序/单向发送。 同步/异步发送均需要Broker返回确认信息。
      • 主要实现
        • DefaultMQProducer
          • 负责生产普通、顺序、单向、批量、延迟消息
        • TransactionMQProducer
          • 负责生产 事务消息
      • 支持集群部署,通过负载均衡模块,选择相应的Broker进行投递。
      • 与NameServer集群中一个节点建立长连接,定期拉取Topic路由信息,并与提供Topic服务的master建立长连接,定时发送心跳。
    • Producer Group
      • 生产者组,生产同一类消息且发送逻辑相同。
    • Broker Server
      • 存储消息(持久化),实际对应一台机器。
      • Broker可存储多个Topic,每个Topic由多个队列(MessageQueue)组成。
      • 为消费者拉取做准备。也保存一些元数据:消费组、消费进度。
      • 支持集群部署,Master-Slave
        • master可对应多个Slave,一个Slave只能对应一个Master。
        • 每个Broker都与所有Name Server建立长连接,定时注册Topic信息
        • BrokerId=0为Maste,非0为Slave,只有BrokerId=1才会参与消息读负载。
    • Consumer
      • 消费消息:主动从Broker服务器拉取消息进行消费。
      • 两种消费形式:拉取式和推动式,实则是主动拉取下来的。
      • 支持集群部署,支持集群消费、广播消费。
      • 与NameServer集群中一个节点建立长连接,定期获取Topic路由信息,并向提供Topic服务的master、slave连接长连接,定时向两者发送心跳。
        • 可从Master订阅也可从Slave订阅。
        • 当向master拉取时,master会根据 拉取偏移量和最大偏移量等因素,建议下次是送master还是Slave拉取。
    • Consumer Group
      • 同一类Consumer的集合,消费同一类消息且消费逻辑一致。则要求订阅相同topic。
  • Topic
    • 某类消息的集合;每条消息只能属于一个主题。
    • 是消息订阅的基本单位
    • 由若干Queue组成
    • 内部由一个或多个分区,有这些分区保存数据
  • Name Server
    • 管理路由信息(Broker信息,Broker与Topic的关系,Topic与队列的关系)
      • 通过心跳机制判断Broker是否存活
    • 生产者/消费者 通过 NameServer 查找 topic路由信息(主题对应的 Broker IP列表)进行投递或消费。
    • 多个Name Server可以集群,但相互独立,没有交互。所以:每个Name Server都保存一个完整的路由信息,任何一台NameServer宕机都不影响使用。
    • 每个Broker与所有Name Server建立长连接,而Producer与Consumer仅与集群中一台NameServer建立长连接。
  • 消费模式
    • Clustering集群消费(负载均衡消费)
      • 同一Consumer Group下每个Consumer平均分配消息。
代码语言:javascript复制
- BroadCasting广播消费

    - 同一Consumer Group下每个Consumer都接收到全量消息。
  • 顺序消息
    • 分区有序
      • 同一队列的消息是有序的,不同队列消息可能是无序的
    • 全局有序
      • 每个topic只有一个队列

特性

消息有序

  • 分区有序
    • 将一个Topic消息分为多个分区保存和消费,每个分区遵循FIFO原则,即分区有序
  • 全局有序
    • 内部将topic的分区设置1,那么该topic只有一个分区,所有消息都遵循FIFO原则

消息过滤

  • 目前只能在Broker端实现过滤

消息可靠

  • Broker非正常关闭、Broker异常Crash、机器掉电、磁盘设备损坏,机器无法开机
    • 采用同步刷盘方式,不会丢失任何数据
    • 采用异步刷盘方式,会丢失少量数据
  • 单点故障,如master失败
    • 采用同步复制:数据不会丢失,完全避免单点故障,只是性能差

至少一次

  • 每个消息至少投递一次
  • 消费者拉取并消费完成才向服务器返回ack
    • 可代码控制是否返回ack。

回溯消费

  • Consumer已消费成功的消息,需要重新消费一次。按时间维度回溯,精确到毫秒级别。

事务消息

  • 本地事务与发送消息定义到全局事务中,要么同时成功,要么同时失败
  • 分布式事务最终一致性

定时消息

消息重试

  • 消息没有被Consumer成功消费,重试机制是的消息再消费一次。
  • 每个topic都有 重试队列 ,以保存消费失败的消息。

消息重投

  • 生产者发送消息失败,同步发送情况会重投,异步会重试。
  • 可能会重复,且不可避免。
  • 可设置重投、重试次数。

死信队列

  • 用于处理无法被正常消费的消息。
  • 消息达到重投、重试次数,就进入该队列中。只能后台重发这些消息。

介绍

为什么用MQ

  • 消息队列是一种 先进先出 的数据结构
    • 消息队列应用场景
      • 应用解耦
        • 强耦合
          • 系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
        • 使用MQ解耦
          • 下游服务故障,不会影响上游服务;如物流系统故障,物流系统所需要的数据缓存到消息队列中,用户下单能正常完成,物流系统恢复后,到消息队列获取数据消费即可。
      • 流量削峰
        • 什么是削峰
          • 系统请求流量瞬间猛增,可能会将系统压垮,可将大量请求缓存中MQ,分散到很长一段时间处理,可提高系统稳定性和用户体验。
        • 使用MQ解决削峰
      • 数据分发
        • 硬编码实现数据分发
        • 使用MQ数据分发
          • 数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可(其实是订阅)

使用MQ优缺点

  • 优点
    • 解耦、削峰、数据分发
  • 缺点
    • 系统可用性降低
    • 系统稳定性降低,一旦MQ宕机,对业务造成影响
    • 如何保证MQ高可用
    • 系统变复杂了
      • 重复消费问题
      • 消息丢失问题
      • 顺序消息问题
      • 一致性问题
        • 如通过MQ给B、C、D发送消息,B、C处理成功,D处理失败
        • 如何保证消息处理的一致性?

MQ带来的问题

消息堆积

  • 堆积原因分析
    • producer生产速度过快
      • 可能存在的问题
        • dos攻击
        • 业务高峰(双十一下单,12306订票)
    • broker消息堆积
      • 可能存在的问题
        • broker性能瓶颈
        • broker同步策略导致消息堆积
    • 消息者拉取超过一定量消息后会暂定消息拉取
      • 原因有二
        • 消息者消息能力有限
        • 消费端过多消息容易GC频繁
  • 消息堆积处理手段
    • 首先明确堆积原因
    • 通常可限流和扩容来解决
  • 如何判断是否消息堆积
    • producer发送消息速率监控
    • producer发送消息的maxoffset与consumer消费消息的currOffset的差异值,与给定的消息堆积数值告警值对比,如差异值大于告警值则出现堆积
    • consumer消息消息的速率监控

与KafKa不同点

性能

  • KafKa单机写入 TPS 约在百万条/秒,消息大小 10个字节。 而RocketMQ单机单Broker写入TPS越 7万条/秒,单机部署3个Broker,可达12万条/秒。
  • 原因
    • KafKa得TPS单机达到百万级别,主要因为 Producer端将多个小消息合并,批量发给Broker。
      • 为什么RocketMQ没有这么做
        • 因为RocketMQ 是java 实现的,要是缓存过多消息,GC是很严重的问题。
    • Kafka无限消息堆积,高效的持久化速度,但KafKa主要定位于日志传输
    • KafKa 消息存储过程会根据topic和partition的数量创建文件,即创建一个topic 并 指定3个partition,那就有3个物理文件对partition对应。所以多文件并发写入,性能比RocketMQ好。
    • RocketMQ只有一个commitLog物理文件,单文件写入,性能比KafKa差。

数据可靠性

  • RocketMQ 支持异步/同步刷盘(持久化),异步/同步复制(主从)
  • KafKa使用异步刷盘,异步复制
  • 所以 RocketMQ同步刷盘同步复制在数据可靠性上 比KafKa更优

集群

  • KafKa支持Slave 自动切换为Master;开源版本RocketMQ不支持Slave自动切换为Master,而阿里云版本RocketMQ支持自动切换特性。

消费失败重试

  • KafKa消费失败不支持重试,而RocketMQ支持失败重试。

严格的消息顺序

  • kafka支持顺序消息,当宕机后会乱序
  • RocketMQ支持严格消息顺序,即使宕机后也不会乱序

定时消息

  • kafka不支持定时消息
  • RocketMQ支持定时消息

分布式事务消息

  • kafka不支持分布式事务消息
  • RocketMQ支持分布式事务

消息过滤

  • kafka不支持代理端消息过滤
  • RocketMQ支持代理端消息过滤

KafKa不支持延迟消息,而RocketMQ支持

重点

  • ActiveMQ IO模块遇到了瓶颈
  • Kafka在低延迟和高可靠性方面不能满足阿里
    • kafka仅在提交消息后(即将消息复制到所有同步副本),才将消息公开给使用者。
    • 实验表明,将1000个分区从一个代理复制到另一个代理会增加大约20毫秒的延迟。对某些实时应用程序来说,这可能太高

集群搭建

单机

  • 启动
    • 启动namesrv
      • nohup sh bin/mqnamesrv &
      • tail -f ~/logs/rocketmqlogs/namesrv.log
    • 启动broker
      • vi runbroder.sh vi runserver.sh 修改jvm参数
      • nohup sh bin/mqbroker -n localhost:9876 &
        • 指定了namesrv的地址
      • tail -f ~/logs/rocketmqlogs/broker.log
  • 关闭
    • sh bin/mqshutdown broker
    • sh bin/mqshutdown namesrv
  • 测试发送消息和接收消息
    • 窗口一
      • export NAMESRV_ADDR=localhost:9876
      • sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    • 窗口二
      • export NAMESRV_ADDR=localhost:9876
      • sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

集群

  • 集群概念图
  • 集群特点
    • nameserver是无状态的,可集群部署,节点之间不通信
    • broder分master和slave,一个master可对应多个slave,而一个slave只能对应一个master,master和slave的对应关系通过指定相同的brokername,主从由brokerid来控制,brokerid=0代表主 非0代表从。

master可以部署多个。

每个broker与nameserver集群的所有节点建立长连接,定时注册topic信息到所有nameserver。

  • producer与nameserver集群中的其中一个节点(随机选择)建立长连接,定期从nameserver获取topic路由信息,并向提供topic服务的master建立长连接,且定时向master发送心跳。

producer完全无状态,可集群部署。

  • consumer与nameserver集群中的其中一个节点(随机)建立长连接,定期从nameserver获取topic路由信息,并向提供topic服务的master、slave建立长连接,且定时向master、slave发送心跳。

consumer既可以从master订阅消息,也可以从slave订阅消息,订阅规则由broker配置决定。

双主双从同步双写搭建

  • 设计图
  • 集群工作流程说明
    • namesrv启动,等待broker、producer、consumer连接,充当路由控制中心
    • broker启动,与所有namesrv保持长连接,定时发送心跳,心跳包包含当前broker信息(ip和端口)以及存储所有的topic信息。

注册成功后,namesrv就有了topic与broker的映射关系了。

  • 收发消息前,先创建topic,创建topic时需要指定topic要存储在哪些broker上,也可以在发送消息时自动创建topic
  • Producer发送消息,启动时先跟namesrv集群中的其中一台建立长连接,并从namesrv中获取当前发送的topic存在哪些broker上,轮询从队列列表中选择一个队列,然后与队列所在的broker建立长连接从而向broker发消息。
  • consumer与producer类似,跟其中一台namesrv建立长连接,获取当前订阅topic存在哪些broker上,然后直接跟broker建立长连接,开始消费消息。
  • 搭建步骤
    • 目标
    • 配置
      • 配置hosts文件
        • 在两台机器上配置
      • 配置rocketmq环境变量
        • 在两台机器上配置
      • 配置消息存储路径
        • 在两台机器上配置 快速:mkdir /usr/local/rocketmq/store/{commitlog,consumequeue,index} -p
      • 配置: broker-a.properties broker-a-s.properties broker-b.properties broker-b-s.properties
        • **#所属集群名称 重点 brokerClusterName=rocket-cluster #当前broker的名称,主从关系的节点该名称必须一致,注:不同的broker配置不同的名称 重点 brokerName=broker-a #是否master 0为master 非0为slave 重点 brokerId=0 #namesrv地址 重点 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许broker自动创建topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #接受客户端连接的监听端口 重点 listenPort=10911 #在每天的什么时间删除已经超过文件保留时间的 commit log deleteWhen=04 #以小时计算的文件保留时间 fileReservedTime=48 #commitLog每个文件的大小默认1G mappedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88

#存储路径 重点 storePathRootDir=/user/local/rocketmq/store #commitLog 存储路径 重点 storePathCommitLog=/user/local/rocketmq/store/commitlog #消费队列存储路径存储路径 重点 storePathConsumeQueue=/user/local/rocketmq/store/consumequeue #消息索引存储路径 重点 storePathIndex=/user/local/rocketmq/store/index #checkpoint 文件存储路径 重点 storeCheckpoint=/user/local/rocketmq/store/checkpoint #abort 文件存储路径 重点 abortFile=/user/local/rocketmq/store/abort #限制的消息大小 重点 maxMessageSize=65536 #broker的角色 #SYNC_MASTER 同步双写master #ASYNC_MASTER 同步双写master #SLAVE 重点 brokerRole=SYNC_MASTER #ASYNC_FLUSH 异步刷盘 #SYNC_FLUSH 同步刷盘 重点 flushDiskType=ASYNC_FLUSH **

代码语言:javascript复制
    - 修改jvm参数

        - vi runbroder.sh

vi runserver.sh 修改jvm参数

代码语言:javascript复制
    - 分别启动namesrv

        - 

    - 启动broker

        - 

    - 查看进程jps
    - 查看日志

        - 

    - [rocketmq-console管理后台搭建](https://github.com/apache/rocketmq-externals)

        - 下载rocketmq-console
        - 修改resource/application.properties

            - rocketmq.config.namesrvAddr=ip:port;ip:port

        - 打包mvn clean package -Dmaven.test.skip=true
        - 运行java -jar target/rocketmq-console-ng-1.0.1.jar
        - 访问locallhost:8080

集群方式介绍

namesrv集群

  • 启动多个namesrv即可,namesrv之间不通信
  • broker需配置所有要连接的namesrv

单Master

  • 风险大、一旦Broker重启或挂掉,整个服务就不可用。仅使用本地测试

多Master

  • 一个集群全是Master,无Slave。如2个Master或3个Master
  • 优点
    • 配置简单,单个Master宕机或重启维护 对 应用无影响。(异步刷新盘丢失少量消息,同步刷盘一条不丢),性能最优。
  • 缺点
    • 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会收到影响。

多Master多Slave模式 异步复制

  • 每个master配一个slave,有多对master-slave,HA(高可用)采用异步复制,主备有短暂消息延迟(毫秒级);master同步刷盘
  • 优点
    • 即使磁盘损坏,消息丢失得非常少,且消息实时性不会受影响,同时master宕机后,消费者仍然可以从slave消费,此过程自动执行,性能同 多Master 一样。
  • 缺点
    • master宕机或磁盘损坏情况下丢失少量消息。(未来得及同步至slave那部分数据)

多Master多Slave模式 同步双写

  • 每个master配一个slave,有多对master-slave,HA采用同步双写,即只有主备都写成功了,才向应用返回成功。
  • 优点
    • 数据与服务都无单点故障,master宕机,消息无延迟,服务可用性与数据可用性非常高
  • 缺点
    • 性能比异步复制差 10% 左右,且目前版本,在master节点宕机后,slave不能主动切换master

主从

  • 主从工作原理
    • 通过name确定是否同一组,通过brokerid确定是否主,0主 , 非0从
    • Master负责接收消息,Slave不断发送请求到Master拉去消息
    • 消费端可从Master或Slave拉去消息,如从Master拉去消息,Master 会根据当前情况和Slave同步情况,向消费者建议下一次拉去消息是从Master还Slave。(自动完成)
  • Master 宕机
    • 当Master宕机后,会通过 dledger算法 从Slave当中选举一个为Master。
      • 为什么是dledger
        • 也可借用zookeeper ,当zk依赖外部组件比较多,而且还得维护zk集群
        • dledger是利用raft协议完成自动选主的,raft协议不需要外部组件,自动选主的逻辑集成到各个节点的进程中,节点之间通过通信就可以完成选主。
      • dledger如何工作
        • 工作原理是raft算法

0 人点赞