RabbitMQ2
1. 摘要
本文按照以下目前讲解RabbitMQ死信队列的内容,包括: (1)死信队列是什么? (2)如何配置死信队列? (3)死信队列代码实现演示(GO版本/JAV版本) (3)死信队列的应用场景? 网上Java版本的死信队列演示代码较多,特定找了GO版本的代码供大家演示使用。
2. 内容
2.1 死信队列是什么?
注意:业务队列与死信交换机的绑定是在构建业务队列时,通过参数(x-dead-letter-exchange和x-dead-letter-routing-key)的形式进行指定。
死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况之一: (1)消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。 (2)消息在队列的存活时间超过设置的TTL时间。 (3)消息队列的消息数量已经超过最大队列长度。 那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
2.2 如何配置死信队列?
这一部分将是本文的关键,如何配置死信队列呢?其实很简单,大概可以分为以下步骤: (1)配置业务队列,绑定到业务交换机上 (2)为业务队列配置死信交换机和路由key (3)为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
2.3 RabbitMQ死信队列GO代码实现
JAVA版本的代码实现参考该篇文章 《【RabbitMQ】一文带你搞定RabbitMQ死信队列》 和代码 https://github.com/MFrank2016/dead-letter-demo,本文只讲GO代码实现。
(1)生产者
代码语言:javascript复制package product
import (
"fmt"
"github.com/streadway/amqp"
)
func ProducerDlx() {
var(
conn *amqp.Connection
err error
ch *amqp.Channel
)
if conn, err = amqp.Dial("amqp://liulong:liulong@127.0.0.1:5672/"); err!=nil{
fmt.Println("amqp.Dial err :", err)
return
}
defer conn.Close()
if ch, err = conn.Channel(); err!=nil{
fmt.Println("conn.Channel err: ", err)
return
}
defer ch.Close()
//func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
//声明交换器
if err = ch.ExchangeDeclare(
"long_abc", // Exchange names
amqp.ExchangeDirect,//"direct", "fanout", "topic" and "headers"
true,
false,//Durable and Non-Auto-Deleted exchanges会一直保留
false,
false,
nil,
); err!=nil{
fmt.Println("ch.ExchangeDeclare err: ", err)
return
}
//func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
//发送消息
if err = ch.Publish(
"long_abc",
"zhe_mess",
false,
false,
amqp.Publishing{
Headers: amqp.Table{},
ContentType:"text/plain",
Body:[]byte("hello world dlx"),
DeliveryMode:amqp.Persistent,//需要做持久化保留
Priority:0,
},
); err!=nil{
fmt.Println("ch.Publish err: ", err)
return
}
}
(2)消费正常队列
代码语言:javascript复制package consum
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
func Consumer() {
var(
conn *amqp.Connection
err error
ch *amqp.Channel
queue amqp.Queue
dlxExchangeName string
delvers <- chan amqp.Delivery
message amqp.Delivery
ok bool
)
if conn, err = amqp.Dial("amqp://liulong:liulong@127.0.0.1:5672/"); err!=nil{
fmt.Println("amqp.Dial err :", err)
return
}
defer conn.Close()
if ch, err = conn.Channel(); err!=nil{
fmt.Println("conn.Channel err: ", err)
return
}
defer ch.Close()
//func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
//设置未确认的最大消息数
if err = ch.Qos(3, 0, false); err!=nil{
fmt.Println("ch.Qos err: ", err)
return
}
dlxExchangeName = "dlx_exchange"
//声明交换器
if err = ch.ExchangeDeclare(
"long_abc",
amqp.ExchangeDirect,
true,
false,
false,
false,
nil,
); err!=nil{
fmt.Println("ch.ExchangeDeclare err: ", err)
return
}
argsQue := make(map[string]interface{})
//添加死信队列交换器属性
argsQue["x-dead-letter-exchange"] = dlxExchangeName
//指定死信队列的路由key,不指定使用队列路由键
//argsQue["x-dead-letter-routing-key"] = "zhe_mess"
//添加过期时间
argsQue["x-message-ttl"] = 6000 //单位毫秒
//声明队列
queue, err = ch.QueueDeclare("zhe_123", true, false, false, false, argsQue)
if err !=nil{
fmt.Println("ch.QueueDeclare err :", err)
return
}
//绑定交换器/队列和key
//func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
if err = ch.QueueBind(queue.Name, "zhe_mess", "long_abc", false, nil);err!=nil{
fmt.Println("ch.QueueBind err: ", err)
return
}
//开启推模式消费
//func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
delvers, err = ch.Consume(
queue.Name,
"",
false,
false,
false,
false,
nil,
)
if err!=nil{
fmt.Println("ch.Consume err: ", err)
}
//消费接收到的消息
for{
select {
case message, ok = <- delvers:
if !ok{
continue
}
go func() {
//处理消息
time.Sleep(time.Second*2)
//确认接收到的消息
if err = message.Ack(true); err!=nil{
//TODD: 获取到消息后,在过期时间内如果未进行确认,此消息就会流入到死信队列,此时进行消息确认就会报错
fmt.Println("d.Ack err: ", err)
return
}
fmt.Println("已确认", string(message.Body))
}()
case <-time.After(time.Second*1):
}
}
(3)消费死信队列
代码语言:javascript复制package consum
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
func ConsumerDlx() {
var(
conn *amqp.Connection
ch *amqp.Channel
queue amqp.Queue
err error
delvers <- chan amqp.Delivery
message amqp.Delivery
ok bool
)
//链接rbmq
if conn, err = amqp.Dial("amqp://liulong:liulong@52.83.64.102:5672/");err!=nil{
fmt.Println("amqp.Dial err: ", err)
return
}
//声明信道
if ch, err = conn.Channel(); err!=nil{
fmt.Println("conn.Channel err: ", err)
return
}
//声明交换机
if err = ch.ExchangeDeclare(
"dlx_exchange",
amqp.ExchangeFanout, //交换机模式fanout
true, //持久化
false, //自动删除
false, //是否是内置交互器,(只能通过交换器将消息路由到此交互器,不能通过客户端发送消息
false,
nil,
); err!=nil{
fmt.Println("ch.ExchangeDeclare: ", err)
return
}
//声明队列
if queue, err = ch.QueueDeclare(
"dlx_queue", //队列名称
true, //是否是持久化
false, //是否不需要确认,自动删除消息
false, //是否是排他队列
false, //是否等待服务器返回ok
nil,
); err!=nil{
fmt.Println("ch.QueueDeclare err: ", err)
return
}
//将交换器和队列/路由key绑定
if err = ch.QueueBind(queue.Name, "", "dlx_exchange", false, nil); err!=nil{
fmt.Println("ch.QueueBind err: ", err)
return
}
//开启推模式消费
delvers, err = ch.Consume(
queue.Name,
"",
false,
false,
false,
false,
nil,
)
if err!=nil{
fmt.Println("ch.Consume err: ", err)
}
//消费接收到的消息
for{
select {
case message, ok = <- delvers:
if !ok{
continue
}
go func() {
//处理消息
time.Sleep(time.Second*3)
//确认接收到的消息
if err = message.Ack(true); err!=nil{
fmt.Println("dlx d.Ack err: ", err)
return
}
fmt.Println("已确认dlx", string(message.Body))
}()
case <-time.After(time.Second*1):
}
}
}
(4)死信消息变化
那么“死信”被丢到死信队列中后,会发生什么变化呢?
如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。
举个例子:
如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。
另外,由于被抛到了死信交换机,所以消息的Exchange Name也会被替换为死信交换机的名称。
消息的Header中,也会添加很多奇奇怪怪的字段,修改一下上面的代码,在死信队列的消费者中添加一行日志输出:
代码语言:javascript复制log.info("死信消息properties:{}", message.getMessageProperties());
然后重新运行一次,即可得到死信消息Header中被添加的信息:
死信消息properties:
代码语言:javascript复制死信消息properties:MessageProperties [headers={x-first-death-exchange=dead.letter.demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=dead.letter.demo.simple.business.exchange, time=Sun Jul 14 16:48:16 CST 2019, routing-keys=[], queue=dead.letter.demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=dead.letter.demo.simple.business.queuea}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAREVTS1RPUC1DUlZGUzBOAAAPQAAAAAAB.bLbsdR1DnuRSwiKKmtdOGw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.deadletter.exchange, receivedRoutingKey=dead.letter.demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-NSp18SUPoCNvQcoYoS2lPg, consumerQueue=dead.letter.demo.simple.deadletter.queuea]
Header中看起来有很多信息,实际上并不多,只是值比较长而已。下面就简单说明一下Header中的值:
字段名 | 含义 |
---|---|
x-first-death-exchange | 第一次被抛入的死信交换机的名称 |
x-first-death-reason | 第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量 |
x-first-death-queue | 第一次成为死信前所在队列名称 |
x-death | 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新 |
3.参考
(1)RabbitMQ系列(十三)RabbitMQ的死信队列 https://juejin.cn/post/6844903823643934727
(2)【RabbitMQ】一文带你搞定RabbitMQ死信队列 https://www.cnblogs.com/mfrank/p/11184929.html
(3)go 调用rabbitmq 死信队列 https://juejin.cn/post/6844903942661373965
(4)RABBITMP的GO消息接口 https://godoc.org/github.com/streadway/amqp
(5)死信队列的官方介绍 https://www.rabbitmq.com/dlx.html