【深度知识】RabbitMQ死信队列的原理及GO实现

2021-01-29 10:43:01 浏览数 (1)

RabbitMQ2

1. 摘要

本文按照以下目前讲解RabbitMQ死信队列的内容,包括: (1)死信队列是什么? (2)如何配置死信队列? (3)死信队列代码实现演示(GO版本/JAV版本) (3)死信队列的应用场景? 网上Java版本的死信队列演示代码较多,特定找了GO版本的代码供大家演示使用。

2. 内容

2.1 死信队列是什么?

注意:业务队列与死信交换机的绑定是在构建业务队列时,通过参数(x-dead-letter-exchange和x-dead-letter-routing-key)的形式进行指定。

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况之一: (1)消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。 (2)消息在队列的存活时间超过设置的TTL时间。 (3)消息队列的消息数量已经超过最大队列长度。 那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

2.2 如何配置死信队列?

这一部分将是本文的关键,如何配置死信队列呢?其实很简单,大概可以分为以下步骤: (1)配置业务队列,绑定到业务交换机上 (2)为业务队列配置死信交换机和路由key (3)为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

2.3 RabbitMQ死信队列GO代码实现

JAVA版本的代码实现参考该篇文章 《【RabbitMQ】一文带你搞定RabbitMQ死信队列》 和代码 https://github.com/MFrank2016/dead-letter-demo​,本文只讲GO代码实现。

(1)生产者

代码语言:javascript复制
package product

import (
    "fmt"
    "github.com/streadway/amqp"
)

func ProducerDlx()  {
    var(
        conn *amqp.Connection
        err error
        ch *amqp.Channel
    )
    if conn, err = amqp.Dial("amqp://liulong:liulong@127.0.0.1:5672/"); err!=nil{
        fmt.Println("amqp.Dial err :", err)
        return
    }
    defer conn.Close()

    if ch, err = conn.Channel(); err!=nil{
        fmt.Println("conn.Channel err: ", err)
        return
    }

    defer ch.Close()
    
    //func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
    //声明交换器
    if err = ch.ExchangeDeclare(
        "long_abc", // Exchange names
        amqp.ExchangeDirect,//"direct", "fanout", "topic" and "headers"
        true,
        false,//Durable and Non-Auto-Deleted exchanges会一直保留
        false,
        false,
        nil,
    ); err!=nil{
        fmt.Println("ch.ExchangeDeclare err: ", err)
        return
    }
    
    //func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
    //发送消息
    if err = ch.Publish(
        "long_abc",
        "zhe_mess",
        false,
        false,
        amqp.Publishing{
            Headers: amqp.Table{},
            ContentType:"text/plain",
            Body:[]byte("hello world dlx"),
            DeliveryMode:amqp.Persistent,//需要做持久化保留
            Priority:0,
        },
    ); err!=nil{
        fmt.Println("ch.Publish err: ", err)
        return
    }
}

(2)消费正常队列

代码语言:javascript复制
package consum

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)


