SpringBoot与消息

2022-03-12 14:30:33 浏览数 (1)

一、概述

  1. 消息服务中两个中重要的概念:消息代理目的地
  2. 消息队列主要由两种形式的目的地
    • 队列: 点对点消息通信
    • 主题: 发布/订阅 消息通信

异步处理

应用解耦

流量削峰:

  1. 点对点式:
  • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列 。
  • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
  1. 发布订阅式:
  • 发送者(发布者)发送消息到主题,多个接受者(订阅者)监听(订阅) 这个主题,那么就会发布到达同时收到消息。
  1. JMS (Java Message Service ) JAVA 消息服务
  • 基于 JVM 消息代理的规范。 ActiveMQ、 HornetMQ 是 JMS 实现
  1. AMQP(Advanced Message Queuing Protocol)
  • 高级消息队列协议,也是一个消息代理的规范,兼容 JMS
  • RabbitMQ 是 AMQP 的实现
  1. Spring 支持
    • spring-jms 提供了对 JMS 的支持
    • spring-rabbit 提供了对 AMQP 的支持
    • 需要 ConnectionFactory 的实现来连接消息代理
    • 提供 JmsTemplate、RebbitTemplate 来发送消息
    • @JmsListener(JMS)、 @RabbitListener(AMQP)注解在方法上监听消息代理发 布的消息
    • @EnableJms、@EnableRebbit 开启支持
  2. SpringBoot 自动配置
    • JmsAutoConfiguration
    • RabbitAutoConfiguration

二、RabbitMQ 简介

核心概念

Message

消息

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Queue

消息队列,用来保存消息直到发送给消费者。

Binding

绑定,用于消息队列和交换器之间的关联。

Connection

网络连接,比如一个 TCP 连接。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。

Broker

表示消息队列服务器实体

图示:

三、RabbitMQ 运行机制

① AMQP 中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别, AMQP 中增加了ExchangeBinding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

② Exchange 类型

Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、 topic、 headers 。 headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

  • Direct Exchange

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

  • Fanout Exchange

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。 fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。 fanout 类型转发消息是最快的 。

  • Topic Echange

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*” 。 #匹配 0 个或多个单词, *匹配一个单词。

四、RabbitMQ 安装启动

① 环境准备

  • Linux CentOS7
  • docker 容器

② 安装步骤

  1. 在 docker hub 中找到 rebbitMQ 镜像
  1. 下载镜像,docker 安装
代码语言:javascript复制
// 选择3-mansgement 带web管理界面
docker pull rabbitmq:3-management
  1. 启动 rebbitMQ 容器
代码语言:javascript复制
docker run -d -p 5672:5672 -p 15672:15672 --name mybebbitmq rabbitmq:3-management
  1. 访问 前提:保证 linux 防火墙暂时关闭,才能给外网访问。 初始用户和密码: guest

五、RabbitMQ 整合

idea 创建工程

① 引入 spring-boot-starter-amqp

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

② application.yml 配置

代码语言:javascript复制
spring.rabbitmq.host=192.168.64.129
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

③ 测试 RabbitMQ

1. AmqpAdmin:管理组件
代码语言:javascript复制
@Autowired
private AmqpAdmin amqpAdmin;

@Test
void createExchange() {
    //        创建exchange
    amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
    System.out.println("创建完成");

    // 创建Queue
    amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));

    // 创建绑定规则
    amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqpadmin.queue", null));

}
2.RabbitTemplate: 消息发送处理文件
点对点

发送

代码语言:javascript复制
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
            // Message 需要自己构建一个;定义消息体内容和消息头
//        rabbitTemplate.send(exchange,routeKey, message);

        // object默认当成消息体,只需要传入要发送的消息,自动序列化发送给rabbitmq;
//        rabbitTemplate.convertAndSend(exchange, object);
    rabbitTemplate.convertAndSend("amqpadmin.exchange","amqpadmin.queue","测试:test.msg");
}

接收

代码语言:javascript复制

// 接收数据
@Test
public void receive(){
    Object o = rabbitTemplate.receiveAndConvert("amqpadmin.queue");
    System.out.println(o.getClass());
    System.out.println(o);
}

第二种发送,以 Json 方式

代码语言:javascript复制
// 第二种发送,以Json方式
@Test
public void test1(){
    // 以Json 数据格式发送
    HashMap<String, Object> map = new HashMap<>();
    map.put("msg","这是第一条消息");
    map.put("data",Arrays.asList("helloworld",123,true));
    rabbitTemplate.convertAndSend("amqpadmin.exchange","amqpadmin.queue", map);

  //  rabbitTemplate.convertAndSend("amqpadmin.exchange","amqpadmin.queue",new Book("西游记","吴承恩"));
}

发送的 Json 数据被序列化,没有显示正常的 json 数据格式,解决方式:自定义序列方式采用 JSON

代码语言:javascript复制
@Configuration
public class MyAMQConfig {

    // 自定义序列的为Json 格式
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
广播
代码语言:javascript复制
// 先创建Exchange
amqpAdmin.declareExchange(new FanoutExchange("amqpadmin.fanout"));
// 绑定
Binding.DestinationType.QUEUE,"amqpadmin.fanout","amqpadmin.queue",null));


@Test
public void sendMsg(){
    rabbitTemplate.convertAndSend("amqpadmin.fanout","",new Book("红楼梦","曹雪芹"));
}
监听器
代码语言:javascript复制
@Service
public class BookService {

    @RabbitListener(queues = "amqpadmin.queue")
    public void receive(Book book){
        System.out.println("收到消息:"   book);
    }

    @RabbitListener(queues = "amqpadmin.queue")
    public void receive2(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }
}

0 人点赞