MQ
A message queue is a form of asynchronous service-to-service communication used in serverless and microservices architectures. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer.
什么是 MQ
MQ 全称 Message Queue,中文译为消息队列,其实质是一个队列,队列是一种先进先出的数据结构,所以我们可以简单理解 MQ 是一种存储消息的容器,MQ 一般包括三类参与者:
- 生产者:是消息的产生者,生产者生产出消息后随机写入消息队列中
- 消费者:是消息的消费方,负责从队列中取出消息进行其他处理
- 队列:MQ本身,是消息的容器
为什么使用 MQ
一般来说, MQ 的使用场景有以下几个方面:
- 异步
- 削峰/限流
- 解耦
异步
比如一个评论系统,往往在评论之后要发邮件通知原作者,在不使用 MQ 时,我们首先会想到串行处理,即:
代码语言:javascript复制func Talk(ctx *gin.Context, req BaseReqInter) BaseRespInter {
// 存储评论数据
dao.InsertNewTalk(req.Talk)
// 发邮件
utils.SendEmail(...)
return SuccessResp
}
上面的实现在功能上当然是可以的,但性能却不是很好,由于评论这个事件本事不依赖于发邮件这个事件,也就是从正常角度来说,一旦新评论落库,就应该响应成功,而不是在邮件发成功后才响应成功。
另一个解决方案就是异步地去处理发邮件这个事件:
代码语言:javascript复制func Talk(ctx *gin.Context, req BaseReqInter) BaseRespInter {
// 存储评论数据
dao.InsertNewTalk(req.Talk)
// 异步发邮件
go utils.SendEmail(...)
return SuccessResp
}
我们当然可以简单地使用携程或线程去异步处理发邮件,但总不能每一次评论都搞一个新线程去处理啊,携程还好,想想Java的线程,如果 QPS 很大,那一次性得开多少线程,并且这种异步的代码往往不是像上面加个 go
就完事,需要涉及很多后续异常情况的处理,现在只有发邮件,如果还要发短信,还要发站内信,这种代码往往面临维护困难的问题,所以上面的异步并不是一种好的处理方法。
最后的解决办法就是 MQ, 我们开启一个邮件消费者,持续读取并消费MQ中邮件队列里的消息,然后在评论数据落库之后就把要发的邮件扔到 MQ 中,然后直接对客户端响应成功:
代码语言:javascript复制func Talk(ctx *gin.Context, req BaseReqInter) BaseRespInter {
// 存储评论数据
dao.InsertNewTalk(req.Talk)
// 将邮件推送到 MQ 中
ch.Publish(q.Name, []byte(email))
return SuccessResp
}
这样一来 Talk
作为生产者只是负责把消息放到队列中而并不需要关心邮件消费者的消费情况,以此来实现异步处理。
解耦
还是上面异步的例子,串行操作的实质问题在于 Talk
方法耦合了本不应该耦合的 SendEmail
方法,一方面导致调用链过长从而使响应时间增多,另一方面还会导致接口稳定性降低,试想如果邮件服务出了问题,那就会导致评论接口挂掉,使用 MQ 的另一个好处就是可以降低程序耦合度, 因为 MQ 屏蔽了生产和消费的双方,双方都只需要和 Queue 交互而不用管消息产生和消费的细节
削峰/限流
比如某个接口的 QPS 突然达到 3000,但服务器只能处理 2000 的,如果任由请求打进来,那服务器可能就会由于扛不住 QPS 而挂掉,这时的解决办法就是先把请求放在 MQ 中,让服务器以自己能接受的量去从 MQ 中消费请求,以此避免突然的高 QPS 打挂服务器。
什么是 RabbitMQ
AMQP
AMQP, 全称 Advanced Message Queuing Protocol, 中文译为高级消息队列协议, 是一个用于在进程间传递异步消息的应用层协议。
AMQP 模型
AMQP 一般被分为下面三个层次:
- 模型层:模型层定义了一套命令(按功能分类),客户端应用可以利用这些命令来实现它的业务功能。
- 会话层:会话层负责将命令从客户端应用传递给服务器,再将服务器的应答传递给客户端应用,会话层为这个传递过程提供可靠性、同步机制和错误处理。
- 传输层:传输层提供帧处理、信道复用、错误检测和数据表示。实现者可以将传输层替换成任意传输协议,只要不改变AMQP协议中与客户端应用程序相关的功能。实现者还可以使用其他高层协议中的会话层。
AMQP 服务器:
在 AMQP 的服务器(broker)中,三个主要功能模块连接成一个处理链以完成预期的功能,他们分别是:
- Exchange: 交换机, 用来接收生产者产生的消息,并按照一定的规则将这些消息路由到对应的 Queue 中,AMQP 0.9.1 中定义了下面四种交换机类型:
- Queue: 可以看作是存储消息的仓库,Queue 会保存消息直到被消费者消费
- Binding: 提供路由规则,建立了 Exchange 和 Queue 之间的对应关系
Channel:
- 不管是生产者还是消费者,要与 Broker 交互就必须建立与之的网络连接,在对 AMQP 的具体实现中,传输层选择的协议可能不同(RabbitMQ 使用 TCP),这个连接就是图中的 Connection,但如果每一次请求都建立一个 Connection,在消息量大的时候这种开销将是巨大的,效率也较低。所以有了Channel, Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
总结:
AMQP 是一种应用层协议,和普通的 HTTP, SMTP 没有什么区别,用在进程间传递异步消息上,如果一个客户端程序 AMQP 协议就可以和服务器(消息中间件代理)进行通信。
RabbitMQ
RabbitMQ 是 使用 Erlang 语言对 AMQP 协议的一种实现,其客户端支持几乎所有的主流编程语言。
RabbitMQ 与其他消息中间件的对比
掘金 - 消息中间件部署及比较:rabbitMQ、activeMQ、zeroMQ、rocketMQ、Kafka、redis
HelloWorld
下载 RabbitMQ:
RabbitMQ 使用 erlang 语言编写,因此在安装 RabbitMQ 之前需要下载 erlang 相关依赖,具体下载方法参考 官网:
- For Homebrew on OS X: brew install erlang
- For MacPorts on OS X: port install erlang
- For Ubuntu and Debian: apt-get install erlang
- For Fedora: yum install erlang
- For FreeBSD: pkg install erlang
然后就可以安装使用 RabbitMQ 了,安装方法见官网
ps:
- RabbitMQ 可以在 Windows 上使用,没坑
- 使用 docker 事半功倍
配置 RabbitMQ
下载好 RabbitMQ 之后, 使用 rabbitmq-plugins enable rabbitmq_management
命令添加 rabbitmq_management
插件,以方便使用图像化界面配置,添加之后使用 rabbitmq-sever start
命令启动 RabbitMQ 服务端,之后访问本地 http://localhost:15672
,使用默认用户名密码(都是 guest)登录即可看见如下界面:
编写客户端程序
由于我是个垃圾, 不会 SpringBoot, 只能用 Go 来学
按照官方教程,Golang 使用 RabbitMQ 需要 amqp 依赖:
代码语言:javascript复制go get github.com/streadway/amqp
生产者 sender
下面这是官网给出的例子
代码语言:javascript复制package main
import (
`github.com/streadway/amqp`
`log`
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 获取管道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明要发送到的队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 向队列中发送数据
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
运行生产者程序,刷新图形化界面,不出意外可以发现队列中的消息数应该从 0 变化成了 1
可以抓包看到 AMQP 报文的具体内容:
消费者 Receiver
同样是官网的例子, 前面的部分没有变化,任然需要连接 RabbitMQ, 获取管道,声明要消费的队列,但在编写 Receiver 时,需要声明一个消费者:
代码语言:javascript复制 // 声明一个消费者
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
ch.Consume
会返回一个只读的管道(chan),我们只需要遍历这个管道就可以从 MQ 中持续读取数据:
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
运行 Receiver 后,原先 MQ 中的一个消息应该就会被消费,队列中的消息数应该会重新为 0
参考
- 知乎 - 消息队列的使用场景是怎样的?
- CSDN - 深入理解 AMQP 协议
- 博客园 - RabbitMQ与AMQP协议详解
- RabbitMQ 中文文档 - AMQP 0-9-1
- DeepBlue - RabbitMQ安装及使用(Hello World)
- RabbitMQ官方文档 - HelloWorld