RabbitMQ重回队列机制(六)

2022-03-29 16:04:04 浏览数 (1)

在RabbitMQ的生产端把消息发送到Exchange后,然后Exchange与Queue来建立映射关系从而保障消费端能够接收到消息,保障在业务端的消息可靠性,这是正常情况的一种逻辑思维。在异常的情况下,消息到队列中消费端并不能够收到消息,那么就需要重试的机制,也就是重回队列的机制。其实重试的机制在服务端的业务保障性体系中是必须需要考虑的,因为总有特殊的情况导致发送的请求在请求方并没有收到请求,比如服务这层出现TimeOut,以及连接数出现瓶颈,那么这个时候整体程序的瓶颈是在服务这层,那么既然涉及到重试的机制,一般重试是几次了?另外需要思考的是重试的间隔是需要多少秒之间?其实重试的间隔以及重试的次数就需要和具体技术的负责人根据业务的形态来进行考虑,这中间也是需要考虑到幂等性的问题。但是作为服务端质量体系保障的一个部分,质量负责人以及对应测试这部分的同学必须得有这个技术底蕴和测试场景的意识,需要更加系统宏观的站在全局的角度来考虑服务这层重试以及不重试给产品带来的风险管控。当然,在本文章体系中重点核心探讨的是RabbitMQ的重回队列的机制应用。

一、重回队列场景

在实际的产品设计和应用中,有如下几个场景是需要考虑使用重回队列的机制,具体如下:

  • 消费端在进行消费的时候,由于异常导致应该消费的消息没有消费到,那么就需要补偿的机制
  • 服务层这这边由于TimeOut,连接数等瓶颈导致服务端这层崩溃,那么就也就重回队列的机制

其实不管什么场景,具体总结来说就是在接收端,也就是服务这层,由于服务这边技术上的问题导致无法正常的消费应该消费的数据,那么就需要重回队列的机制来保障消费端这层把应该消费的消息消费掉。

二、怎么理解重回队列

消费者的重回队列机制是对没有处理成功的消息,消费者端这边为了消息的可靠性,那么就会把没有消费的消息重新会发送给Broker,通过这样的技术来保障消息的可靠性。那么在消费消息的时候我们需要把autoKey的参数设置为false。

三、重回队列案例

3.1、生产者代码

代码语言:javascript复制
package com.example.rabbitmq.ack;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ProducerAck
{
    private  static  final  String exchangeName="test_retry_exchange";
    private  static  final  String routyKey="retry.test";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        for(int i=0;i<3;i  )
        {

            Map<String,Object>  headers=new HashMap<String,Object>();
            headers.put("num",i);

           AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                   .deliveryMode(2)
                   .contentEncoding("UTF_8")
                   .headers(headers)
                   .build();
            String msg = "Hello RabbitMQ Ack Message" i;
            channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
        }

    }
}

3.2、消费者代码

代码语言:javascript复制
package com.example.rabbitmq.ack;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConsumerAck
{
    private static final String exchangeName = "test_retry_exchange";
    private  static  final String queueName="test_retry_queue";
    private  static  final  String routingKey="retry.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();
            channel.exchangeDeclare(exchangeName,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,exchangeName,routingKey);

            
            channel.basicConsume(queueName,false,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

在如上的代码中,特别是在方法basicConsum中,参数autoKey我们需要设置为false,也就是手工签收的方式。

3.3、MyConsumer类代码

代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //获取properties的值,然后判断
        if((Integer) properties.getHeaders().get("num")==0)
        {
 channel.basicNack(envelope.getDeliveryTag(),false,true);
        }
        else
        {
            channel.basicAck(envelope.getDeliveryTag(),false);
        }

        System.err.println("the message received:" new String(body));

    }
}

在如上的代码中,设置了等待2秒接收消息,方法basicNack就会负责把失败的消息会再次发送到Broker里面,也就是重新发送到队列中等待消费者来进行消费。

3.4、执行结果信息

如上的代码执行成功后,在消费端就会看到num为0的消费失败被再次发送到了队列中等待消费,具体如下所示:

在如上中可以看到消息ID为0的,被再次重回到队列了。重回队列有它的优势但是也是存在它的缺陷,而这个缺陷可以通过死信队列来进行补充和解决。

0 人点赞