点击上方蓝字关注我们 文末有惊喜
- 介绍
- 安装
- 相关组件
- 五种模式调用示例
- 简单消息模式
- 工作队列模式
- Publish/Subscribe发布与订阅模式
- Routing路由模式
- Topics主题模式
- 完整代码领取
介绍
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);
安装
这里为了方便,建议大家使用docker安装方式,docker安装相关命令如下:
- 下载镜像
docker pull rabbitmq:3.7.7-management
- 创建实例并启动
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v ~/docker/rabbitmq/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.7.7-management
相关组件
在这里插入图片描述
- 生产者: 是消息的产生者
- 消费者:消息的处理者
- 交换机(exchange):生产者和队列的桥梁,生产者发出的消息通过交换机传到队列
- 队列:消息保存的地方,等待消费者来获取
- 主机:所有的队列和交换机都是挂在主机上的,一个broker可以创建多个独立的主机
- broker(server):交换机、队列和主组成了broker,broker是生产者和消费者的桥梁
五种模式调用示例
引入依赖
代码语言:javascript复制 <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
编写配置文件 application.yml
代码语言:javascript复制spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: my_vhost #这是我自己创建的主机,请填写你们自己的,注意不要带/
username: lezai
password: lezai
publisher-returns: true
publisher-confirm-type: simple
简单消息模式
简单消息使用默认交换机,routing_key和消费的队列名称保持一致
- 生产者创建mq配置文件
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_TEST_SIMPLE_MODE = "queue_test_simple_mode";
// 声明队列
@Bean("queueTestSimpleMode")
public Queue queueTestSimpleMode(){
return QueueBuilder.durable(QUEUE_TEST_SIMPLE_MODE).build();
}
}
- 编写生产者端发送简单消息的代码
@SpringBootTest
class RabbitmqProductApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleMode() {
rabbitTemplate.convertAndSend("",RabbitMQConfig.QUEUE_TEST_SIMPLE_MODE,"测试简单模式");
}
- 查看管理台 发现已经看到我们创建的队列,并且队列里面已经有一条待消费的消息
- 创建消费者
@Component
public class RabbitMQListener {
@RabbitListener(queues = "queue_test_simple_mode")
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("收到消息:" new String(message.getBody()));
}
}
工作队列模式
这个同简单模式一样,只不过是创建多个消费者来监听队列,MQ会轮询往多个监听器里面推送消息
代码语言:javascript复制@Component
public class RabbitMQListener {
@RabbitListener(queues = "queue_test_simple_mode")
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("收到消息:" new String(message.getBody()));
}
@RabbitListener(queues = "queue_test_simple_mode")
public void onMessage2(Message message, Channel channel) throws Exception {
System.out.println("收到消息:" new String(message.getBody()));
}
}
Publish/Subscribe发布与订阅模式
发布订阅使用的是广播模式,所有与这个类型交换机绑定的队列都会接收到消息 交换机类型为「fanoutExchange」
- 生产者的配置文件
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_TEST_FANOUT_MODE_1 = "queue_test_fanout_mode_1";
public static final String QUEUE_TEST_FANOUT_MODE_2 = "queue_test_fanout_mode_2";
public static final String EXCHANGE_TEST_FANOUT_MODE = "exchange_test_fanout_mode";
// 声明队列
@Bean("queueTestFANOUTMode1")
public Queue queueTestFANOUTMode1() {
return QueueBuilder.durable(QUEUE_TEST_FANOUT_MODE_1).build();
}
// 声明队列
@Bean("queueTestFANOUTMode2")
public Queue queueTestFANOUTMode2() {
return QueueBuilder.durable(QUEUE_TEST_FANOUT_MODE_2).build();
}
// 声明交换机
@Bean("exchangeTestFANOUTMode")
public Exchange exchangeTestFANOUTMode() {
return ExchangeBuilder.fanoutExchange(EXCHANGE_TEST_FANOUT_MODE).durable(true).build();
}
// 绑定交换机与队列 订阅模式下 routingkey为空
@Bean
public Binding itemQueueExchange1(@Qualifier("queueTestFANOUTMode1") Queue queue,
@Qualifier("exchangeTestFANOUTMode") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
@Bean
public Binding itemQueueExchange2(@Qualifier("queueTestFANOUTMode2") Queue queue,
@Qualifier("exchangeTestFANOUTMode") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
- 编写生产者发送消息的代码
@Test
public void testFanoutMode() {
rabbitTemplate.convertAndSend("exchange_test_fanout_mode","", "测试发布订阅模式");
}
- 查看管理台,多了一个交换机和一个队列,并且每个队列中都有一条我们刚才创建的消息
- 消费端代码编写,消费端永远是监听队列的,所以和前面的方式是一样的
@Component
public class RabbitMQListener {
@RabbitListener(queues = "queue_test_fanout_mode_1")
public void onMessage1(Message message, Channel channel) throws Exception {
System.out.println("收到消息来自队列queue_test_fanout_mode_1:" new String(message.getBody()));
}
@RabbitListener(queues = "queue_test_fanout_mode_2")
public void onMessage2(Message message, Channel channel) throws Exception {
System.out.println("收到消息来自队列queue_test_fanout_mode_2:" new String(message.getBody()));
}
}
Routing路由模式
路由模式 是通过使用定向交换机进行转发消息,在队列与定向交换机绑定的时候,会提供一个routingkey,如果用户在发送消息的交换机的时候,带上了一个routingkey,则会根据这个routingkey进行匹配对应的队列。
- 生产者配置文件编写
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_TEST_ROUTING_MODE_1 = "queue_test_routing_mode_1";
public static final String QUEUE_TEST_ROUTING_MODE_2 = "queue_test_routing_mode_2";
public static final String EXCHANGE_TEST_ROUTING_MODE = "exchange_test_routing_mode";
// 声明队列
@Bean("queueTestROUTINGMode1")
public Queue queueTestROUTINGMode1() {
return QueueBuilder.durable(QUEUE_TEST_ROUTING_MODE_1).build();
}
// 声明队列
@Bean("queueTestROUTINGMode2")
public Queue queueTestROUTINGMode2() {
return QueueBuilder.durable(QUEUE_TEST_ROUTING_MODE_2).build();
}
// 声明交换机
@Bean("exchangeTestROUTINGMode")
public Exchange exchangeTestROUTINGMode() {
return ExchangeBuilder.directExchange(EXCHANGE_TEST_ROUTING_MODE).durable(true).build();
}
// 绑定交换机与队列
// 这里指定了routingkey为order,也就是发消息的时候只能匹配到routingkey为order的消息
@Bean
public Binding itemQueueExchange3(@Qualifier("queueTestROUTINGMode1") Queue queue,
// 这里指定了routingkey为order,也就是发消息的时候只能匹配到routingkey为member的消息 @Qualifier("exchangeTestROUTINGMode") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("order").noargs();
}
@Bean
public Binding itemQueueExchange4(@Qualifier("queueTestROUTINGMode2") Queue queue,
@Qualifier("exchangeTestROUTINGMode") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("member").noargs();
}
}
- 编写生产者发送消息的代码
这里发送消息的时候指定了routingkey为order,那么只有当与交换机绑定的队列中设置了routingkey为order的才能监听到消息,这里消息会发送到queue_test_routing_mode_1队列中;
代码语言:javascript复制 @Test
public void testRoutingMode() {
rabbitTemplate.convertAndSend("exchange_test_routing_mode","order", "测试路由模式");
}
- 查看管理台,发现只有queue_test_routing_mode_1中有一条消息等待消费
- 消费者的代码
@RabbitListener(queues = "queue_test_routing_mode_1")
public void onMessage3(Message message, Channel channel) throws Exception {
System.out.println("收到消息来自队列queue_test_routing_mode_1:" new String(message.getBody()));
}
@RabbitListener(queues = "queue_test_routing_mode_2")
public void onMessage4(Message message, Channel channel) throws Exception {
System.out.println("收到消息来自队列queue_test_routing_mode_2:" new String(message.getBody()));
}
Topics主题模式
topics模式的交换机和定向交换机一样,唯一不同的是解析routingkey的时候,定向交换机会根据消息中的routingkey找到对应的队列,这个值是确定的;但是topic模式又称为通配符模式,这个在绑定的时候指定的routingkey是可以以通配符的形式存在的,这里只有两种通配符符号:
#代表1个或多个单词 例如 abc.#则会匹配到 abc.d 或abc.d.f
*代表一个单词 例如。abc.*则只能匹配到abc.a 或abc.f的消息,
代码逻辑和路由模式一样,就不演示了,已经将完整的实例代码传到git库中,欢迎索取:
完整代码领取
https://gitee.com/yangleliu/code_learning.git