10-Spring Boot整合RabbitMQ

2022-11-22 09:57:15 浏览数 (1)

10-Spring Boot整合RabbitMQ

简介

在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp

尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。

一般在开发过程中

生产者工程:

  1. application.yml文件配置RabbitMQ相关信息;
  2. 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
  3. 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机

消费者工程:

  1. application.yml文件配置RabbitMQ相关信息
  2. 创建消息处理类,用于接收队列中的消息并进行处理

搭建生产者工程

1.创建工程

创建生产者工程 springboot-rabbitmq-producer

2. 添加依赖

修改pom.xml文件内容为如下:

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- springboot 父工程   -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- 工程坐标   -->
    <groupId>com.lijw</groupId>
    <artifactId>springboot-rabbitmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- 工程信息   -->
    <name>springboot-rabbitmq-producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- 工程依赖   -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- rabbitmq起步依赖   -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3. 启动类

代码语言:javascript复制
package com.lijw.springbootrabbitmqproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootRabbitmqProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqProducerApplication.class, args);
    }

}

4. 配置RabbitMQ

1)配置文件

创建application.yml,内容如下:

代码语言:javascript复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
2)绑定交换机和队列

创建RabbitMQ队列与交换机绑定的配置类com.lijw.springbootrabbitmqproducer.config.RabbitMQConfig

代码语言:javascript复制
package com.lijw.springbootrabbitmqproducer.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 交换机名称
    public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
    // 队列名称
    public static final String ITEM_QUEUE = "item_queue";

    // 声明交换机
    @Bean("itemTopicExchange")
    public Exchange topicExchange(){
        /**
         * ExchangeBuilder
         * - topicExchange(ITEM_TOPIC_EXCHANGE) 设置 topic 模式的交换机名称
         * - durable(true) 设置持久化
         * - build() 构建返回 Exchange 对象
         */
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }

    // 声明队列
    @Bean("itemQueue")
    public Queue itemQueue(){
        /**
         * QueueBuilder
         * - .durable(ITEM_QUEUE) 设置队列名称以及持久化
         * - .build() 构建返回 Queue 对象
         */
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }

    // 绑定队列与交换机
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange){
        /**
         * BindingBuilder
         * - .bind(queue) 绑定队列
         * - .to(exchange) 绑定交换机
         * - with("item.#") 设置 Routing Key
         * - .noargs() 设置无参数
         */
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
}

5.测试发送消息

在生产者工程springboot-rabbitmq-producer中创建测试类,发送消息:

代码语言:javascript复制
package com.lijw.springbootrabbitmqproducer;

import com.lijw.springbootrabbitmqproducer.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 生产者发送消息
    @Test
    public void testSend() {
        /**
         * 1. 参数一: 交换机名称
         * 2. 参数二: routingKey
         * 3. 参数三: 发送的消息
         */
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为item.insert");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为item.update");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为item.delete");
    }
}

运行上述测试程序(交换机和队列才能先被声明和绑定),执行如下:

执行完毕后, 可以在RabbitMQ的管理控制台中查看到交换机与队列的绑定:

在上面我们已经确认了消息写入了队列,下面我们来编写消费者工程进行消费。

搭建消费者工程

1. 创建工程

创建消费者工程 springboot-rabbitmq-consumer

2. 添加依赖

修改pom.xml文件内容为如下:

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- springboot 父工程  -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- 工程坐标   -->
    <groupId>com.lijw</groupId>
    <artifactId>springboot-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- 工程信息   -->
    <name>springboot-rabbitmq-consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- 工程依赖   -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- rabbitmq的起步依赖  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3. 启动类

代码语言:javascript复制
package com.lijw.springbootrabbitmqconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootRabbitmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqConsumerApplication.class, args);
    }

}

4. 配置RabbitMQ

创建application.yml,内容如下:

代码语言:javascript复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest

5. 消息监听处理类

编写消息监听器 com.lijw.springbootrabbitmqconsumer.listener.MyListener

代码语言:javascript复制
package com.lijw.springbootrabbitmqconsumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {

    /**
     * 监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "item_queue")
    public void myListener1(String message){
        System.out.println("消费者接收到的消息: "   message);
    }
}

6. 测试接收消息

启动消费者工程,查看接收到的消息:

我们可以在生产者工程多发送几次消息看看,如下:

到这里,我们已经确认消费者能够正常接收消息了。

0 人点赞