SpringBoot整合RabbitMQ 实现五种消息模型 详细教程

2022-08-28 11:10:06 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

今天说下了消息队列中间件,各种队列性能对比,RabbitMQ队列,交换机(Exchange)以及消息 中间件的应用场景,然后带着大家一起实现RabbitMQ的五种消息模型。

消息队列中间件

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合异步消息流量削锋等问题实现高性能,高可用,可伸缩和终一致性[架构] 使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

消息队列在实际应用中常用的使用场景:异步处理应用解耦流量削锋消息通讯四个场景

各种消息中间件性能的比较:

TPS比较 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。

持久化消息比较—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者MQ所在的服务器down了,消息不会丢失的机制。

可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统、社区—RabbitMq最好,ActiveMq次之,ZeroMq最差。

高并发—从实现语言来看,RabbitMQ最高,原因是它的实现语言是天生具备高并发高可用的erlang语言。

综上所述:RabbitMQ的性能相对来说更好更全面,是消息中间件的首选。

RabbitMQ理论

RabbitMQ是一个消息队列,主要作用是用来实现应用程序的解耦异步削峰),同时也能起到消息缓存,信息分发的作用。

RabbitMQ由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

消息中间件最主要的作用是解耦,中间件最标准的用法其实是生产者生产消息发送到队列,消费者从队列中拿取消息并处理,生产者不用关系是谁来消费,消费者不用关心谁来生产信息,达到解耦的目的。

在分布式系统中,消息队列也会被用在很多其他方面,列如:分布式事务的支持,RPC的调用等等。其在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

特点

RabbitMQ :初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

1.可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2.灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

3.消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

4.高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

5.多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

6.多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。

7.管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方 面。

8.跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

9.插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

相关概念 通常我们谈到队列服务, 会有三个概念: 发消息者队列收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

左侧代表生产者,也就是往 RabbitMQ 发消息的程序。 中间即是RabbitMQ,其中包括了 交换机 和 队列。 右侧代表消费者,也就是往 RabbitMQ 拿消息的程序。

其中比较重要的概念有 4 个,分别为:虚拟主机交换机队列绑定

虚拟主机:一个虚拟主机持有一组交换机,队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机。

交换机:Exchange用于转发信息,但是它不会做存储,如果每天Queue bind到Exchange的话,它会被直接丢弃掉Producer发送过来的信息。 这里比较重要的一个概念:路由键。消息到交换机的时候,交换机会转发到对应的队列中,那么究竟转发到哪个队列,就根据该路由键判断。

队列的作用在上面已经说过这里就不在说明。

绑定:是交换机需要和队列相绑定,这其中如上图所示,就是多对多的关系。

图中所示概念:

RabbitMQ Server:也叫broker server,它是一种传输服务。 他的角色就是维护一条 从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

Producer: 消息生产者,如图A、B、C,数据的发送方。消息生产者连接RabbitMQ服 务器然后将消息投递到Exchange。

Consumer:消息消费者,如图1、2、3,数据的接收方。消息消费者订阅队列, RabbitMQ将Queue中的消息发送到消息消费者。

Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ中的Exchange有 direct、fanout、topic、headers四种类型,每种类型对应不同的路由规则。

Queue:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅 队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并终 投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个 Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者 都收到所有的消息并处理。

RoutingKey:生产者在将消息发送给Exchange的时候,一般会指定一个routing key, 来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联 合使用才能终生效。在Exchange Type与binding key固定的情况下(在正常使用时一 般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过 指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。

交换机(Exchange)详解:

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

Direct:direct类型的行为是”先匹配,再投送“,即在绑定时设定一个routing_key,消息的routing_key匹配时,才会被交换机投送到绑定的队列中去。

Topic:按照规则转发信息(最灵活)

Headers:设置header attribute参数类型的交换机

Fanout:转发信息到所有绑定队列

Direct Exchange

Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

第一个 X – Q1 就有一个 binding key,名字为 orange; X – Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? – 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。 路由模式必须包含一个 星号(),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements…b.,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。

具体代码发送的情形如下,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:

代码语言:javascript复制
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

*:表示一个词. #:表示零个或多个词.

Headers Exchange

headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。

最后讲解一下消息中间件的应用场景就开始整合。

消息中间件的应用场景

异步处理: 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式

(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

(2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回.

(3)消息队列 引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间 写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

应用解耦 场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

这种做法有一个缺点:

当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合. 引入消息队列

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

流量削峰 流量削峰一般在秒杀活动中应用广泛 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 作用: 1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^) 2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.

2.秒杀业务根据消息队列中的请求信息,再做后续处理.

SpringBoot整合RabbitMQ 实现五种消息模型

上面的文章详细讲解了消息队列中间件,各种队列性能对比,RabbitMQ队列,交换机(Exchange)以及消息中间件的应用场景接下来进入整合实现五种消息模型。

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。

基本消息模型:生产者–>队列–>消费者 work消息模型:生产者–>队列–>多个消费者共同消费 订阅模型-Fanout:广播模式,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息 订阅模型-Direct:定向,把消息交给符合指定 rotingKey 的队列 订阅模型-Topic 主题模式:通配符,把消息交给符合routing pattern(路由模式) 的队列

首先打开abbitmq容器,然后安装rabbitmq可视化插件。

打开容器

代码语言:javascript复制
docker start CID

执行命令进入安装好的容器内

代码语言:javascript复制
docker exec -it rabbitmq /bin/bash

执行命令安装可视化插件

代码语言:javascript复制
rabbitmq-plugins enable rabbitmq_management

http://你的ip:15672,用户名:guest,密码:guest进入rabbitmq管理界面

基本消息模型(简单队列):

P:消息的生产者 C:消息的消费者 红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

1.配置pom文件,主要是添加spring-boot-starter-amqp的支持

代码语言:javascript复制
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置application.properties文件 配置rabbitmq的安装地址、端口以及账户信息

代码语言:javascript复制
spring:
  application:
    name: spirng-boot-rabbitmq
  rabbitmq:
    host: 192.168.72.129
    port: 5672
    username: admin
    password: admin

port:5672 是RabbitMQ默认端口

3、配置队列

代码语言:javascript复制
package com.szh.springboot_rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        return new Queue("q_hello");
    }
}

