1、RabbitMQ是一个在AMQP基础上构建的新一代企业级消息系统,该组件由Pivotal公司提供,使用ErLang语言开发。
修改pom.xml配置文件,追加spring-boot-starter-amqp依赖包。
代码语言:javascript复制 1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
5 https://maven.apache.org/xsd/maven-4.0.0.xsd">
6 <modelVersion>4.0.0</modelVersion>
7 <parent>
8 <groupId>org.springframework.boot</groupId>
9 <artifactId>spring-boot-starter-parent</artifactId>
10 <version>2.3.5.RELEASE</version>
11 <relativePath /> <!-- lookup parent from repository -->
12 </parent>
13 <groupId>com.example</groupId>
14 <artifactId>demo</artifactId>
15 <version>0.0.1-SNAPSHOT</version>
16 <name>demo</name>
17 <description>Demo project for Spring Boot</description>
18
19 <properties>
20 <java.version>1.8</java.version>
21 <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
22 </properties>
23
24 <dependencies>
25 <dependency>
26 <groupId>org.springframework.boot</groupId>
27 <artifactId>spring-boot-starter-web</artifactId>
28 </dependency>
29
30 <dependency>
31 <groupId>org.springframework.boot</groupId>
32 <artifactId>spring-boot-starter-test</artifactId>
33 <scope>test</scope>
34 <exclusions>
35 <exclusion>
36 <groupId>org.junit.vintage</groupId>
37 <artifactId>junit-vintage-engine</artifactId>
38 </exclusion>
39 </exclusions>
40 </dependency>
41
42 <!-- mysql驱动包 -->
43 <dependency>
44 <groupId>mysql</groupId>
45 <artifactId>mysql-connector-java</artifactId>
46 </dependency>
47
48 <!-- druid连接池 -->
49 <dependency>
50 <groupId>com.alibaba</groupId>
51 <artifactId>druid</artifactId>
52 <version>1.1.10</version>
53 </dependency>
54
55 <dependency>
56 <groupId>org.springframework.boot</groupId>
57 <artifactId>spring-boot-starter-data-jpa</artifactId>
58 </dependency>
59 <dependency>
60 <groupId>org.springframework.boot</groupId>
61 <artifactId>spring-boot-starter-cache</artifactId>
62 </dependency>
63 <dependency>
64 <groupId>org.hibernate</groupId>
65 <artifactId>hibernate-ehcache</artifactId>
66 </dependency>
67
68 <!-- activeMQ -->
69 <dependency>
70 <groupId>org.springframework.boot</groupId>
71 <artifactId>spring-boot-starter-activemq</artifactId>
72 </dependency>
73
74 <!-- rabbitMQ -->
75 <dependency>
76 <groupId>org.springframework.boot</groupId>
77 <artifactId>spring-boot-starter-amqp</artifactId>
78 </dependency>
79 </dependencies>
80
81 <build>
82 <plugins>
83 <plugin>
84 <groupId>org.springframework.boot</groupId>
85 <artifactId>spring-boot-maven-plugin</artifactId>
86 </plugin>
87 </plugins>
88 <resources>
89 <resource>
90 <directory>src/main/resources</directory>
91 <includes>
92 <include>**/*.properties</include>
93 <include>**/*.yml</include>
94 <include>**/*.xml</include>
95 <include>**/*.p12</include>
96 <include>**/*.html</include>
97 <include>**/*.jpg</include>
98 <include>**/*.png</include>
99 </includes>
100 </resource>
101 </resources>
102 </build>
103
104 </project>
修改yml.xml配置文件,进行RabbitMQ的相关配置,如下所示:
代码语言:javascript复制1 # RabbitMQ服务主机名称
2 spring.rabbitmq.addresses=192.168.110.133
3 # 用户名
4 spring.rabbitmq.username=admin
5 # 密码
6 spring.rabbitmq.password=admin
7 # 虚拟主机
8 spring.rabbitmq.virtual-host=/
这里搞一个消息生产配置类,用来进行消息处理,如下所示:
代码语言:javascript复制 1 package com.demo.config;
2
3 import org.springframework.amqp.core.Binding;
4 import org.springframework.amqp.core.BindingBuilder;
5 import org.springframework.amqp.core.DirectExchange;
6 import org.springframework.amqp.core.Queue;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
9
10 @Configuration
11 public class RabbitMqConfig {
12
13 public static final String EXCHANGE = "rabbitmq.exchange"; // 交换空间名称
14 public static final String ROUTINGKEY = "rabbitmq.routingkey"; // 设置路由key
15 public static final String QUEUE_NAME = "rabbitmq.queue"; // 设置队列名称
16
17 /**
18 * 根据路由键将队列和交换机绑定到一起
19 *
20 * @param exchange
21 * @param queue
22 * @return
23 */
24 @Bean
25 public Binding bindingExchangeQueue(DirectExchange exchange, Queue queue) {
26 Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY);
27 return binding;
28 }
29
30 /**
31 * 使用直连的模式
32 *
33 * @return
34 */
35 @Bean
36 public DirectExchange getDirectExchage() {
37 return new DirectExchange(EXCHANGE, true, true);
38 }
39
40 /**
41 * 队列消息
42 *
43 * @return
44 */
45 @Bean
46 public Queue queue() {
47 return new Queue(QUEUE_NAME);
48 }
49
50 }
新建消息业务实现类,用于消息生产,如下所示:
代码语言:javascript复制 1 package com.demo.producer;
2
3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.stereotype.Service;
6
7 import com.demo.config.RabbitMqConfig;
8
9 @Service
10 public class RabbitMqMessageProducer {
11
12 @Autowired
13 private RabbitTemplate rabbitTemplate;
14
15 /**
16 * 消息发送,将交换机和路由器进行绑定
17 *
18 * @param msg
19 */
20 public void send(String msg) {
21 this.rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY, msg);
22 }
23
24 }
定义监听处理类,用于消息的消费,如下所示:
代码语言:javascript复制 1 package com.demo.consumer;
2
3 import org.springframework.amqp.rabbit.annotation.RabbitListener;
4 import org.springframework.stereotype.Service;
5
6 @Service
7 public class RabbitMqConsumer {
8
9 /**
10 * 进行消息的接受处理
11 *
12 * @param text
13 */
14 @RabbitListener(queues = "rabbitmq.queue")
15 public void receiveMessage(String text) {
16 System.err.println("【*** 接受消息 ***】 " text);
17 }
18
19 }
此时就实现了与RabbitMQ消息组件的整合,同时在整个程序中只需要调用IMessageProducer接口中的send()方法就可以正常发送,而后会找到设置同样ROUTINGKEY的消费者进行消息消费。
代码语言:javascript复制 1 package com.demo.controller;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.stereotype.Controller;
5 import org.springframework.web.bind.annotation.RequestMapping;
6 import org.springframework.web.bind.annotation.ResponseBody;
7
8 import com.demo.producer.RabbitMqMessageProducer;
9
10 @Controller
11 public class RabbitMqController {
12
13 @Autowired
14 private RabbitMqMessageProducer rabbitMqMessageProducer;
15
16 @RequestMapping(value = "/messageProducer")
17 @ResponseBody
18 public void findAll() {
19 for (int i = 0; i < 10000; i ) {
20 rabbitMqMessageProducer.send("rabbitMq producer message : " i);
21 }
22 }
23
24 }
可以通过http://192.168.110.133:15672/观察,查看自己的生产消息和消费消息的情况,如下所示: