rabbitmq使用basicQos控制速率

2021-12-06 14:01:07 浏览数 (1)

默认情况下,RabbitMq收到消息后,就向消费者全部推送。但是如果rabbitmq队列里消息过多,且消息的数量超过了消费者处理能力, 就会导致客户端超负荷崩溃。此时我们可以通过 prefetchCount 限制每个消费者在收到下一个确认回执前一次可以最大接受多少条消息。即如果设置prefetchCount =1,RabbitMQ向这个消费者发送一个消息后,再这个消息的消费者对这个消息进行ack之前,RabbitMQ不会向这个消费者发送新的消息

代码语言:javascript复制
 // 每个客户端每次最后获取N个消息
channel.basicQos(1);

订阅队列消息不控制接收速率

不使用channel.BasicQos 控制 prefetchCount 数量:

代码语言:javascript复制
         var consumer = new EventingBasicConsumer(channel);
            consumer.Received  = (ch, ea) =>
            {
                var body = ea.Body.ToArray();

                var msg = Encoding.UTF8.GetString(body);
                Console.WriteLine(msg);

                // copy or deserialise the payload
                // and process the message
                // ...
                channel.BasicAck(ea.DeliveryTag, false);

                Thread.Sleep(1000);
            };

            String consumerTag = channel.BasicConsume("mytest", false, consumer);

订阅队列消息控制接收速率

代码语言:javascript复制
 channel.BasicQos(0, 1, true); //每次从队列里取出一条消息;

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received  = (ch, ea) =>
            {
                var body = ea.Body.ToArray();

                var msg = Encoding.UTF8.GetString(body);
                Console.WriteLine(msg);

                // copy or deserialise the payload
                // and process the message
                // ...
                channel.BasicAck(ea.DeliveryTag, false);

                Thread.Sleep(1000);
            };

            String consumerTag = channel.BasicConsume("mytest", false, consumer);

(Qos 相当于把 connection 只限制了一个channel;)

0 人点赞