1、RabbitMQ与Spring Cloud Stream整合实战。SpringCloud Stream整体结构核心概念图,如下所示:
图示解释:Outputs输出,即消息的发送端。Inputs输入,即消息的接收端。Application Core即核心的应用。Binder是协调者的角色。Middleware是消息中间件。
2、SpringCloud Stream整体结构核心概念图,如下所示:
图示解释:SpringCloud Stream在RabbitMQ在生产者发送消息之前、消费者接收监听之后都套了一层插件。插件可以接收各种各样的不同的消息,也可以支持消息中间件的替换。 SpringCloud Stream插件的关键点,Barista接口,Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
3、使用Spring Cloud Stream非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常适合,但是使用SpringCloudStram框架有一个非常大的问题就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题。这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的,这点就是在当前版本SpringCloudStream的定位。
@Output,输出注解,用于定义发送消息接口。 @Input,输入注解,用于定义消息的消费者接口。 @StreamListener,用于定义监听方法的注解。
4、创建maven项目rabbitmq-springcloudstream-producer。修改pom.xml配置文件,如下所示:
代码语言:javascript复制 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
4 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6 <groupId>com.bie</groupId>
7 <artifactId>rabbitmq-springcloudstream-producer</artifactId>
8 <version>0.0.1-SNAPSHOT</version>
9
10 <parent>
11 <groupId>org.springframework.boot</groupId>
12 <artifactId>spring-boot-starter-parent</artifactId>
13 <version>1.5.8.RELEASE</version>
14 <relativePath /> <!-- lookup parent from repository -->
15 </parent>
16
17 <properties>
18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20 <java.version>1.8</java.version>
21 </properties>
22
23 <dependencies>
24 <dependency>
25 <groupId>org.springframework.boot</groupId>
26 <artifactId>spring-boot-starter-web</artifactId>
27 </dependency>
28 <dependency>
29 <groupId>org.springframework.boot</groupId>
30 <artifactId>spring-boot-starter</artifactId>
31 </dependency>
32 <!-- springboot自动装配的jar包 -->
33 <dependency>
34 <groupId>org.springframework.boot</groupId>
35 <artifactId>spring-boot-autoconfigure</artifactId>
36 </dependency>
37 <dependency>
38 <groupId>org.springframework.boot</groupId>
39 <artifactId>spring-boot-starter-test</artifactId>
40 <scope>test</scope>
41 </dependency>
42 <!-- 与spring cloud stream相关的jar包 -->
43 <dependency>
44 <groupId>org.springframework.cloud</groupId>
45 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
46 <version>1.3.4.RELEASE</version>
47 </dependency>
48 <!-- springboot监控相关的jar包 -->
49 <dependency>
50 <groupId>org.springframework.boot</groupId>
51 <artifactId>spring-boot-starter-actuator</artifactId>
52 </dependency>
53 </dependencies>
54
55 <build>
56 <plugins>
57 <plugin>
58 <groupId>org.springframework.boot</groupId>
59 <artifactId>spring-boot-maven-plugin</artifactId>
60 </plugin>
61 </plugins>
62 </build>
63
64 </project>
修改rabbitmq-springcloudstream-producer的application.properties配置文件。如下所示:
代码语言:javascript复制 1 # 端口号
2 server.port=8001
3 # 请问访问路径
4 server.servlet.context-path=/producer
5
6 # 应用的名称
7 spring.application.name=producer
8 # 将交换机和队列绑定到了通道output_channel上面
9 # 交换机名称
10 spring.cloud.stream.bindings.output_channel.destination=exchange-3
11 # 队列名称
12 spring.cloud.stream.bindings.output_channel.group=queue-3
13 # 对集群环境进行配置
14 spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
15
16 # rabbit_cluster对应上面的spring.cloud.stream.bindings.output_channel.binder的值。名称可以自定义
17 spring.cloud.stream.binders.rabbit_cluster.type=rabbit
18 # 使用的环境是rabbit
19 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.110.133:5672
20 # 账号
21 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
22 # 密码
23 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
24 # 虚拟主机
25 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
创建启动类,如下所示:
代码语言:javascript复制 1 package com.bie;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 /**
7 *
8 * @author biehl
9 *
10 */
11 @SpringBootApplication
12 public class SpringCloudStreamRabbitMQProducerApplication {
13
14 public static void main(String[] args) {
15 SpringApplication.run(SpringCloudStreamRabbitMQProducerApplication.class, args);
16 }
17 }
创建Barista接口,用于创建输出通道,将消息和输出通道进行绑定,如下所示:
代码语言:javascript复制 1 package com.bie.stream;
2
3 import org.springframework.cloud.stream.annotation.Output;
4 import org.springframework.messaging.MessageChannel;
5
6 /**
7 * 这里的Barista接口是定义来作为后面类的参数,
8 *
9 * 这一接口定义来通道类型和通道名称。
10 *
11 * 通道名称是作为配置用,
12 *
13 * 通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
14 */
15 public interface Barista {
16
17 // String INPUT_CHANNEL = "input_channel"; // 输入通道
18 String OUTPUT_CHANNEL = "output_channel"; // 输出通道
19
20 // 注解@Input声明了它是一个输入类型的通道,
21 // 名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。
22 // 这一名字与上述配置app2的配置文件中position1应该一致,
23 // 表明注入了一个名字叫做input_channel的通道,它的类型是input,
24 // 订阅的主题是position2处声明的mydest这个主题。
25 // @Input(Barista.INPUT_CHANNEL)
26 // SubscribableChannel loginput();
27
28 // 注解@Output声明了它是一个输出类型的通道,
29 // 名字是output_channel。这一名字与app1中通道名一致,
30 // 表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
31 @Output(Barista.OUTPUT_CHANNEL) // 输出通道
32 MessageChannel logoutput();
33
34 // String INPUT_BASE = "queue-1";
35 // String OUTPUT_BASE = "queue-1";
36 // @Input(Barista.INPUT_BASE)
37 // SubscribableChannel input1();
38 // MessageChannel output1();
39
40 }
创建生产者类,如下所示:
代码语言:javascript复制 1 package com.bie.stream;
2
3 import java.util.Map;
4
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.cloud.stream.annotation.EnableBinding;
7 import org.springframework.messaging.Message;
8 import org.springframework.messaging.MessageHeaders;
9 import org.springframework.messaging.support.MessageBuilder;
10 import org.springframework.stereotype.Service;
11
12 /**
13 *
14 * @author biehl
15 *
16 */
17 @EnableBinding(Barista.class) // 启动该绑定
18 @Service // 注入到Spring容器中
19 public class RabbitmqProducer {
20
21 @Autowired // 将barista注入到spirng容器中
22 private Barista barista;
23
24 // 发送消息
25 public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
26 try {
27 // 设置消息头
28 MessageHeaders mhs = new MessageHeaders(properties);
29 // 创建消息,使用消息和消息头创建消息。
30 Message msg = MessageBuilder.createMessage(message, mhs);
31 // 调用barista进行消息的发送。
32 boolean sendStatus = barista.logoutput().send(msg);
33 // 打印查看消息发送的情况
34 System.out.println("========================sending========================");
35 System.out.println("发送数据:" message ",sendStatus: " sendStatus);
36 } catch (Exception e) {
37 System.out.println("========================error========================");
38 e.printStackTrace();
39 throw new RuntimeException(e.getMessage());
40 }
41 return null;
42 }
43
44 }
5、创建消费者rabbitmq-springcloudstream-consumer。修改配置文件,如下所示:
代码语言:javascript复制 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
4 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6 <groupId>com.bie</groupId>
7 <artifactId>rabbitmq-springcloudstream-consumer</artifactId>
8 <version>0.0.1-SNAPSHOT</version>
9
10 <parent>
11 <groupId>org.springframework.boot</groupId>
12 <artifactId>spring-boot-starter-parent</artifactId>
13 <version>1.5.8.RELEASE</version>
14 <relativePath /> <!-- lookup parent from repository -->
15 </parent>
16
17 <properties>
18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20 <java.version>1.8</java.version>
21 </properties>
22
23 <dependencies>
24 <dependency>
25 <groupId>org.springframework.boot</groupId>
26 <artifactId>spring-boot-starter-web</artifactId>
27 </dependency>
28 <dependency>
29 <groupId>org.springframework.boot</groupId>
30 <artifactId>spring-boot-starter</artifactId>
31 </dependency>
32 <dependency>
33 <groupId>org.springframework.boot</groupId>
34 <artifactId>spring-boot-starter-test</artifactId>
35 <scope>test</scope>
36 </dependency>
37 <!-- 与spring cloud stream相关的jar包 -->
38 <dependency>
39 <groupId>org.springframework.cloud</groupId>
40 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
41 <version>1.3.4.RELEASE</version>
42 </dependency>
43 <!-- springboot监控相关的jar包 -->
44 <dependency>
45 <groupId>org.springframework.boot</groupId>
46 <artifactId>spring-boot-starter-actuator</artifactId>
47 </dependency>
48 </dependencies>
49
50 <build>
51 <plugins>
52 <plugin>
53 <groupId>org.springframework.boot</groupId>
54 <artifactId>spring-boot-maven-plugin</artifactId>
55 </plugin>
56 </plugins>
57 </build>
58
59 </project>
修改rabbitmq-springcloudstream-consumer的application.properties配置文件。如下所示:
代码语言:javascript复制 1 # 端口号
2 server.port=8002
3 # 访问路径
4 server.context-path=/consumer
5
6 # 应用的名称
7 spring.application.name=consumer
8 # 交换机名称
9 spring.cloud.stream.bindings.input_channel.destination=exchange-3
10 # 队列名称
11 spring.cloud.stream.bindings.input_channel.group=queue-3
12 # 对集群环境进行配置
13 spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
14 # 设置默认监听数
15 spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
16 # 是否支持重新放回队列
17 spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
18 # 接收模式是手工接收
19 spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
20 # 服务断开,3秒钟重连
21 spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
22 # 是否启用持久化订阅
23 spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
24 # 最大的监听数
25 spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
26
27 # rabbit_cluster对应上面的spring.cloud.stream.bindings.output_channel.binder的值。名称可以自定义
28 # 使用的环境是rabbit
29 spring.cloud.stream.binders.rabbit_cluster.type=rabbit
30 # 访问地址和端口号
31 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.110.133:5672
32 # 账号
33 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
34 # 密码
35 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
36 # 虚拟主机
37 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
创建启动类,如下所示:
代码语言:javascript复制 1 package com.bie;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 /**
7 *
8 * @author biehl
9 *
10 */
11 @SpringBootApplication
12 public class SpringCloudStreamRabbitMQConsumerApplication {
13
14 public static void main(String[] args) {
15 SpringApplication.run(SpringCloudStreamRabbitMQConsumerApplication.class, args);
16 }
17
18 }
创建Barista接口,用于创建输入通道,将消息和输入通道进行绑定,如下所示:
代码语言:javascript复制 1 package com.bie.stream;
2
3 import org.springframework.cloud.stream.annotation.Input;
4 import org.springframework.messaging.SubscribableChannel;
5
6 /**
7 * 这里的Barista接口是定义来作为后面类的参数,
8 *
9 * 这一接口定义来通道类型和通道名称。
10 *
11 * 通道名称是作为配置用,
12 *
13 * 通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
14 *
15 * @author biehl
16 *
17 */
18
19 public interface Barista {
20
21 // 输入管道,用于接收消息
22 String INPUT_CHANNEL = "input_channel";
23
24 // 注解@Input声明了它是一个输入类型的通道,
25 // 名字是Barista.INPUT_CHANNEL,
26 // 也就是position3的input_channel。
27 // 这一名字与上述配置app2的配置文件中position1应该一致,
28 // 表明注入了一个名字叫做input_channel的通道,它的类型是input,
29 // 订阅的主题是position2处声明的mydest这个主题。
30 @Input(Barista.INPUT_CHANNEL)
31 SubscribableChannel loginput();
32
33 }
创建消费者,如下所示:
代码语言:javascript复制 1 package com.bie.stream;
2
3 import org.springframework.amqp.support.AmqpHeaders;
4 import org.springframework.cloud.stream.annotation.EnableBinding;
5 import org.springframework.cloud.stream.annotation.StreamListener;
6 import org.springframework.messaging.Message;
7 import org.springframework.stereotype.Service;
8
9 import com.rabbitmq.client.Channel;
10
11 @EnableBinding(Barista.class) // 启动该绑定
12 @Service // 注入到Spring容器中
13 public class RabbitmqConsumer {
14
15 /**
16 * @StreamListener接收RabbitMQ的消息。指定了输入管道。
17 *
18 *
19 * 接收消息
20 * @param message
21 * @throws Exception
22 */
23 @StreamListener(Barista.INPUT_CHANNEL)
24 public void consumer(Message message) throws Exception {
25 // 获取到具体的channel
26 Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
27 // 获取到deliveryTag
28 Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
29 // 消费消息
30 System.out.println("Input Stream 1 接受数据:" message);
31 System.out.println("消费完毕------------");
32 // 手动ack确认机制,false代表了1条一条进行消息签收
33 channel.basicAck(deliveryTag, false);
34 }
35
36 }
启动消费者进行消息的监听,在生产者项目编写测试类进行代码测试,发送消息,看生产者是否可以接收到消息并进行消费处理。
代码语言:javascript复制 1 package com.bie;
2
3 import java.util.Date;
4 import java.util.HashMap;
5 import java.util.Map;
6
7 import org.apache.http.client.utils.DateUtils;
8 import org.junit.Test;
9 import org.junit.runner.RunWith;
10 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.boot.test.context.SpringBootTest;
12 import org.springframework.test.context.junit4.SpringRunner;
13
14 import com.bie.stream.RabbitmqProducer;
15
16 @RunWith(SpringRunner.class)
17 @SpringBootTest
18 public class ApplicationTests {
19
20 @Autowired
21 private RabbitmqProducer rabbitmqProducer;
22
23 @Test
24 public void sendMessageTest1() throws InterruptedException {
25 for (int i = 0; i < 1; i ) {
26 try {
27 Map<String, Object> properties = new HashMap<String, Object>();
28 properties.put("serial_number", "12345");
29 properties.put("bank_number", "abc");
30 properties.put("plat_send_time", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
31 rabbitmqProducer.sendMessage("Hello, I am amqp sender num :" i, properties);
32 } catch (Exception e) {
33 System.out.println("================================error================================");
34 e.printStackTrace();
35 }
36 }
37 Thread.sleep(50000);
38 }
39
40 }
运行效果如下所示: