RabbitMq 总结

2022-04-27 16:09:58 浏览数 (1)

  • 基本介绍
  • 交换机类型
  • 公共参数说明
  • 消息手动签收
  • 消费者和生产者时间依赖关系
  • 消费端获取消息模式
  • 解决重复消费问题
  • 死信队列
  • 消息延时推送

基本介绍

  • Broker(消息代理):接受客户端的链接,实现AMQP实体服务
  • Producer:消息生产者
  • Consumer:消息消费者
  • Connection(连接):producer 和 consumer 与 broker的tcp连接
  • Channel(网络信道):基于 connection 创建,消息读写都是在 channel 中进行。客户端可以建立多个channel,每个channel代表一个会话任务
  • VirtualHost(虚拟主机) :一个broker里可以开设多个虚拟主机,用于进行逻辑隔离,最上层的消息路由。类似mysql的database
  • Exchange(交换机) :接收消息,根据路由键转单消息到绑定队列
  • Queue(消息队列) :是 RabbitMQ 的内部对象,用于存储消息。每个消息都会被投入到一个或多个队列。且多个消费者可以订阅同一个 Queue(这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理)
  • Binding(绑定) :Exchange和队列Queue之间的虚拟链接。
  • Routing Key(路由键) :消息发送给 Exchange时,消息将拥有一个路由键(默认为空), Exchange根据这个路由键将消息发送到匹配的队列中
  • Binding Key(绑定键):将消息路由到所有绑定到该Exchange的Queue,但fanout下bindingkey会失效 图1-1

交换机类型

fanout

消息会发送到所有与该交换机绑定的队列中

图2-1

direct

消息通过RoutingKey精准匹配对应的队列中

图2-2

topic

消息通过RoutingKey模糊匹配到对应的队列中

  • RoutingKey用"."分割字符串
  • *:匹配一个单词
  • #:匹配0个或多个单词 图2-3
headers

不依赖于路由键的匹配规则路由消息,根据发送的消息内容headers属性进行完全匹配(键值对形式)。性能差,基本不使用。

公共参数说明

队列参数

channel.QueueDeclare方法中arguments参数,队列一旦声明,参数将无法更改,添加,删除

参数名称

描述

Features

x-message-ttl

队列中的消息的生存周期,单位毫秒

TTL

x-expires

队列在指定的时间内没有被使用(访问)就会被删除

Exp

x-max-length

设置队列最大长度(先进先丢)

Lim

x-max-length-bytes

队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃

Lim B

x-overflow

队列中的消息溢出时,(默认drop-head)丢弃队列头部的消息或(reject-publish)拒绝接收后面生产者发送过来的所有消息

Ovfl

x-single-active-consumer

一次只能有一个消费者消费消息

SAC

x-dead-letter-exchange

设置当前队列的死信交换机

DLX

x-dead-letter-routing-key

设置死信交换机的路由key,死信交换机会根据该值去找到死信消息存放的队列

DLK

x-max-priority

队列中的消息的优先级最大值,不设置的队列就不支持优先级发送消息

Pri

x-queue-mode

懒人模式的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用。当消费者开始消费的时候才加载到内存中。

Args

x-queue-master-locator

master queue host 的分配策略:min-masters、client-local和random

消息参数

参数名称

描述

content_type

消息内容的类型

content_encoding

消息内容的编码格式

priority

消息的优先级

correlation_id

用于将RPC响应与请求相关联

reply_to

回调队列

expiration

消息过期时间,单位毫秒.该参数值优先级>队列参数设置中的消息生存期

message_id

消息id

timestamp

消息的时间戳

type

类型

user_id

用户id

app_id

应用程序id

cluster_id

集群id

消息手动签收

  • 签收异常,没有调用basic.ack;当前会话处于连接状态时,消息转变为unacked状态,其他消费者消费不到,当前会话断开,unacked的消息会重新变为ready状态,其他消费者才能够重新消费
  • 签收正常,成功调用basic.ack,队列中立即删除消息
  • basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列
  • basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到
  • basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己

消费者和生产者时间依赖关系

  • 消费者和生产者已知,消费者和生产者之间没有时间依赖关系
  • 生产者已知,消费者未知,需要消费者订阅后才能接收消息

消费端获取消息模式

  • **推模式:**消息中间件主动将消息推送给消费者,消费者需要设置一个缓冲区缓存消息,效率高,但缓冲区可能会溢出
  • **拉模式:**消费者主动从消息中间件拉取消息,网络开销会增加消息延迟,降低系统吞吐量 拉模式适用场景
    • 消费者在某个条件成立时才能消费消息
    • 需要批量拉取消息进行处理,连续调用basicGet方法拉取多条消息,处理完毕一次性返回ACK

解决重复消费问题

  • 利用数据库主键去重
  • 利用Redis的原子性去实现

