RabbitMQ笔记(一)-基于SpringBoot使用RabbitMQ以及原理详解

2019-08-31 12:30:38 浏览数 (1)

RabbitMQ 使用与详解

RabbitMQ参考中文文档

1. RabbitMQ原理详解
  • Producer(生产者),产生消息并向RabbitMq发送消息
  • Consumer(消费者),等待RabbitMq消息到来并处理消息
  • Queue(队列), 依存于RabbitMQ内部, 虽然消息通过RabbitMQ在你的应用中传递,但是它们只能存储在queue中
  • message acknowledgment,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除
  • message durability,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。
  • Prefetch Count,如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
  • Exchange(交换器),生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
  • routing key,生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效,RabbitMQ为routing key设定的长度限制为255 bytes
  • Binding,RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了
  • binding key,在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key,binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
  • Exchange Type,常见的有fanout、direct、topic、headers这四种 fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

direct 把消息路由到那些binding key与routing key完全匹配的Queue中

topic

​ 把消息路由到那些binding key与routing key模糊匹配的Queue中

匹配规则:

  1. routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“aa.bb.cc”
  2. binding key与routing key一样也是句点号“. ”分隔的字符串
  3. binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

header

代码语言:javascript复制
headers类型的Exchange不依赖于routingkey与binding key的匹配规则来路由消息,而是根据发送的消息内容中的					headers属性进行匹配。
2. 运行RabbitMQ

使用docker运行,要使用管理页面用management的版本

代码语言:javascript复制
docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management

管理页面默认用户名和密码都是guest

3. 创建QUEUE

点击Queues,Add a new queue

填入queue名称保存即可

4. 创建Exchange

点击Exchanges,Add a new exchange

输入Echange名称,选择type

保存即可

5. 绑定queue和exchange

点击刚才创建的exchange,Bindings下面填入queue的名称和Routing Key即可

6. 创建springboot程序来收发消息

pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.mt.demo</groupId>
	<artifactId>spring-boot-rabbitmq-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring-boot-rabbitmq-demo</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.6.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.16.14</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

application.yml

代码语言:javascript复制
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true #生产者可以判断消息是否发送到了broker
    publisher-returns: true #生产者可以判断消息是否发送到了queue
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
server:
  port: 10001

先在RabbitMQ管理页面上创建hello的队列,并且使用绑定到topic交换器上

创建一个消费者

代码语言:javascript复制
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener {

    @RabbitHandler
    public void process(String hello) {
        log.info("Receiver: {}", hello);
    }
}

创建一个生产者

代码语言:javascript复制
@GetMapping("/send")
public void send(@RequestParam String topic, @RequestParam String route, @RequestParam String msg) {
    log.info("send topic[{}], msg: {}", topic, msg);

    rabbitTemplate.convertAndSend(topic, route, msg);
}

如果再创建一个消费者绑定同样的队列,则可以看到两个消费者交替收到消息

代码语言:javascript复制
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener2 {

    @RabbitHandler
    public void process(String hello) {
        log.info("Receiver2: {}", hello);
    }
}

如果再创建一个queue和前一个使用一样的bindingkey,则发送的消息会同是发送进两个queue

配置RabbitTemplate,加入消息确认机制回调

代码语言:javascript复制
@Autowired
private ReturnCallBackListener returnCallBackListener;

@Autowired
private ConfirmCallbackListener confirmCallbackListener;

@Bean
public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setConfirmCallback(confirmCallbackListener);
    /**
    * 当mandatory标志位设置为true时
    * 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
    * 那么broker会调用basic.return方法将消息返还给生产者
    * 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
    */
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback(returnCallBackListener);

    return rabbitTemplate;
}

ConfirmCallback: ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调

ReturnCallback:ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调

完整代码参考GITHUB

0 人点赞