RabbitMQ优先级队列机制(八)

2022-03-29 16:06:12 浏览数 (1)

一、什么是优先级队列

在服务级级别的测试中需要考虑被执行任务的优先级机制,也就是通过线程优先级来进行,设置优先级的目的是在资源非常紧张的情况下,让优先级高的任务优先执行,而优先级低的任务排后执行,当然这样的一种设置机制只能是异步的模式下执行,如果是设计在同步的模式下执行,那这个设计从系统上来说就缺少宏观维度的思考。在RabbitMQ的机制中也是提供了队列的优先级机制,这样设计的目的也是在在生产者生产过快,而消费者消费不过来的情况下,也就是资源在紧张或者说是在有限的情况下,设置的队列优先级高的任务它的消息优先进行消费,而优先级低的消息排后消费。当然,如果是在资源不紧张的情况下,设置优先级其实没多大的意义,因为这个时候优先过来的消息先进行消费,也谈不上排队的机制和优先级的机制。

二、优先级的实现机制

针对优先级的设置,在消费者端进行设置,参数具体是x-max-priority,涉及的代码具体如下:

代码语言:javascript复制
            //设置优先级
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);
            channel.queueDeclare(queueName,true,false,false,arguments);

这样消费者的代码执行后,在RabbitMQ的WEB控制台,就可以看到该消息队列显示设置的优先级,具体如下所示:

如上,我们演示了配置一个队列的最大优先级,其实核心的是需要在生产者发送消息的时候设置当前发送任务的优先级涉及代码如下:

代码语言:javascript复制
               AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message" i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());

在如上中,我们设置的发送任务的优先级是8。

三、优先级队列实战代码

3.1、生产者代码
代码语言:javascript复制
package com.example.rabbitmq.priority;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ProducerPriority
{
    private  static  final  String exchangeName="test_priority_exchange";
    private  static  final  String routyKey="priority.test";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        for(int i=0;i<3;i  )
        {
            Map<String,Object>  headers=new HashMap<String,Object>();
            headers.put("num",i);

            if (i==0)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message" i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else if (i==1)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(3)
                        .build();

                String msg = "Hello RabbitMQ priority Message" i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(9)
                        .build();

                String msg = "Hello RabbitMQ priority Message" i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }


        }

    }
}
代码语言:javascript复制
在如上中,我们针对发送的任务依据编号进行了优先级的设置。
3.2、消费者代码
代码语言:javascript复制
package com.example.rabbitmq.priority;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ConsumerPriority
{
    private static final String exchangeName = "test_priority_exchange";
    private  static  final String queueName="test_priority_queue";
    private  static  final  String routingKey="priority.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();

            //设置优先级
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);

            channel.exchangeDeclare(exchangeName,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,arguments);
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
3.3、MyConsumer类代码
代码语言:javascript复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------n");
        System.err.println("the message received:" new String(body));
        System.err.println("message priority:" properties.getPriority());

    }
}
3.4、执行结果信息

如上的代码执行后,当然当前的资源不存在紧张的情况,那么就会按正常的顺序消费,具体输出结果如下:

如上,主要总结了消息队列优先级这部分的总结和它的案例应用。

0 人点赞