redis是单线程的,但是性能好也有很多原子性的命令,比如setnx命令,在接收到消息后将消息ID作为key去执行setnx命令,如果执行成功则表示没有执行过这条消息,可以进行消费(setnx命令特点:当且仅当key不存在,将key值设为value值;若key已存在该命令不做任何操作)

  • 使用全局ID区分消息,解决幂等性

生产者在请求头设置messageId,可以用随机ID或业务逻辑唯一ID

死信队列

  • 消息被拒(basicreject or basicnack)并且没有重新入队(requeue=false);
  • 当前队列中的消息数量已经超过最大长度
  • 消息在队列中过期

配置死信队列

代码语言:javascript复制
    public static void SendMessage()
        {
            var exchangeA = "exchange";
            var routeA = "routekey";
            var queueA = "queue";

            var exchangeD = "dlx.exchange";
            var routeD = "dlx.route";
            var queueD = "dlx.queue";

            var connection = RabbitMQHelper.GetConnection();
            {
                var channel = connection.CreateModel();
                {
                    // 创建死信交换机
                    channel.ExchangeDeclare(exchangeD, type: "fanout", durable: true, autoDelete: false);
                    // 创建死信队列
                    channel.QueueDeclare(queueD, durable: true, exclusive: false, autoDelete: false);
                    // 绑定死信交换机和队列
                    channel.QueueBind(queueD, exchangeD, routeD);

                    channel.ExchangeDeclare(exchangeA, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueA, durable: true, exclusive: false, autoDelete: false, arguments: 
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX
                                             { "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                                           //  { "x-message-ttl",10000}, //设置消息的存活时间,即过期时间
                                            { "x-max-length",5}//设置队列最大长度
                                         });
                    channel.QueueBind(queueA, exchangeA, routeA);


                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //发布消息
                    channel.BasicPublish(exchange: exchangeA,
                                         routingKey: routeA,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes("hello rabbitmq message"));
                }
            }
            
        } 

重试失败特定次数后放入死信队列

代码语言:javascript复制
 private static string _exchangeNormal = "Exchange.Normal";  //定义一个用于接收 正常 消息的交换机
        private static string _exchangeRetry = "Exchange.Retry";    //定义一个用于接收 重试 消息的交换机
        private static string _exchangeFail = "Exchange.Fail";      //定义一个用于接收 失败 消息的交换机
        private static string _queueNormal = "Queue.Noraml";        //定义一个用于接收 正常 消息的队列
        private static string _queueRetry = "Queue.Retry";          //定义一个用于接收 重试 消息的队列
        private static string _queueFail = "Queue.Fail";            //定义一个用于接收 失败 消息的队列

        public static void Test()
        {
            var connection = RabbitMQHelper.GetConnection();
            var channel = connection.CreateModel();

            //声明交换机
            channel.ExchangeDeclare(_exchangeNormal, "topic", true, false, null);
            channel.ExchangeDeclare(_exchangeRetry, "topic", true, false, null);
            channel.ExchangeDeclare(_exchangeFail, "topic", true, false, null);

            //定义队列参数
            var queueNormalArgs = new Dictionary<string, object>();
            {
                queueNormalArgs.Add("x-dead-letter-exchange", _exchangeFail);   //指定死信交换机,用于将 Normal 队列中失败的消息投递给 Fail 交换机
            }
            var queueRetryArgs = new Dictionary<string, object>();
            {
                queueRetryArgs.Add("x-dead-letter-exchange", _exchangeNormal);  //指定死信交换机,用于将 Retry 队列中超时的消息投递给 Normal 交换机
                queueRetryArgs.Add("x-message-ttl", 6000);                      //定义 queueRetry 的消息最大停留时间 (原理是:等消息超时后由 broker 自动投递给当前绑定的死信交换机)                                                                             //定义最大停留时间为防止一些 待重新投递 的消息、没有定义重试时间而导致内存溢出
            }
            var queueFailArgs = new Dictionary<string, object>();
            {
            }

            //声明队列
            channel.QueueDeclare(queue: _queueNormal, durable: true, exclusive: false, autoDelete: false, arguments: queueNormalArgs);
            channel.QueueDeclare(queue: _queueRetry, durable: true, exclusive: false, autoDelete: false, arguments: queueRetryArgs);
            channel.QueueDeclare(queue: _queueFail, durable: true, exclusive: false, autoDelete: false, arguments: queueFailArgs);

            //为队列绑定交换机
            channel.QueueBind(queue: _queueNormal, exchange: _exchangeNormal, routingKey: "#");
            channel.QueueBind(queue: _queueRetry, exchange: _exchangeRetry, routingKey: "#");
            channel.QueueBind(queue: _queueFail, exchange: _exchangeFail, routingKey: "#");

            #region 创建一个普通消息消费者
            {
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received  = (sender, e) =>
                {
                    var _sender = (EventingBasicConsumer)sender;            //消息传送者
                    var _channel = _sender.Model;                           //消息传送通道
                    var _message = (BasicDeliverEventArgs)e;                //消息传送参数
                    var _headers = _message.BasicProperties.Headers;        //消息头
                    var _content = Encoding.UTF8.GetString(_message.Body.ToArray());  //消息内容
                    var _death = default(Dictionary<string, object>);       //死信参数

                    if (_headers != null && _headers.ContainsKey("x-death"))
                        _death = (Dictionary<string, object>)(_headers["x-death"] as List<object>)[0];

                    try
                    #region 消息处理
                    {
                        Console.WriteLine();
                        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}t(1.0)消息接收:rnt[deliveryTag={_message.DeliveryTag}]rnt[consumerID={_message.ConsumerTag}]rnt[exchange={_message.Exchange}]rnt[routingKey={_message.RoutingKey}]rnt[content={_content}]");

                        throw new Exception("模拟消息处理失败效果。");

                        //处理成功时
                        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}t(1.1)处理成功:rnt[deliveryTag={_message.DeliveryTag}]");

                        //消息确认 (销毁当前消息)
                        _channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
                    }
                    #endregion
                    catch (Exception ex)
                    #region 消息处理失败时
                    {
                        var retryCount = (long)(_death?["count"] ?? default(long)); //查询当前消息被重新投递的次数 (首次则为0)

                        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}t(1.2)处理失败:rnt[deliveryTag={_message.DeliveryTag}]rnt[retryCount={retryCount}]");

                        if (retryCount >= 2)
                        #region 投递第3次还没消费成功时,就转发给 exchangeFail 交换机
                        {
                            //消息拒绝(投递给死信交换机,也就是上边定义的 ("x-dead-letter-exchange", _exchangeFail))
                            _channel.BasicNack(deliveryTag: _message.DeliveryTag, multiple: false, requeue: false);
                        }
                        #endregion
                        else
                        #region 否则转发给 exchangeRetry 交换机
                        {
                            var interval = (retryCount   1) * 10; //定义下一次投递的间隔时间 (单位:秒)

                            //定义下一次投递的间隔时间 (单位:毫秒)
                            _message.BasicProperties.Expiration = (interval * 1000).ToString();

                            //将消息投递给 _exchangeRetry (会自动增加 death 次数)
                            _channel.BasicPublish(exchange: _exchangeRetry, routingKey: _message.RoutingKey, basicProperties: _message.BasicProperties, body: _message.Body);

                            //消息确认 (销毁当前消息)
                            _channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
                        }
                        #endregion
                    }
                    #endregion
                };
                channel.BasicConsume(queue: _queueNormal, autoAck: false, consumer: consumer);
            }
            #endregion
        }