4.消息的生产者(发送信息)

代码语言:javascript复制
package com.szh.springboot_rabbitmq.send;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    public void send() {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
        String context = "hello "   date;
        System.out.println("Sender : "   context);
        //简单对列的情况下routingKey即为Q名
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }
    }

5.消息的消费者(接收)

代码语言:javascript复制
package com.szh.springboot_rabbitmq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : "   hello);
    }

6、测试

代码语言:javascript复制
package com.szh.springboot_rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {

    @Autowired
    private HelloSender helloSender;

	//基本消息模式  直接模式 单对单
    @Test
    public void hello() throws Exception {
        helloSender.send();
    }
}

多对多使用(Work模式)

如图所示: 一个生产者、2个消费者。

一个消息只能被一个消费者获取。

复用上面单对单的简单队列增加一个Receiver

代码语言:javascript复制
package com.szh.springboot_rabbitmq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2  : "   hello);
    }
}

修改消息的生产者

代码语言:javascript复制
 public void send(int i) {
       String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
       String context = "hello "   i   " "   date;
       System.out.println("Sender : "   context);
       //简单对列的情况下routingKey即为Q名
       this.rabbitTemplate.convertAndSend("q_hello", context);
   }

修改测试类

代码语言:javascript复制
//work工作模式(轮询发布,公平发布)
    @Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i  ){
            helloSender.send(i);
            Thread.sleep(300);
        }
    }

测试结果:

1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。 2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。 RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

2个概念

轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

公平分发模式在Spring-amqp中是默认的,这种情况也是日常工作中使用最为正常的,轮询模式用的较少,区别在于prefetch默认是1,如果设置为0就是轮询模式。

公平分发模式,轮询分发模式 查看该博客 https://blog.csdn.net/saytime/article/details/80541450

订阅模式

1、1个生产者,多个消费者 2、每一个消费者都有自己的一个队列 3、生产者没有将消息直接发送到队列,而是发送到了交换机 4、每个队列都要绑定到交换机 5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

Topic Exchange(主题模式,通配符模式):

同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: user.insert

通配符规则

举例

#:匹配一个或多个词

user.#:能够匹配user.insert.save 或者 user.insert

*:匹配不多不少恰好1个词

user.*:只能匹配user.insert

首先对topic规则配置,这里使用两个队列(消费者)来演示。

1.配置队列,绑定交换机。

代码语言:javascript复制
package com.szh.springboot_rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

    //队列名
    final static String message = "q_topic_message";
    final static String messages = "q_topic_messages";

    //创建队列
    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    /**
     * 声明一个Topic类型的交换机
     * @return
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("mybootexchange");
    }

    /**
     * 绑定Q队列到交换机,并且指定routingKey
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

2.创建2个消费者 q_topic_message 和q_topic_messages

代码语言:javascript复制
package com.szh.springboot_rabbitmq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_topic_message")
public class Receiver1 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver1  : "   hello);
    }
}
代码语言:javascript复制
package com.szh.springboot_rabbitmq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_topic_messages")
public class Receiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2 : "   hello);
    }
}

3.消息生产者(发送信息)

代码语言:javascript复制
package com.szh.springboot_rabbitmq.send;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsgSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

      /**
     * 这里的mybootexchange是交换机的名称字符串和发送消息时的名称必须相同
     * 具体代码发送的情形如下,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:
     * this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
     */
    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : "   context);
        this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
    }


    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : "   context);
        this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
    }
}

send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。

4.测试

