RabbitMQ 学习(二)---- HelloWorld 简单模型

2022-09-29 17:13:58 浏览数 (2)

文章目录

  • RabbitMQ 学习(二)---- HelloWorld 简单模型
  • 开放 rabbitMQ 端口号 5672
  • 发送者
    • (1)创建连接工厂 connectionFactory
    • (2)创建 与 rabbitmq 的连接 connection
    • (3)通过连接创建信道 channel
    • (4)在信道中传递数据
    • (6)关闭资源
      • 完整的过程代码
  • 接收者
    • (1)发送消息
    • 接受方完整代码

RabbitMQ 学习(二)---- HelloWorld 简单模型

开放 rabbitMQ 端口号 5672

之前我们使用rabbitMq 网页客户端 开放了 15672 的端口,要想是的 java客户端访问服务器成功,需要开放 5672 的端口号。在服务器安全组设置

还要再pom.xml中加载 ampq客户端的依赖

代码语言:javascript复制
  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>

如果想要在控制台不日志的报错信息,还要加载 slf4j 的依赖

代码语言:javascript复制
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

发送者

(1)创建连接工厂 connectionFactory

在工厂中需要设置连接的主机名、端口号、客户端的用户名、密码、虚拟主机等,为之后的连接做好预先准备。

虚拟主机就是相当于 我们在数据库软件中 一个系统对应的数据库一样,对应这个一个单独的节点

代码语言:javascript复制
 //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("XXXX.XXX.XXX.XXX"); // 部署rabbitMQ的
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
        // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
        connectionFactory.setVirtualHost("/test");

(2)创建 与 rabbitmq 的连接 connection

通过工厂实例拿到与rabbitMq的连接对象,只有拿到了连接,才能进行后续的所有操作

代码语言:javascript复制
   //2、根据连接工厂创建连接对象
            connection = connectionFactory.newConnection();

(3)通过连接创建信道 channel

这里有一个经典面试题,为什么我们在channel中完成消息的发送接收,而不是直接在connection中呢?

看看这一个回答,能够很清楚的说明白。

(4)在信道中传递数据

如何在信道中传递数据呢?

我们需要明确一点,不管是什么模型,其实都需要 交换机(exchange)、路由器(routingkey)、队列(queue).

虽然这里的简单模型是点对点的,只需要 队列来传递数据的

  • Exchanger交换机,用来接收生产者发布的消息并将这些消息路由给服务器中的队列。
  • 默认的交换机有一个特点,只要你的routerKey与这个交换机中有同名的队列,他就会自动路由上。

这里的发送接收的规则捋了一下,是这样的,因为该模型不需要指定交换机与路由规则,只需要队列就行了所以使用默认交换机,交换机是用来接收生产者消息,并根据路由规则将消息分发给服务器中的队列中的,在此之前生产着声明了队列,但是在传递的时候仍然需要交换机与路由规则 给队列分发 消息, 巧了默认交换机有一个规则,如果路由规则与队列同名的话,那么路由与队列会自动绑定上,所以需要将 routingKey 写成 与 队列同名,让他们绑定上,然后生产者才能在队列中取到信息

一个很重要的信息传递的规则

  • 生产者声明队列
  • 生产者在发送消息的时候 使用 交换机 接收消息,通过 路由规则 分发消息到 指定的 队列 中等待接收
  • 消费者声明队列
  • 消费者在接收消息的时候通过队列(与生产者队列一致)进行接收消息,接收成功并选择是否回应

声明队列相关信息

代码语言:javascript复制
 //4、在信道中声明队列
    /**
     * @Params1 queue:队列的名字
     * @Params2 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
     * @Params3 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问 
     * @Params4 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
     * @Params5 arguments: map类型,一些额外的参数,比如说过期时间设置等
     */
            channel.queueDeclare("queue",false,false,false,null);

通过交换机、路由器、队列进行发送信息

代码语言:javascript复制
        /**
         * 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
         * 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
         * 参数3:props 这里写发送的消息是否支持持久化
         * 参数4: bytes 类型,这是我们要传递的具体消息
         */ 

//5、使用该信道进行发送消息 
    for(int i=0;i<10;i  ){
                channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:" i).getBytes());
            }

(6)关闭资源

在发送完毕之后,关闭信道,关闭连接

代码语言:javascript复制
finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

完整的过程代码

代码语言:javascript复制
package hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {


    public static void main(String[] args) {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.46.143.156");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
        // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
        connectionFactory.setVirtualHost("/test");

        Connection connection = null;
        Channel channel =null;

        try {
            //2、根据连接工厂创建连接对象
            connection = connectionFactory.newConnection();

            //3、根据连接对象创建信道
            channel = connection.createChannel();

            // 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列

            //4、在信道中声明队列
            /**
             * @Params1 queue:队列的名字
             * @Params1 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
             * @Params1 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问
             * @Params1 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
             * @Params1 arguments: map类型,一些额外的参数,比如说过期时间设置等
             */
            channel.queueDeclare("queue",false,false,false,null);

            //5、使用该信道使用exchange、routineKey 进行发送消息 Bytes,

            // 默认的交换机有一个特点,routineKey 如果和 队列名一致的话 ,那么匹配成功

            // 生产者不是将消息放在queue队列中,而是放在默认交换机中等待符合routingkey的队列匹配,routineKey 名字和队列名一致则匹配成功
            // queue在消费者这里生成,匹配成功之后消费者在queue中取消息

            /**
             * 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
             * 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
             * 参数3:props 这里写发送的消息是否支持持久化
             * 参数4: bytes 类型,这是我们要传递的具体消息
             */
            for(int i=0;i<10;i  ){
                channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:" i).getBytes());
            }

        } catch (Exception e){
            e.printStackTrace();
        }finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }


}

接收者

前面的几个步骤 都一样,

需要注意的一点是,接收方在接收queue中的数据的时候,声明queue必须和 发送方的保持一致,所有条件都得保持一致,否则接收不到。

(1)发送消息

从使用信道发送消息开始,使用 basicConsume()

代码语言:javascript复制
 /**
             * 参数1 : 队列的名字
             * 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
             * 参数3 : 接收到消息之后的业务操作
             */

//5、使用该信道进行发送消息
            channel.basicConsume("queue", true, new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接受了发送方的消息:"  new String(body));
                }
            });

这里需要说一下,在最后的业务操作的参数是一个 Consumer,设置一个接口,我们需要写一个实现类,重写其中的 方法,对接收的message进行后续的业务操作。

接受方完整代码

代码语言:javascript复制
package hello;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Customer {
    public static void main(String[] args) {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.46.143.156");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
        // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
        connectionFactory.setVirtualHost("/test");

        Connection connection = null;
        Channel channel =null;

        try {
            //2、根据连接工厂创建连接对象
            connection = connectionFactory.newConnection();
            //3、根据连接对象创建信道
            channel = connection.createChannel();
            // 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列
            //4、在信道中声明队列
            channel.queueDeclare("queue",false,false,false,null);
            //5、使用该信道进行发送消息
            /**
             * 参数1 : 队列的名字
             * 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
             * 参数3 : 接收到消息之后的业务操作
             */
            channel.basicConsume("queue", true, new DefaultConsumer(channel){
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接受了发送方的消息:"  new String(body));
                }
            });
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

0 人点赞