RabbitMQ限流机制(五)

2022-03-29 16:03:28 浏览数 (1)

在服务端的稳定系的体系质量保障中,一个是考虑在客户端高并发的请求后,服务端如何能够接收所有的请求并且服务端能够顶得住洪流的负载。这中间就需要涉及考虑调度机制和队列机制。比如在2022年中,西安一码通是崩溃了又崩溃,这就是很典型的在高可用设计和稳定性体系建设方面缺少系统化的思考。作为主流的核心中间件RabbitMQ,也是考虑到了限流的机制。

一、为什么要限流?

如果生产者批量发送消息,但是消费者接收的能力是很慢,那么就会导致堆积的MQ的消息越来越多,最后导致无法承受,从而出现崩溃,这种情况是最不愿意看到的情况。那么我们可以把限流解释为RabbitMQ把收到的消息没来得及处理(可能是资源也可能是其他情况),但是这个时候生产者推送了批量的消息过来,单个客户端无法承受这么多的数据流量,最后结果就是崩溃,从而影响业务端使用的客户,给客户造成了损失。

二、RabbitMQ限流机制

在RabbitMQ消息队列服务器中,限流机制主要是由消费者来承载,这其实从逻辑上是很好理解的,毕竟消费者端是作为接收端,而生产者作为发送端是不需要限流的策略机制的。在RabbitMQ中提供了QOS的功能,也可以说是服务质量保障,它可以解决消费端存在大批量的消息的时候进行限流。使用到的方法具体是BasicQos。使用限流的前提是手工签收,不是自动签收,也就是basicConsumer方法中的autoAck的参数,需要把该参数设置为false,也就是手工签收。

三、限流案例实现

3.1、生产端的代码

代码语言:javascript复制
package com.example.rabbitmq.currentLimit;

import com.rabbitmq.client.*;

public class ProducerLimit
{
    private  static  final  String exchangeName="test_qos";
    private  static  final  String routyKey="qos.test";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        String msg = "Hello RabbitMQ QOS Message";

        for(int i=0;i<5;i  )
        {
            channel.basicPublish(exchangeName,routyKey,true,null,msg.getBytes());
        }

    }
}

3.2、消费端的代码

代码语言:javascript复制
package com.example.rabbitmq.currentLimit;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConsumerLimit
{
    private static final String EXCHANGE = "test_qos";
    private  static  final String queueName="test_qos_queue";
    private  static  final  String routingKey="qos.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();
            channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,EXCHANGE,routingKey);

            channel.basicQos(0,1,false);
            //设置手工签收的方式,限流的前提是必须把autoAck设置为false
            channel.basicConsume(queueName,false,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

备注:在如上的代码中,在方法basicQos中,第二个参数prefetchCount设置为1,表示的是生产者给消费者一个消息后,等待消费者ack处理完成后,然后再接收新的消息,第三个参数global设置为false表示的是consumer级别,而不是channel级别的消息。

3.3、自定义接收消息

代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------n");
        System.err.println("consumerTag:" consumerTag);
        System.err.println("envelope:" envelope);
        System.err.println("properties:" properties);
        System.err.println("the message received:" new String(body));

//        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

3.4、案例结果

执行如上的代码,可以在RabbitMQ控制台就能够看到限流的机制,消费者返回的消息具体如下:

代码语言:javascript复制
---------------consumer---------------

consumerTag:amq.ctag-wx7ky-PxTenMX7Q8MbeoXA
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=test_qos, routingKey=qos.test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
the message received:Hello RabbitMQ QOS Message

在RabbitMQ的WEB控制台中就会显示出存在的总的消息是5,等待中的消息是4,等待ack签收的消息是1个,具体如下展示:

我们针对消费者接收消息的代码进行完善,具体如下:

代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------n");
        System.err.println("consumerTag:" consumerTag);
        System.err.println("envelope:" envelope);
        System.err.println("properties:" properties);
        System.err.println("the message received:" new String(body));
        channel.basicAck(envelope.getDeliveryTag(),false);

    }
}

这地方设置的是非批量的方式来接收消息,我们就可以看到再次启动消费者的程序后,就会看到每个消息都会有一个确认ack的过程,具体如下:

四、稳定性的保障体系思考

我一直认为作为一个质量保障工程师,在稳定系的体系建设方面的前提是我们需要懂得后端涉及到的技术,那么知道具体的技术,我们几可以设计对应的测试用例以及测试场景,来验证后端这部分的稳定性,比如针对限流部分,那么我们可以模拟生产者端,批量的往消费者端写数据,来验证消费者端在无法消费的情况下,它的处理逻辑以及限流机制的合理性,这中间,不管是限流还是不限流,其实都是为客户服务的思考路线,那么限流了,是否影响客户的正常使用?如果影响了,这个影响点具体是什么,有哪些,质量保障团队都需要通过测试验证,来拿出一个评估的数据。

0 人点赞