代码语言:javascript复制
package com.szh.springboot_rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTopicTest {

    @Autowired
    private MsgSender msgSender;

/*主题模式
   topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
    首先对topic规则配置,这里使用两个队列(消费者)来演示。
    通配符,把消息交给符合routing pattern(路由模式) 的队列
    */
    @Test
    public void send1() throws Exception {
        msgSender.send1();
    }

    @Test
    public void send2() throws Exception {
        msgSender.send2();
    }
}

Fanout Exchange(订阅模式-广播)

Fanout,也称为广播。在广播模式下,消息发送流程是这样的:

1.可以有多个消费者

2.每个消费者有自己的queue(队列)

3.每个队列都要绑定到Exchange(交换机)

4.生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

5.交换机把消息发送给绑定过的所有队列

6.队列的消费者都能拿到消息。实现一条消息被多个消费者消费

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

1.配置队列,绑定交换机

代码语言:javascript复制
package com.szh.springboot_rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {

    /**
     * @author songzhenghong
     * @version 1.0
     * @date 2019/6/16 10:52
     * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
     * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
     * Queue:消息的载体,每个消息都会被投到一个或多个队列。
     * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
     * Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
     * vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
     * Producer:消息生产者,就是投递消息的程序.
     * Consumer:消息消费者,就是接受消息的程序.
     * Channel:消息通道,在客户端的每个连接里,可建立多个channel.
     */

    //创建队列
    @Bean
    public Queue aMessage() {
        return new Queue("q_fanout_A");
    }

    @Bean
    public Queue bMessage() {
        return new Queue("q_fanout_B");
    }

    @Bean
    public Queue cMessage() {
        return new Queue("q_fanout_C");
    }

    /**
     * 声明一个Fanout类型的交换机
     * @return
     */
    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("mybootfanoutExchange");
    }

    /**
     * 绑定aMessage队列到交换机
     * @param aMessage
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(aMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(bMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(cMessage).to(fanoutExchange);
    }
}

2.创建3个消费者

代码语言:javascript复制
package com.szh.springboot_rabbitmq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_fanout_A")
public class ReceiverA {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("AReceiver  : "   hello   "/n");
    }
}
代码语言:javascript复制
package com.szh.springboot_rabbitmq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_fanout_B")
public class ReceiverB {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("BReceiver  : "   hello   "/n");
    }
}
代码语言:javascript复制
package com.szh.springboot_rabbitmq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_fanout_C")
public class ReceiverC {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("CReceiver  : "   hello   "/n");
    }
}

3.生产者

代码语言:javascript复制
package com.szh.springboot_rabbitmq.send;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsgSenderFanout {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : "   context);
        this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
    }
}

4.测试

代码语言:javascript复制
package com.szh.springboot_rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitFanoutTest {

    @Autowired
    private MsgSenderFanout msgSender;

    @Test
    public void send1() throws Exception {
        msgSender.send();
    }
}

结果如下,三个消费者都收到消息: AReceiver : hi, fanout msg CReceiver : hi, fanout msg BReceiver : hi, fanout msg

订阅模型-Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。

Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息.

1.配置队列, 绑定交换机

1.生产者(发送消息)

代码语言:javascript复制
package com.szh.springboot_rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {
    @Bean
    public Queue q_direct_A() {
        return new Queue("q_direct_A");
    }

    @Bean
    public Queue q_direct_B() {
        return new Queue("q_direct_B");
    }

    @Bean
    public Queue q_direct_C() {
        return new Queue("q_direct_C");
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("mybootdirectExchange");
    }

    @Bean
    Binding bindingExchangeDA(Queue q_direct_A, DirectExchange directExchange) {
        return BindingBuilder.bind(q_direct_A).to(directExchange).with("topic");
    }

    @Bean
    Binding bindingExchangeDB(Queue q_direct_B, DirectExchange directExchange) {
        return BindingBuilder.bind(q_direct_B).to(directExchange).with("topic");
    }

    @Bean
    Binding bindingExchangeDC(Queue q_direct_C, DirectExchange directExchange) {
        return BindingBuilder.bind(q_direct_C).to(directExchange).with("topic");
    }
}

仍是之前Fanout创建好的三个消费者

2.生产者

代码语言:javascript复制
package com.szh.springboot_rabbitmq.send;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsgProducers {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void producerA(){
        String context = "hi, direct msg ";
        System.out.println("Sender : "   context);
        this.rabbitTemplate.convertAndSend("mybootdirectExchange","topic", context);
    }

}

3.测试

代码语言:javascript复制
   @Test
    public void send1() throws Exception {
        msgProducers.producerA();
    }

结果如下,三个消费者都收到消息: AReceiver : hi, direct msg CReceiver : hi, direct msg BReceiver : hi, direct msg

这是因为3个队列绑定交换机的rountingkey都是topic所导致的。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/146414.html原文链接:https://javaforall.cn

0 人点赞