一、前言
我们先来聊聊消息中间件:
消息中间件利用高效可靠的消息传递机制
进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信
。(来自百度百科)
我们常见的中间件其实有很多种了,例如ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中应用最为广泛的要数RabbitMQ、RocketMQ、Kafka 这三款。Redis在某种程度上也可以使用list或者Stream来实现消息队列,但不能算是中间件哈!
如果大家对怎么选型感兴趣,可以看一下小编的这篇文章:四大MQ选型
今天小编带着大家一起学习一下RabbitMQ,从入门到精通,从无到有!!小编没有使用Windows安装,很麻烦,所以使用Docker安装。如果没有安装Docker的可以看一下小编的另一篇文章:Linux安装Docker
小编其实也是通过雷神的课件和讲解后,自己在整理一下,供以后学习和参考,在此感谢尚硅谷雷神哈!
小编觉得在说概念之前,应该知道他的作用,然后再系统的学习概念等!
二、RabbitMQ作用
其实作用还是挺多的,但是主要是以下三条:
- 异步处理
- 应用解耦
- 流量控制
下面我们进行一个个的简单描述一下哈,我们还是拿被用了一万次的例子和图例哈!!
1. 异步处理
用户在某网站注册成功后,需要向用户发送邮件和信息提示其注册成功(其实没什么必要,但是例子说一下还可以,小编自己的理解哈!)。常规的做法是:后台将注册信息保存到数据库,然后再给用户发邮件发短信。
我们看到这样非常的耗时,其实保存完成后,就可以登录了,短信和邮件过一会接收也是没有什么问题的!或者发送失败,用户一直没有收到,这都是没什么问题的,用户已经登录进去了,管你发不发短信,大家说对吧!!
既然存在问题,我们就是有消息队列来解决这个问题: 我们可以在将注册信息保存数据库之后,把要发送注册邮件和发送短信的消息写入消息队列,然后就告知用户注册成功。发送邮件和短信将由订阅了消息的应用异步的去执行。这样耗时的问题就解决了!!
2. 应用解耦
在大型电商项目中,会将订单系统和库存系统分成两个不同的应用。然后进行服务与服务之间的调用,正常情况下用户下单后订单系统会调用库存系统,然后返回给用户显示下单成功。
但是也存在问题,如果库存系统挂了,这样就会导致下单失败;如果你是用户,你会判断这个产品不行,以后不用了!!
别着急,这位用户,我们帮您解决哈:这时我们引入消息队列进行解耦
现在有的同学会问,怎么解决的呢?
别着急,小编来和你说一下哈!刚刚出错的原因就是库存系统挂了,改处理的请求没有处理,所以下单失败;我们引入消息队列,就是把订单消息写入到消息队列中,然后库存系统订阅我们的消息队列;然后库存系统去消息队列中获取消息,进行处理订单,来完成减库存的操作;如果失败也会有重试机制
,真的挂了,也可以持久化
,等到库存系统活了之后继续处理!!一个宗旨,不能影响用户的使用体验呢!!
3. 流量控制
看名字就能知道,肯定是并发很大的情况才会出现的,不用想就是秒杀时刻
了!
假设一瓶茅台2万人抢,这是我们的系统可能会被打垮。所以我们把超过一定并发量
时,把超过的请求放在消息队列中,然后减缓系统压力
,然后慢慢处理;虽然可能降低一下用户的体验,但是秒杀就是这样,只能有一部分人成功,我们要保证好系统可以正常运行哈!!
三、RabbitMQ概念
1. RabbitMQ简介
RabbitMQ是一个由erlang开发的AMQP
(Advanved Message Queue Protocol)的开源实现。
RabbitMQ 是部署最广泛的开源消息代理。
RabbitMQ拥有数万用户,是最流行的开源消息代理之一。从T-Mobile到Runtastic,RabbitMQ在世界各地的小型初创公司和大型企业中使用。
RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ可以在分布式和联合配置中部署,以满足高规模、高可用性需求。
RabbitMQ运行在许多操作系统和云环境上,并为最流行的语言提供了广泛的开发工具。
2. 核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,
这些属性包括routing-key
(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可
能需要持久性存储)等。
Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点
。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系
。
Connection 网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道
。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 ==类似docker容器和容器之间是相互隔离的,一个坏了,不耽误另一个使用==
Broker 表示消息队列服务器实体。
总架构图
四、JMS与AMQP比较
JMS(Java Message Service) | AMQP(Advanced Message Queuing Protocol) | |
---|---|---|
定义 | Java api | 网络线级协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
Model | 提供两种消息模型:(1)、Peer-2-Peer(2)、Pub/sub | 提供了五种消息模型:(1)、direct exchange(2)、fanout exchange(3)、topic change(4)、headers exchange(5)、system exchange本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分; |
支持消息类型 | 多种消息类型:TextMessageMapMessageBytesMessageStreamMessageObjectMessageMessage (只有消息头和属性) | byte[]当实际应用时,有复杂的消息,可以将消息序列化后发送。 |
实现中间件 | ActiveMQ、HornetMQ | RabbitMQ |
综合评价 | JMS 定义了JAVA API层面的标准;在java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差; | AMQP定义了wire-level层的协议标准;天然具有跨平台、跨语言特性 |
五、RabbitMQ运行机制
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了Exchange 和Binding
的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列
。
Exchange 类型
- direct
- fanout
- topic
- headers(不建议使用)
RabbitMQ默认七大交换机
1. direct
消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"a1.b1",则只转发 routing key 标记为"a1.b1"的消息,不会转发"a1.b2”,也不会转发"a1.b3" 等等。它是完全匹配、单播的模式
。
2. fanout
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键
,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播
,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的、广播
。
3. topic
topic是升级版的fanout模式,做了选择权,并不是全都会接受,符合条件才会收到!topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
它同样也会识别两个通配符:符号#
和符号*
。#
匹配0个或多个单词,*
匹配一个单词。
六、Docker安装RabbitMQ
直接输入命令,docker会帮助我们自动去拉去镜像的:
代码语言:javascript复制docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672
-p 15671:15671 -p 15672:15672 rabbitmq:management
我们查询是否运行成功
代码语言:javascript复制docker ps
我们在windows上进行测试是否能够打开界面:
输入:http://192.168.17.130:15672/
用户名密码都是:guest
进入界面:
七、整合Springboot
1. 引入依赖
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--自定义消息转化器Jackson2JsonMessageConverter所需依赖-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2. 主启动类上添加注解
代码语言:javascript复制@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
3. 编写配置文件
代码语言:javascript复制# 指定rabbitmq服务器主机
spring.rabbitmq.host=192.168.17.130
# 账号密码端口号都默认配置了,我们无需配置
八、测试创建交换机、队列、绑定关系
1. 测试创建Direct交换机
代码语言:javascript复制@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange() {
// 第一个参数为交换机名字,第二个参数为是否持久化,第三个参数为不使用交换机时删除
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
System.out.println("交换机创建成功");
}
2. 打开交换机界面查看
3. 创建Queue
代码语言:javascript复制@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createQueue() {
/**
* 第一个参数为队列名字,
* 第二个参数为是否持久化,
* 第三个参数为是否排他(true:一个连接只能有一个队列,false:一个连接可以有多个(推荐))
* 第四个参数为不使用队列时自动删除
*/
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
System.out.println("队列创建成功");
}
4. 打开队列界面查看
5. 绑定交换机和队列
代码语言:javascript复制@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createBinding() {
/**
* 第一个参数为目的地,就是交换机或者队列的名字
* 第二个参数为目的地类型,交换机还是队列
* 第三个参数为交换机,
* 第四个参数为路由键,匹配的名称
*/
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",null);
amqpAdmin.declareBinding(binding);
System.out.println("绑定成功");
}
6. 打开交换机中的绑定界面
点击交换机
绑定列表
九、测试发消息
1. 测试发送消息
代码语言:javascript复制@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
// 消息类型为object 发送对象也是可以的
String msg = "这是一条消息";
// 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
System.out.println("消息发送成功");
}
2. Queue列表中查看消息
3. 手动确认消息
点击我们的队列:
进入详细界面,下滑找到Get messages
:
在次点击队列,消息消失:
4. 测试发送对象
代码语言:javascript复制@Data
// 必须序列化,不然报错
public class User implements Serializable {
private String name;
private Integer age;
}
代码语言:javascript复制@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
User user = new User();
user.setAge(22);
user.setName("王振军");
// 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
System.out.println("消息发送成功");
}
5. 查看消息发送的对象
6. 书写消息发送对象转JSON配置类
编写配置类
代码语言:javascript复制@Configuration
public class MyRabbitmqConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
7. 再次发送九、4中的代码,查询是否正常显示对象
十、测试收消息
1. 创建接收信息的方法
方法所在的类必须交给了IOC管理,我们直接写在service里面。代码如下:
代码语言:javascript复制@Service
public class TestService {
// queues是监听的队列名字,可以是多个
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Object message){
System.out.println("接受的信息" message);
}
}
2. 模拟发送消息
还是用上面的方法进行发送一个对象!
代码语言:javascript复制@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
User user = new User();
user.setAge(22);
user.setName("王振军");
// 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
System.out.println("消息发送成功");
}
3. 接收消息查看
代码语言:javascript复制接受的信息:(Body:'{"name":"王振军","age":22}'
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User},
contentType=application/json, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1,
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
4. 在思考
我们发现刚刚返回的是详细信息,我们可以指定消息的类型,就是发送消息发的对象是什么,我们就可以直接接收就行!看代码:
代码语言:javascript复制@Service
public class TestService {
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Message message, User user){
System.out.println("接受的信息:" message);
System.out.println("发送的信息:" user);
}
}
在此发送消息,我们看一下控制台:
代码语言:javascript复制接受的信息:(Body:'{"name":"王振军","age":22}'
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User},
contentType=application/json, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1,
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
发送的信息:User(name=王振军, age=22)
这样就很清晰了哈!
==拓展:== 接收的还有第三个参数就是通道,每一个连接只会有一个通道哈!,大家可以自己测试一下,打印看看一下!!
代码语言:javascript复制public void reciveMessage(Message message, User user, Channel channel)
5. 多个服务监听同一条队列
右击已存在服务,复制一份配置不同端口:
现在有两个服务监听同一个队列!!
6. 模拟发送十条消息,查看会被那个服务接收
调整测试发消息代码:
代码语言:javascript复制@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
for (int i = 0;i < 10; i ) {
User user = new User();
user.setAge(i);
user.setName("王振军" i);
// 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", user);
System.out.println("消息发送成功");
}
}
接收消息的代码:
代码语言:javascript复制@Service
public class TestService {
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Message message, User user){
System.out.println("接收的信息:" user);
}
}
7. 多服务接收消息结果查看
我们看到9000服务接收了1,4,7
消息
9010服务接收了0,3,6,9
消息
==总结:== 我们可以发现一个消息只会被接收一次!
还有就是发了10条消息,只有7条被接收了,其余的呢?
别急小编来告诉大家,这是因为我们测试是使用SpringBoot的测试类进行的,有的部分消息被测试的接收了!大家不信可以看一下测试的控制台,找一下:
我们看到消息的2,5,8
在这里呢!!
==拓展:== 我们一次发送十条消息,每条接收消息假如耗时10s,此时会接收处理完一个消息,才会接收下一个,就是我们说的串行化!!
十一、总结
这样我们就对RabbitMQ有了新的认识,从入门也算走上了实践!后面有时间小编再把消息的可靠性发出来,也就是进阶版!!
Q.E.D.