消息延时推送

  • 过期队列 死信交换机
  • RabbitMQ 3.6.x 开始可以使用延迟插件,交换机类型选择x-delayed-message(延迟将数据放入队列)
代码语言:javascript复制
   public static void ConsumerMessage()
   {
            var connection = RabbitMQHelper.GetConnection();
            var channel = connection.CreateModel();

            var exchangeArgumets = new Dictionary<string, object>
            {
                { "x-delayed-type", "topic" }  //延迟交换机的类型
            };
            channel.ExchangeDeclare("delay_exchange", "x-delayed-message", true, false, exchangeArgumets);

            // 创建队列
            string queueName1 = "delay_queue1";
            channel.QueueDeclare(queueName1, false, false, false, null);
            string queueName2 = "delay_queue2";
            channel.QueueDeclare(queueName2, false, false, false, null);
            string queueName3 = "delay_queue3";
            channel.QueueDeclare(queueName3, false, false, false, null);
            // 绑定到交互机
            channel.QueueBind(queue: queueName1, exchange: "delay_exchange", routingKey: "delayed-direct1");
            channel.QueueBind(queue: queueName2, exchange: "delay_exchange", routingKey: "delayed-direct2");
            channel.QueueBind(queue: queueName3, exchange: "delay_exchange", routingKey: "delayed-direct3");

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true; // 标记消息持久化

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received  = (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                var _headers = ea.BasicProperties.Headers;        //消息头
                int delay = 0;
                if (_headers == null)
                {
                    ea.BasicProperties.Headers = new Dictionary<string, object>();
                }
                else  if ( _headers.ContainsKey("x-delay"))
                {
                    delay = Convert.ToInt32(ea.BasicProperties.Headers["x-delay"]);
                    delay = delay   20000;
                }
                ea.BasicProperties.Headers["x-delay"] = delay; //消息头设置消息延迟的时间
                Console.WriteLine($" {DateTime.Now}=={delay}");
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
               
                channel.BasicPublish(ea.Exchange, ea.RoutingKey, basicProperties: ea.BasicProperties, body);
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: queueName3,
                                 autoAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

0 人点赞