func Consumer()  {
    var(
        conn *amqp.Connection
        err error
        ch *amqp.Channel
        queue amqp.Queue
        dlxExchangeName string
        delvers <- chan amqp.Delivery
        message amqp.Delivery
        ok bool
    )
    if conn, err = amqp.Dial("amqp://liulong:liulong@127.0.0.1:5672/"); err!=nil{
        fmt.Println("amqp.Dial err :", err)
        return
    }
    defer conn.Close()

    if ch, err = conn.Channel(); err!=nil{
        fmt.Println("conn.Channel err: ", err)
        return
    }

    defer ch.Close()

    //func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
    //设置未确认的最大消息数
    if err = ch.Qos(3, 0, false); err!=nil{
        fmt.Println("ch.Qos err: ", err)
        return
    }

    dlxExchangeName = "dlx_exchange"

    //声明交换器
    if err = ch.ExchangeDeclare(
        "long_abc",
        amqp.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil,
    ); err!=nil{
        fmt.Println("ch.ExchangeDeclare err: ", err)
        return
    }

    argsQue := make(map[string]interface{})
        //添加死信队列交换器属性
    argsQue["x-dead-letter-exchange"] = dlxExchangeName
    //指定死信队列的路由key,不指定使用队列路由键
    //argsQue["x-dead-letter-routing-key"] = "zhe_mess"
    //添加过期时间
    argsQue["x-message-ttl"] = 6000  //单位毫秒
    //声明队列
    queue, err = ch.QueueDeclare("zhe_123", true, false, false, false, argsQue)
    if err !=nil{
        fmt.Println("ch.QueueDeclare err :", err)
        return
    }

    //绑定交换器/队列和key
    //func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
    if err = ch.QueueBind(queue.Name, "zhe_mess", "long_abc", false, nil);err!=nil{
        fmt.Println("ch.QueueBind err: ", err)
        return
    }
    //开启推模式消费
    //func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
    delvers, err = ch.Consume(
        queue.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )

    if err!=nil{
        fmt.Println("ch.Consume err: ", err)
    }

    //消费接收到的消息
    for{
        select {
        case message, ok = <- delvers:
            if !ok{
                continue
            }
            go func() {
                //处理消息
                time.Sleep(time.Second*2)
                //确认接收到的消息
                if err = message.Ack(true); err!=nil{
                //TODD: 获取到消息后,在过期时间内如果未进行确认,此消息就会流入到死信队列,此时进行消息确认就会报错
                    fmt.Println("d.Ack err: ", err)
                    return
                }
                fmt.Println("已确认", string(message.Body))
            }()
        case <-time.After(time.Second*1):

        }
    }

(3)消费死信队列

代码语言:javascript复制
package consum

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

func ConsumerDlx()  {

    var(
        conn *amqp.Connection
        ch *amqp.Channel
        queue amqp.Queue
        err error
        delvers <- chan amqp.Delivery
        message amqp.Delivery
        ok bool
    )

    //链接rbmq
    if conn, err = amqp.Dial("amqp://liulong:liulong@52.83.64.102:5672/");err!=nil{
        fmt.Println("amqp.Dial err: ", err)
        return
    }

    //声明信道
    if ch, err = conn.Channel(); err!=nil{
        fmt.Println("conn.Channel err: ", err)
        return
    }


    //声明交换机
    if err = ch.ExchangeDeclare(
        "dlx_exchange",
        amqp.ExchangeFanout,   //交换机模式fanout
        true,          //持久化
        false,      //自动删除
        false,        //是否是内置交互器,(只能通过交换器将消息路由到此交互器,不能通过客户端发送消息
        false,
        nil,
    ); err!=nil{
        fmt.Println("ch.ExchangeDeclare: ", err)
        return
    }

    //声明队列
    if queue, err = ch.QueueDeclare(
        "dlx_queue",       //队列名称
        true,      //是否是持久化
        false,   //是否不需要确认,自动删除消息
        false,   //是否是排他队列
        false,    //是否等待服务器返回ok
        nil,
    ); err!=nil{
        fmt.Println("ch.QueueDeclare err: ", err)
        return
    }


    //将交换器和队列/路由key绑定
    if err = ch.QueueBind(queue.Name, "", "dlx_exchange", false, nil); err!=nil{
        fmt.Println("ch.QueueBind err: ", err)
        return
    }



    //开启推模式消费
    delvers, err = ch.Consume(
        queue.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )

    if err!=nil{
        fmt.Println("ch.Consume err: ", err)
    }

    //消费接收到的消息
    for{
        select {
        case message, ok = <- delvers:
            if !ok{
                continue
            }
            go func() {
                //处理消息
                time.Sleep(time.Second*3)
                //确认接收到的消息
                if err = message.Ack(true); err!=nil{
                    fmt.Println("dlx d.Ack err: ", err)
                    return
                }
                fmt.Println("已确认dlx", string(message.Body))
            }()
        case <-time.After(time.Second*1):

        }
    }
}

(4)死信消息变化

那么“死信”被丢到死信队列中后,会发生什么变化呢?

如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。

举个例子:

如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。

另外,由于被抛到了死信交换机,所以消息的Exchange Name也会被替换为死信交换机的名称。

消息的Header中,也会添加很多奇奇怪怪的字段,修改一下上面的代码,在死信队列的消费者中添加一行日志输出:

代码语言:javascript复制
log.info("死信消息properties:{}", message.getMessageProperties());

然后重新运行一次,即可得到死信消息Header中被添加的信息:

死信消息properties:

代码语言:javascript复制
死信消息properties:MessageProperties [headers={x-first-death-exchange=dead.letter.demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=dead.letter.demo.simple.business.exchange, time=Sun Jul 14 16:48:16 CST 2019, routing-keys=[], queue=dead.letter.demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=dead.letter.demo.simple.business.queuea}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAREVTS1RPUC1DUlZGUzBOAAAPQAAAAAAB.bLbsdR1DnuRSwiKKmtdOGw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.deadletter.exchange, receivedRoutingKey=dead.letter.demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-NSp18SUPoCNvQcoYoS2lPg, consumerQueue=dead.letter.demo.simple.deadletter.queuea]

Header中看起来有很多信息,实际上并不多,只是值比较长而已。下面就简单说明一下Header中的值:

字段名

含义

x-first-death-exchange

第一次被抛入的死信交换机的名称

x-first-death-reason

第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量

x-first-death-queue

第一次成为死信前所在队列名称

x-death

历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新

3.参考

(1)RabbitMQ系列(十三)RabbitMQ的死信队列 https://juejin.cn/post/6844903823643934727

(2)【RabbitMQ】一文带你搞定RabbitMQ死信队列 https://www.cnblogs.com/mfrank/p/11184929.html

(3)go 调用rabbitmq 死信队列 https://juejin.cn/post/6844903942661373965

(4)RABBITMP的GO消息接口 https://godoc.org/github.com/streadway/amqp

(5)死信队列的官方介绍 https://www.rabbitmq.com/dlx.html

0 人点赞