RabbitMQ:基本消息模型

2022-12-27 17:18:36 浏览数 (2)

单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。 RabbitMQ 单生产单消费模型主要有以下三个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区! ~ 本篇内容包括:RabbitMQ 基本(单生产单消费)消息模型、RabbitMQ 单生产消费模型实现、RabbitMQ 单生产消费模型实现(连接工具类封装)


文章目录
  • 一、RabbitMQ 基本(单生产单消费)消息模型
    • 1、单生产单消费模型(Hello World)
    • 2、单生产单消费模型组成
    • 3、参数细节

  • 二、RabbitMQ 单生产消费模型实现
    • 1、添加 Maven 依赖
    • 2、生产者 实现
    • 3、消费者 实现
  • 三、RabbitMQ 单生产消费模型实现(连接工具类封装)
    • 1、定义封装工具类 ConnectionUtil
    • 2、生产者
    • 3、消费者

一、RabbitMQ 基本(单生产单消费)消息模型

1、单生产单消费模型(Hello World)

单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。

2、单生产单消费模型组成

RabbitMQ 单生产单消费模型主要有以下三个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区!

3、参数细节
代码语言:javascript复制
channel.queueDeclare("HelloWorld",false,false,false,null);
  • 参数1:队列名称,如果不存在则自动创建;
  • 参数2:队列特性-是否持久化,只是持久化队列,如果持久化消息需要在 channel.basicPublish 参数3进行额外设置:MessageProperties.PERSISTENT_TEXT_PLAIN
  • 参数3:队列特性-是否独占队列,队列只能被同一个连接绑定;
  • 参数4:队列特性-是否队列完成后自动删除队列,消费者完成消息后自动删除(消费者断开连接);
  • 参数5:额外附加参数

二、RabbitMQ 单生产消费模型实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

代码语言:javascript复制
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
  	...
</dependencies>
2、生产者 实现

# Rabbit 基本消息模型 生产者-Producer

代码语言:javascript复制
package com.lizhengi.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 基本消息模型 生产者
 * @date 2022-12-18 5:09 下午
 **/
public class Producer {

    /**
     * 消息生产
     *
     * @throws IOException      IOException
     * @throws TimeoutException TimeoutException
     */
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 创建MQ连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置连接MQ主机
        connectionFactory.setHost("127.0.0.1");
        // 设置连接MQ端口号
        connectionFactory.setPort(5672);
        // 设置连接MQ虚拟主机
        connectionFactory.setVirtualHost("/test");
        // 设置连接MQ虚拟主机的用户名密码
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        // 获取连接对象
        Connection connection = new ConnectionFactory().newConnection();
        // 获取连接通道
        Channel channel = connection.createChannel();

        /*  通道绑定对应消息队列
         *  参数1:队列名称,如果不存在则自动创建;
         *  参数2:队列特性-是否持久化;
         *  参数3:队列特性-是否独占队列;
         *  参数4:队列特性-是否队列完成后自动删除队列;
         *  参数5:额外附加参数
         */
        channel.queueDeclare("HelloWorld", false, false, false, null);

        /*  发布消息
         *  参数1:交换机名称;参数2:队列名称;参数3:传递消息额外值设置;参数4:具体消息内容
         */
        channel.basicPublish("", "HelloWorld", null, "Hello RabbitMQ".getBytes());

        // 连接关闭
        channel.close();
        connection.close();
    }
}
3、消费者 实现

# Rabbit 基本消息模型 消费者-Customer

代码语言:javascript复制
package com.lizhengi.hello;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 基本消息模型 消费者
 * @date 2022-12-18 5:19 下午
 **/
public class Customer {

    public static void main(String[] msg) throws IOException, TimeoutException {
        // 创建MQ连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("HelloWorld", false, false, false, null);

        /*  消费消息
         *  参数1:消费队列名称;参数2:开始消费的自动确认机制;参数3:消费时的回调接口
         */
        channel.basicConsume("HelloWorld", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("The Body:"   new String(body));
            }
        });

        // 关闭连接
        channel.close();
        connection.close();
    }
}

三、RabbitMQ 单生产消费模型实现(连接工具类封装)

1、定义封装工具类 ConnectionUtil

# 连接工具类封装-ConnectionUtil

代码语言:javascript复制
package com.lizhengi.hello.demo2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 连接工具类封装
 * @date 2022-12-18 5:24 下午
 **/
public class ConnectionUtil {

    /**
     * 创建 MQ 连接工厂对象
     */
    private static final ConnectionFactory CONNECTION_FACTORY;

    // 类加载时,重量级资源加载
    static {
        CONNECTION_FACTORY = new ConnectionFactory();
        //设置连接MQ主机
        CONNECTION_FACTORY.setHost("127.0.0.1");
        //设置连接MQ端口号
        CONNECTION_FACTORY.setPort(5672);
        //设置连接MQ虚拟主机
        CONNECTION_FACTORY.setVirtualHost("/test");
        //设置连接MQ虚拟主机的用户名密码
        CONNECTION_FACTORY.setUsername("admin");
        CONNECTION_FACTORY.setPassword("123456");
    }

    /**
     * 建立与RabbitMQ连接 工具方法
     *
     * @return Connection
     */
    public static Connection getConnection() {
        try {
            //返回连接对象
            return CONNECTION_FACTORY.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭通道和连接 工具方法
     *
     * @param channel    Channel
     * @param connection Connection
     */
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2、生产者

# Rabbit 基本消息模型 生产者-Producer

代码语言:javascript复制
package com.lizhengi.hello.demo2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.testng.annotations.Test;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 基本消息模型 生产者
 * @date 2022-12-18 5:26 下午
 **/
public class Producer {

    /**
     * 消息生产
     * @throws IOException IOException
     */
    @Test
    public void testSendMessage() throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        channel.queueDeclare("HelloWorld",false,false,false,null);
        channel.basicPublish("","HelloWorld",null,"Hello RabbitMQ".getBytes());
        //资源关闭
        ConnectionUtil.closeConnectionAndChanel(channel,connection);
    }
}
3、消费者

# Rabbit 基本消息模型 消费者-Customer

代码语言:javascript复制
package com.lizhengi.hello.demo2;

import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 基本消息模型 消费者
 * @date 2022-12-18 5:26 下午
 **/
public class Customer {
    public static void main(String[] msg) throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        channel.queueDeclare("HelloWorld",false,false,false,null);
        channel.basicConsume("HelloWorld",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("The Body:"   new String(body));
            }
        });
    }
}

0 人点赞