RabbitMQ生产者消费者模型(二)

2022-03-29 16:00:56 浏览数 (1)

作为主流的MQ消息队列中间件,RabbitMQ也是具备了生产者消费者的模型,那么也就是说生产者把消息发送后,消费者来作为接收具体的消息。本文章主要详细的概述RabbitMQ的生产者投递和消费者监听。

一、消息传递流程

下面主要详细的总结下RabbitMQ消息队列服务器的整体流程,具体汇总如下:

  • 生产者只负责把消息投递到Exchange,这个过程不需要刻意的关注Queue
  • 而由Exchange把消息传递给Queue
  • 作为消费者的程序来负责监听Queue的消息
  • 为了保障消息传递的准确性以及及时性,Exchange与Queue会存在一定的绑定关系就是路由Key

二、MQ投递

依据RabbitMQ的架构模型,在生产者模型和消费者模型中,其实生产者和消费者并不知道对方的存在,这是异步通信的特性。作为生产者,它只需要把消息投递到Exchange,在这个过程中生产者并不需要关注Queue,事实上生产者也是无法关注到Queue的,那么消息是如何让消费者来监听并且接收消息了?这就是说会在Exchange和Queue之间建立一种映射关系,而这层关系就不是生产者所需要关注的了。作为消费者也不需要刻意的关注Exchange,而只需要监听Queue。

2.1、引入RabbitMQ的jar

要使用RabbitMQ的前提是需要引入RabbitMQ的jar,那么就需要在pom.xml文件里面新增RabbitMQ

的服务端和客户端,具体如下:

代码语言:javascript复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

2.2、生产者投递步骤

生产者把消息需要投递给Exchange,那么它的步骤具体总结如下:

代码语言:javascript复制
ConnectionFactory类负责获取连接工厂
Connection类的对象获取一个连接
Channel创建数据通道信道,可以发送和接收消息

下面具体是完整的生产者投递的代码,具体如下:

代码语言:javascript复制
package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer
{
    public static void main(String[] args) throws  Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        String msg = "Hello RabbitMQ";
        channel.basicPublish("saas", "", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

在如上中,我们可以看到我们首先需要连接到RabbitMQ的服务器,然后在发送消息message的时候我们需要指定具体的Exchange,因为对于生产者来说,它只关注的是把消息投递给Exchange。

2.3、消费者监听

生产者把消息投递到Exchange,那么作为消费者就需要来监听具体的消息了。监听的整个过程首先也是

需要建立RabbitMQ的服务器,这部分涉及到的代码具体如下:

代码语言:javascript复制
package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;

public class Consumer
{
    private static final String EXCHANGE = "saas";
    private  static  final String queueName="saas";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");
            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();

            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);

            channel.queueDeclare(queueName,true,false,false,null);

            channel.queueBind(queueName,EXCHANGE,"");

            //创建一个消费者来消费数据
            DefaultConsumer consumer=new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        com.rabbitmq.client.Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte [] body) throws  java.io.IOException
                {
                    String message=new String(body);
                    System.out.println("接收到的消息为:" message);
                };
            };
            System.out.println("消费者程序启动成功,准备接收生产者的数据:n");
            channel.basicConsume(queueName,consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

在如上中,我们看到Exchange与生产端的Exchange名字是一样的,那么只有这样才能够建立绑定关系,再说的更加简单点来说,生产者把消息给到Exchange,然后Exchange与Queue之间有一个层映射关系,那么只有这样消费者监听队列才能够收取message的消息。

2.4、绑定关系

刚才说到Exchange与Queue之间的绑定关系,下面就针对这部分具体的演示下。我们先启动消费者

的程序,启动成功后,就会自动的创建Exchange和Queue,就可以从Exchange的绑定以及Queue的绑定

中能够获取到对应的绑定关系。

2.4.1、Exchange绑定关系

下面的图是消费者的程序启动后创建的Exchange,以及它的绑定关系,具体如下:

2.4.2、消费者绑定关系

在Exchange的绑定关系中,点击To里面saas,就会自动的跳转到Queue,具体如下所示:

2.5、406错误避免

很多初学者在学习RabbitMQ的时候,总是提前创建好Exchange和Queue,这样结果导致消费者的程序报很多的错误,具体错误如下:

代码语言:javascript复制
java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
	at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:783)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:252)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:242)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:222)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:227)
	at com.example.rabbitmq.quickstart.Consumer.main(Consumer.java:31)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
	at java.lang.Thread.run(Thread.java:748)

其实遇到该问题,最简单解决问题的方式就是删除自己创建的Exchange和Queue。删除后,再次执行消费者的程序,它会自动创建Exchange和Queue,而且也就不会再报一系列的具体问题了。解决了如上的问题后,再次执行生产者的程序,就可以看到生产者发送的消息就能够被消费者这边监听到。感谢您的阅读。

0 人点赞