20-SpringBoot整合RabbitMQ

2022-10-06 08:43:20 浏览数 (1)

SpringBoot整合RabbitMQ

整合就直接使用单机版的了, 一直开着5个虚拟机, 我电脑不太行

新建SpringBoot工程

你已经是一个长大的IDEA了, 要学会自己新建工程, 然后IDEA自己创建了rabbitmq-consumer和rabbitmq-producer工程

添加依赖

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

POM.xml

代码语言:javascript复制
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

都是在创建工程的时候随便点的功能

启动类就不用我粘贴了吧

生产者代码实现

配置文件

代码语言:javascript复制
spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    # 集群用
#    addresses: 192.168.247.142:5672
    username: root                         # 用户名
    password: 123456                       # 密码
    host: 192.168.247.142                  # IP地址
    port: 5672                             # 端口号
    virtual-host: /                        # 虚拟地址
    connection-timeout: 15000              # 连接超时时间
    # https://blog.csdn.net/yaomingyang/article/details/108410286 : 详解了publisher-confirm-type 与 publisher-confirms的关系
    publisher-confirm-type: correlated     # confirm回调模式
    publisher-returns: true                # 回调监听, 没有被路由到的消息, 配合下面的mandatory使用
    template:
      mandatory: true

server:
  port: 8001
  servlet:
    context-path: /

消息发送类

代码语言:javascript复制
package com.dance.rabbitmqproducer.component;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

@Component
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * correlationData : 唯一标识
     * ack : 是否成功持久化
     * cause : 失败原因
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        System.out.println("消息ID: "   correlationData.getId());
        System.out.println("ack: "   ack);
        System.out.println("cause: "   cause);
    };

    /**
     * 发送消息
     *
     * @param messageBody 消息提
     * @param headers     参数
     * @throws Exception 异常
     */
    public void sendMessage(Object messageBody, Map<String, Object> headers) throws Exception {
        // 消息头
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<Object> message = MessageBuilder.createMessage(messageBody, messageHeaders);
        // 设置消息确认监听
        rabbitTemplate.setConfirmCallback(confirmCallback);
        /**
         * exchange : 交换机
         * routingKey : 路由键
         * message : 消息体
         * messagePostProcess : 消息后置处理器
         * correlation : 消息唯一ID
         */
        rabbitTemplate.convertAndSend("exchange-test",
                "test.order",
                message,
                postMessage -> {
                    System.out.println("post process message: "   postMessage);
                    return postMessage;
                },
                new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
    }

}

测试类

代码语言:javascript复制
package com.dance.rabbitmqproducer;

import com.dance.rabbitmqproducer.component.RabbitMQSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;

@SpringBootTest
public class TestSendMsg {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Test
    public void testSendMsg() throws Exception {
        String msg = "hello world!";
        rabbitMQSender.sendMessage(msg,new HashMap<>());
    }

}

消费者代码实现

配置文件

代码语言:javascript复制
spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    # 集群用
    #    addresses: 192.168.247.142:5672
    username: root                         # 用户名
    password: 123456                       # 密码
    host: 192.168.247.142                  # IP地址
    port: 5672                             # 端口号
    virtual-host: /                        # 虚拟地址
    connection-timeout: 15000              # 连接超时时间
    listener:
      simple:
        acknowledge-mode: manual           # 手工ACK, 默认为auto, 自动ack
        concurrency: 5                                     # 默认通道数量
        max-concurrency: 10                                 # 最大通道数量
        prefetch: 1                        # 限制消费流量, ack 1个后再接收
      order:
        exchange:
          value: exchange-test
          type: topic
          durable: true
          ignoreDeclarationExceptions: true
        queue:
          value: test-queue
          durable: true
        bindings:
          key: test.#

server:
  port: 8002
  servlet:
    context-path: /

消息消费类

代码语言:javascript复制
package com.dance.rabbitmqconsumer.component;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQRReceiver {

    /**
     * 消费消息
     *
     * @param messageBody 消息体
     * @param channel     通道
     * @throws Exception 异常
     * @RabbitListener 监听消息
     * @QueueBinding 绑定
     * @Queue 队列
     * @Exchange 交换机
     */
    // 直接写死[不建议]
//    @RabbitListener(
//            bindings = {
//                    @QueueBinding(
//                            value = @Queue(value = "test-queue", durable = "true"),
//                            exchange = @Exchange(
//                                    value = "exchange-test",
//                                    type = "topic",
//                                    durable = "true",
//                                    ignoreDeclarationExceptions = "true"
//                            ),
//                            key = "test.#"
//                    )
//            }
//    )
    // 采用配置文件的方式
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(
                                    value = "${spring.rabbitmq.listener.order.queue.value}",
                                    durable = "${spring.rabbitmq.listener.order.queue.durable}"
                            ),
                            exchange = @Exchange(
                                    value = "${spring.rabbitmq.listener.order.exchange.value}",
                                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"
                            ),
                            key = "${spring.rabbitmq.listener.order.bindings.key}"
                    )
            }
    )
    @RabbitHandler
    public void onMessage(Message<Object> messageBody, Channel channel) throws Exception {
        System.out.println("消费消息 :"   messageBody.getPayload());
        MessageHeaders headers = messageBody.getHeaders();
        long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

测试

启动消费者

启动成功

查看控制台

多了exchange-test交换机

多了test-queue队列

并且存在绑定关系, 路由key是test.#

对了一个连接, 这个就是消费者

在channel中多了5个通道, 这个就是我们在配置文件中设置的初始值

点进去可以看到消费的是哪个队列

启动生产者测试类

可以看到, 在confirm监听中, 得到了消息ID, ack为true, 没有异常, 消息发送成功

查看消费者

消费成功, SpringBoot成功集成RabbitMQ

当然这只是一个Demo, 具体开发中使用, 该需要各位自行改造

0 人点赞