SpringBoot动态创建绑定rabbitMq队列

2024-03-07 09:19:30 浏览数 (2)

SpringBoot动态创建绑定rabbitMq队列

一、介绍

在以前,我写过一篇如何使用SpringBoot整合rabbitMq的文章。

SpringBoot整合rabbitMq | 半月无霜 (banmoon.top)

上面这种方法,是自己创建队列,交换机,绑定。生成Bean,从而实现队列等等的创建。

这种方式太过于繁琐,有没有一种方法可以快速创建呢,我们只管使用就行了

还真的有,只需要在配置文件中配置队列、交换机等信息,就可以在服务启动的时候自动创建并绑定。

一次偶然间,在csdn上看到了,动态创建rabbitMq队列的文章。

拉出来魔改了一下,只要在配置文件中配置了相关信息,就能实现队列、交换机的创建和绑定。

同时还解决了,多个开发连接同一个rabbitMq,导致自己生产的消息,被其他同事消费走的问题。

二、代码

1)读取配置的代码

这是RabbitModuleInfoProperties.java,读取配置文件中的信息,生成信息对象

代码语言:java
package com.banmoon.config.properties;

import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import lombok.Data;

import java.util.List;
import java.util.Map;

/**
 * Settings for a single RabbitMQ module: one routing key, the queue(s) to declare, and the
 * exchange they are bound to. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class RabbitModuleInfoProperties {

    /**
     * Routing key.
     */
    private String routingKey;

    /**
     * Single queue definition.
     */
    private Queue queue;

    /**
     * Multiple queue definitions.
     */
    private List<Queue> queues;

    /**
     * Exchange definition.
     */
    private Exchange exchange;

    /**
     * Queue settings.
     */
    @Data
    public static class Queue {
        /**
         * Queue name.
         */
        private String name;

        /**
         * Whether the queue is durable; defaults to true.
         */
        private boolean durable = true;

        /**
         * Whether the queue is exclusive; defaults to false so several consumers can share it.
         */
        private boolean exclusive = false;

        /**
         * Whether the queue is deleted automatically once all consumers have disconnected;
         * defaults to false so that messages are not discarded when consumers disconnect.
         */
        private boolean autoDelete = false;

        /**
         * Name of the dead-letter queue attached to this queue.
         */
        private String deadLetterQueue;

        /**
         * Name of the dead-letter exchange attached to this queue.
         */
        private String deadLetterExchange;

        /**
         * Routing key used for dead-lettered messages.
         */
        private String deadLetterRoutingKey;

        /**
         * Additional queue arguments.
         */
        private Map<String, Object> arguments;
    }

    /**
     * Exchange settings.
     */
    @Data
    public static class Exchange {
        /**
         * Exchange type; defaults to the direct exchange.
         */
        private String type = RabbitExchangeTypeEnum.DIRECT.getCode();

        /**
         * Exchange name.
         */
        private String name;

        /**
         * Whether the exchange is durable; defaults to true.
         */
        private boolean durable = true;

        /**
         * Whether the exchange is deleted automatically once none of its bound queues is in use.
         */
        private boolean autoDelete = false;

        /**
         * Whether this is a "txl" delayed exchange.
         */
        private boolean txlDelay = false;

        /**
         * Additional exchange arguments.
         */
        private Map<String, Object> arguments;

        /**
         * Header values used when binding to a headers exchange.
         */
        private Map<String, Object> headersMap;

        /**
         * Header match mode for a headers exchange; defaults to requiring all headers to match.
         */
        private String headersType = HeadersTypeEnum.ALL.getCode();
    }

}

这是RabbitModuleProperties.java,其中包含多个模块的绑定配置

代码语言:java
package com.banmoon.config.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Binds properties under the {@code spring.rabbitmq} prefix; {@code modules} holds the list of
 * module definitions.
 */
@Data
@Configuration
@ConfigurationProperties("spring.rabbitmq")
public class RabbitModuleProperties {

    // One entry per module (routing key, queue(s), exchange).
    private List<RabbitModuleInfoProperties> modules;

}

2)配置文件

这个是配置,请注意交换机,队列前缀,这个就是保证不同开发之间消息隔离的关键

代码语言:yaml
spring:
  rabbitmq:
    host: rabbitMq服务地址
    port: rabbitMq服务端口
    username: 帐号
    password: 密码
    virtual-host: /
    # Confirm that a message reached the exchange
    publisher-confirm-type: correlated
    # NOTE(review): believed to be the older (pre Spring Boot 2.2) spelling of the setting above;
    # confirm whether it is still needed for the Spring Boot version in use
    publisher-confirms: true
    # Confirm that a message was routed to a queue
    publisher-returns: true
    # Prefix applied to exchange, queue and routing-key names
    prefix: whc
    modules:
      - routingKey: test.direct.routingKey
        queue:
          name: test.direct.queue
        exchange:
          name: test.direct.exchange
      # Routing key aligned with RabbitmqConstant.FANOUT_TEST_ROUTER_KEY (was test.fanout.router.key)
      - routingKey: test.fanout.routerKey
        queues:
          - name: test.fanout.queue.a
          - name: test.fanout.queue.b
          - name: test.fanout.queue.c
        exchange:
          name: test.fanout.exchange
          type: fanout
      - routingKey: test.topic.routerKey.#
        queue:
          name: test.topic.queue.log
        exchange:
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.text
        queue:
          name: test.topic.queue.text
        exchange:
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.image
        queue:
          name: test.topic.queue.image
        exchange:
          name: test.topic.exchange
          type: topic
      - routingKey: test.headers.routerKey
        queue:
          name: test.headers.queue
        exchange:
          name: test.headers.exchange
          type: headers
          headers-map:
            authentication: "半月无霜"
      - routingKey: test.ttl.routerKey
        queue:
          name: test.ttl.queue
          deadLetterQueue: test.ttl.death.queue
          deadLetterExchange: test.ttl.death.exchange
          deadLetterRoutingKey: test.ttl.death.routerKey
          arguments:
            x-message-ttl: 5000
        exchange:
          name: test.ttl.exchange
      - routingKey: test.txl.routerKey
        queue:
          name: test.txl.queue
        exchange:
          name: test.txl.exchange
          txl-delay: true

3)初始化时创建队列、交换机

RabbitmqConfig.java;这是一个配置类,主要得到了AmqpAdmin对象、RabbitModuleProperties对象、以及定义的前缀

代码语言:java
package com.banmoon.config;

import com.banmoon.config.init.RabbitModuleInitializer;
import com.banmoon.config.properties.RabbitModuleProperties;
import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration class that exposes a {@link RabbitModuleInitializer} bean built from the
 * {@link AmqpAdmin}, the configured name prefix and the module list.
 */
@Slf4j
@Configuration
public class RabbitmqConfig {

    // Name prefix resolved from the RabbitmqConstant.RABBITMQ_PREFIX expression.
    @Value(RabbitmqConstant.RABBITMQ_PREFIX)
    private String rabbitPrefix;

    /**
     * Creates the initializer from the admin client, the prefix and the configured modules.
     * Annotated with {@code @ConditionalOnMissingBean}.
     */
    @Bean
    @ConditionalOnMissingBean
    public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
        return new RabbitModuleInitializer(amqpAdmin, rabbitPrefix, rabbitModuleProperties.getModules());
    }

}

RabbitModuleInitializer.java,初始化类,主要声明队列、交换机,以及绑定都在其中

代码语言:java
package com.banmoon.config.init;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.banmoon.config.properties.RabbitModuleInfoProperties;
import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import com.banmoon.utils.stream.StreamUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Slf4j
@AllArgsConstructor
public class RabbitModuleInitializer implements SmartInitializingSingleton {

    private AmqpAdmin amqpAdmin;

    private String rabbitmqPrefix;

    private List<RabbitModuleInfoProperties> modules;

    @Override
    public void afterSingletonsInstantiated() {
        log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");
        declareRabbitModule();
    }

    /**
     * RabbitMQ 根据配置动态创建和绑定队列、交换机
     */
    private void declareRabbitModule() {
        if (CollUtil.isEmpty(modules)) {
            return;
        }
        for (RabbitModuleInfoProperties rabbitModuleInfo : modules) {
            // 配置参数校验
            configParamValidate(rabbitModuleInfo);
            // 队列
            List<Queue> queues = convertQueue(rabbitModuleInfo.getQueues(), rabbitModuleInfo.getQueue());
            // 交换机
            RabbitModuleInfoProperties.Exchange exchangeInfo = rabbitModuleInfo.getExchange();
            Exchange exchange = convertExchange(exchangeInfo);
            // 绑定关系
            String routingKey = rabbitmqPrefix   rabbitModuleInfo.getRoutingKey();
            // 创建队列
            queues.forEach(amqpAdmin::declareQueue);
            // 创建交换机
            amqpAdmin.declareExchange(exchange);
            // 队列 绑定 交换机
            queues.forEach(queue -> {
                Binding binding;
                if (RabbitExchangeTypeEnum.HEADERS.getCode().equals(exchange.getType())) {
                    HeadersExchange headersExchange = (HeadersExchange) exchange;
                    if (HeadersTypeEnum.ALL.getCode().equals(exchangeInfo.getHeadersType())) {
                        binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(exchangeInfo.getHeadersMap()).match();
                    } else {
                        binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(exchangeInfo.getHeadersMap()).match();
                    }
                } else {
                    binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).and(null);
                }
                amqpAdmin.declareBinding(binding);
            });
        }
    }

    /**
     * RabbitMQ动态配置参数校验
     */
    public void configParamValidate(RabbitModuleInfoProperties rabbitModuleInfo) {
        String routingKey = rabbitModuleInfo.getRoutingKey();

        Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");

        Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);

        Assert.isTrue(Objects.nonNull(rabbitModuleInfo.getQueue()) || CollUtil.isNotEmpty(rabbitModuleInfo.getQueues()), "routingKey:{}未配置queue", routingKey);
    }

    public List<Queue> convertQueue(List<RabbitModuleInfoProperties.Queue> queues, RabbitModuleInfoProperties.Queue queueInfo) {
        if (CollUtil.isNotEmpty(queues)) {
            if (Objects.nonNull(queueInfo)) {
                queues.add(queueInfo);
            }
            return StreamUtil.listToList(queues, this::convertQueue);
        }
        Queue queue = convertQueue(queueInfo);
        return CollUtil.newArrayList(queue);
    }

    /**
     * 转换生成RabbitMQ队列
     */
    public Queue convertQueue(RabbitModuleInfoProperties.Queue queue) {
        String name = rabbitmqPrefix   queue.getName();
        Map<String, Object> arguments = queue.getArguments();
        // 转换ttl的类型为long
        if (arguments != null && arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }
        // 设置队列的优先级
        if (arguments != null && arguments.containsKey("x-max-priority")) {
            arguments.put("x-max-priority", Convert.toLong(arguments.get("x-max-priority")));
        }
        // 是否需要绑定死信队列
        String deadLetterQueue = queue.getDeadLetterQueue();
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterQueue) && StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
            if (arguments == null) {
                arguments = new HashMap<>();
            }
            deadLetterQueue = rabbitmqPrefix   deadLetterQueue;
            deadLetterExchange = rabbitmqPrefix   deadLetterExchange;
            deadLetterRoutingKey = rabbitmqPrefix   deadLetterRoutingKey;
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
            // 绑定死新队列
            Queue deadQueue = new Queue(deadLetterQueue, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
            amqpAdmin.declareQueue(deadQueue);
            Exchange deadExchange = new DirectExchange(deadLetterExchange, true, true, null);
            amqpAdmin.declareExchange(deadExchange);
            Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange).with(deadLetterRoutingKey).and(null);
            amqpAdmin.declareBinding(binding);
        }
        return new Queue(name, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }

    /**
     * 转换生成RabbitMQ交换机
     */
    public Exchange convertExchange(RabbitModuleInfoProperties.Exchange exchange) {
        String type = exchange.getType();
        boolean txlDelay = exchange.isTxlDelay();
        String exchangeName = rabbitmqPrefix   exchange.getName();
        boolean isDurable = exchange.isDurable();
        boolean isAutoDelete = exchange.isAutoDelete();
        Map<String, Object> arguments = exchange.getArguments();
        if (txlDelay) {
            return RabbitExchangeTypeEnum.getTxlDelayExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
        }
        return RabbitExchangeTypeEnum.getExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
    }

}

4)其它代码

4.1)常量

这是一个常量类,里面记录着相关的队列名称,主要是给生产者、消费者使用的。太杂乱了不好打理,故专门弄了一个常量类来进行管理

代码语言:java
package com.banmoon.constant;

/**
 * Names of the RabbitMQ queues, exchanges and routing keys used by producers and consumers.
 *
 * <p>Every name is the {@link #RABBITMQ_PREFIX} expression followed by the literal name, so the
 * values are expression templates rather than final names.
 *
 * @author banmoon
 * @date 2024/02/27 12:22:13
 */
public interface RabbitmqConstant {

    /**
     * Expression for the configured prefix: empty when {@code spring.rabbitmq.prefix} is not
     * set, otherwise the prefix followed by a dot.
     */
    String RABBITMQ_PREFIX = "#{'${spring.rabbitmq.prefix:}'.empty ? '' : '${spring.rabbitmq.prefix:}' + '.'}";

    /**
     * Direct exchange test names.
     */
    String DIRECT_TEST_QUEUE = RABBITMQ_PREFIX + "test.direct.queue";
    String DIRECT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.direct.exchange";
    String DIRECT_TEST_ROUTING_KEY = RABBITMQ_PREFIX + "test.direct.routingKey";


    /**
     * Fanout exchange test names.
     */
    String FANOUT_TEST_QUEUE_A = RABBITMQ_PREFIX + "test.fanout.queue.a";
    String FANOUT_TEST_QUEUE_B = RABBITMQ_PREFIX + "test.fanout.queue.b";
    String FANOUT_TEST_QUEUE_C = RABBITMQ_PREFIX + "test.fanout.queue.c";
    String FANOUT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.fanout.exchange";
    String FANOUT_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.fanout.routerKey";

    /**
     * Topic exchange test names.
     */
    String TOPIC_TEST_QUEUE_LOG = RABBITMQ_PREFIX + "test.topic.queue.log";
    String TOPIC_TEST_QUEUE_TEXT = RABBITMQ_PREFIX + "test.topic.queue.text";
    String TOPIC_TEST_QUEUE_IMAGE = RABBITMQ_PREFIX + "test.topic.queue.image";
    String TOPIC_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.topic.exchange";
    String TOPIC_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.topic.routerKey.#";
    String TOPIC_TEST_ROUTER_KEY_TEXT = RABBITMQ_PREFIX + "test.topic.routerKey.text";
    String TOPIC_TEST_ROUTER_KEY_IMAGE = RABBITMQ_PREFIX + "test.topic.routerKey.image";

    /**
     * Headers exchange test names.
     */
    String HEADERS_TEST_QUEUE = RABBITMQ_PREFIX + "test.headers.queue";
    String HEADERS_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.headers.exchange";
    String HEADERS_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.headers.routerKey";

    /**
     * TTL (dead-letter) test names.
     */
    String TTL_TEST_QUEUE = RABBITMQ_PREFIX + "test.ttl.queue";
    String TTL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.exchange";
    String TTL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.routerKey";
    String TTL_TEST_DEATH_QUEUE = RABBITMQ_PREFIX + "test.ttl.death.queue";
    String TTL_TEST_DEATH_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.death.exchange";
    String TTL_TEST_DEATH_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.death.routerKey";

    /**
     * TXL (delayed exchange) test names.
     */
    String TXL_TEST_QUEUE = RABBITMQ_PREFIX + "test.txl.queue";
    String TXL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.txl.exchange";
    String TXL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.txl.routerKey";

}
4.2)枚举代码

在上面的创建中,我们用到了两个枚举类,没什么可说的,直接贴出来

代码语言:java
package com.banmoon.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Header match modes for a headers exchange binding; each constant carries a code and a
 * display message.
 *
 * @author banmoon
 * @date 2024/03/04 16:35:27
 */
@Getter
@AllArgsConstructor
public enum HeadersTypeEnum {

    // Code "any"
    ANY("any", "任一"),
    // Code "all"
    ALL("all", "所有"),
    ;

    // Code compared against the configured headersType value
    private final String code;
    // Display message
    private final String msg;

}
代码语言:java
package com.banmoon.enums;

import cn.hutool.core.map.MapUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.amqp.core.*;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

/**
 * Supported exchange types, plus factories that build the matching Spring AMQP exchange.
 */
@Getter
@AllArgsConstructor
public enum RabbitExchangeTypeEnum {

    DIRECT("direct", "直连交换机"),
    FANOUT("fanout", "扇形交换机"),
    TOPIC("topic", "主题交换机"),
    HEADERS("headers", "头部交换机"),
    ;

    private final String code;
    private final String msg;

    /**
     * Looks up a type by its code, ignoring case.
     *
     * @param code type code such as {@code "direct"}
     * @return the matching constant, or {@code null} when none matches
     */
    public static RabbitExchangeTypeEnum getByCode(String code) {
        return getByCode(code, null);
    }

    /**
     * Looks up a type by its code, ignoring case.
     *
     * @param code        type code such as {@code "direct"}
     * @param defaultEnum value returned when no constant matches
     * @return the matching constant, or {@code defaultEnum}
     */
    public static RabbitExchangeTypeEnum getByCode(String code, RabbitExchangeTypeEnum defaultEnum) {
        return Arrays.stream(values()).filter(e -> e.getCode().equalsIgnoreCase(code)).findFirst().orElse(defaultEnum);
    }

    /**
     * Builds the exchange implementation that corresponds to the given type code.
     *
     * @param type         exchange type code
     * @param exchangeName exchange name
     * @param durable      whether the exchange is durable
     * @param autoDelete   whether the exchange is auto-deleted
     * @param arguments    additional exchange arguments, may be {@code null}
     * @return the exchange
     * @throws IllegalArgumentException if {@code type} is not a supported code
     */
    public static Exchange getExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        switch (requireByCode(type)) {
            case DIRECT:
                return new DirectExchange(exchangeName, durable, autoDelete, arguments);
            case TOPIC:
                return new TopicExchange(exchangeName, durable, autoDelete, arguments);
            case FANOUT:
                return new FanoutExchange(exchangeName, durable, autoDelete, arguments);
            case HEADERS:
                return new HeadersExchange(exchangeName, durable, autoDelete, arguments);
            default:
                // Only reachable if a constant is added without a matching case above
                throw new IllegalStateException("No exchange implementation for type: " + type);
        }
    }

    /**
     * Builds an {@code x-delayed-message} exchange whose {@code x-delayed-type} argument is the
     * given type code. The caller's argument map is copied, not modified.
     *
     * @param type         exchange type code used as {@code x-delayed-type}
     * @param exchangeName exchange name
     * @param durable      whether the exchange is durable
     * @param autoDelete   whether the exchange is auto-deleted
     * @param arguments    additional exchange arguments, may be {@code null}
     * @return the delayed exchange
     * @throws IllegalArgumentException if {@code type} is not a supported code
     */
    public static Exchange getTxlDelayExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        RabbitExchangeTypeEnum typeEnum = requireByCode(type);
        Map<String, Object> argMap = MapUtil.newHashMap(2);
        if (arguments != null) {
            argMap.putAll(arguments);
        }
        argMap.put("x-delayed-type", typeEnum.getCode());
        return new CustomExchange(exchangeName, "x-delayed-message", durable, autoDelete, argMap);
    }

    /**
     * Resolves a type code, failing with a descriptive error instead of a bare
     * NullPointerException when the code is unknown.
     */
    private static RabbitExchangeTypeEnum requireByCode(String type) {
        RabbitExchangeTypeEnum typeEnum = getByCode(type);
        if (typeEnum == null) {
            throw new IllegalArgumentException("Unsupported RabbitMQ exchange type: " + type);
        }
        return typeEnum;
    }
}

三、生产者、消费者

1)生产者

这是一个生产者抽象类,我自己写的生产者都需要继承它

代码语言:java
package com.banmoon.queues;

import cn.hutool.extra.spring.SpringUtil;
import com.banmoon.utils.JsonUtil;
import lombok.Data;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Base class for producers: serializes the payload to JSON and publishes it to the configured
 * exchange with the configured routing key.
 *
 * @author banmoon
 * @date 2024/02/28 11:44:28
 */
@Data
public abstract class AbstractProducer {

    private AmqpTemplate amqpTemplate;

    private String queueName;

    private String exchangeName;

    private String routingKey;

    /**
     * Creates a producer whose names are looked up with {@code SpringUtil.getProperty}.
     *
     * @param amqpTemplate template used to publish
     * @param queueName    property key for the queue name
     * @param exchangeName property key for the exchange name
     * @param routingKey   property key for the routing key
     */
    public AbstractProducer(AmqpTemplate amqpTemplate, String queueName, String exchangeName, String routingKey) {
        this.amqpTemplate = amqpTemplate;
        // TODO: 2024/3/2 still to be reworked (original author's note)
        this.queueName = SpringUtil.getProperty(queueName);
        this.exchangeName = SpringUtil.getProperty(exchangeName);
        this.routingKey = SpringUtil.getProperty(routingKey);
    }

    /**
     * Creates a producer whose names are supplied later through the setters.
     *
     * @param amqpTemplate template used to publish
     */
    public AbstractProducer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    /**
     * Publishes the payload as JSON.
     *
     * @param obj payload
     */
    public void send(Object obj) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg);
    }

    /**
     * Publishes the payload with a per-message time-to-live.
     *
     * <p>The TTL is set through the message {@code expiration} property, the AMQP field that
     * carries a per-message TTL. The {@code x-message-ttl} header is still set so that any
     * consumer reading it keeps working.
     *
     * @param obj              payload
     * @param delayMillisecond time-to-live in milliseconds; no expiration is set when null
     */
    public void sendTtlMesssage(Object obj, Integer delayMillisecond) {
        send(obj, message -> {
            MessageProperties properties = message.getMessageProperties();
            properties.setHeader("x-message-ttl", delayMillisecond);
            if (delayMillisecond != null) {
                properties.setExpiration(delayMillisecond.toString());
            }
            return message;
        });
    }

    /**
     * Publishes the payload with a delivery delay (for a delayed exchange).
     *
     * @param obj              payload
     * @param delayMillisecond delay in milliseconds
     */
    public void sendTxlMesssage(Object obj, Integer delayMillisecond) {
        send(obj, message -> {
            MessageProperties properties = message.getMessageProperties();
            properties.setDelay(delayMillisecond);
            return message;
        });
    }

    /**
     * Publishes the payload with the given message headers.
     *
     * @param obj     payload
     * @param headers headers to set; ignored when null
     */
    public void send(Object obj, Map<String, Object> headers) {
        send(obj, message -> {
            if (headers != null) {
                message.getMessageProperties().setHeaders(headers);
            }
            return message;
        });
    }

    /**
     * Publishes the payload as JSON, letting the post processor adjust the message first.
     *
     * @param obj                  payload
     * @param messagePostProcessor hook applied to the message before it is sent
     */
    public void send(Object obj, MessagePostProcessor messagePostProcessor) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);
    }

}

比如说直连交换机队列的生产者

代码语言:java
package com.banmoon.queues.producer;

import com.banmoon.constant.RabbitmqConstant;
import com.banmoon.queues.AbstractProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Producer for the direct-exchange test queue. The queue, exchange and routing-key names are
 * supplied through {@code @Value}-annotated setter overrides.
 *
 * @author banmoon
 * @date 2024/02/28 16:39:13
 */
@Slf4j
@Component
public class TestDirectProducer extends AbstractProducer {

    public TestDirectProducer(AmqpTemplate amqpTemplate) {
        super(amqpTemplate);
    }

    // Receives the value of the RabbitmqConstant.DIRECT_TEST_QUEUE expression
    @Override
    @Value(RabbitmqConstant.DIRECT_TEST_QUEUE)
    public void setQueueName(String queueName) {
        super.setQueueName(queueName);
    }

    // Receives the value of the RabbitmqConstant.DIRECT_TEST_EXCHANGE expression
    @Override
    @Value(RabbitmqConstant.DIRECT_TEST_EXCHANGE)
    public void setExchangeName(String exchangeName) {
        super.setExchangeName(exchangeName);
    }

    // Receives the value of the RabbitmqConstant.DIRECT_TEST_ROUTING_KEY expression
    @Override
    @Value(RabbitmqConstant.DIRECT_TEST_ROUTING_KEY)
    public void setRoutingKey(String routingKey) {
        super.setRoutingKey(routingKey);
    }
}

2)消费者

代码语言:java
package com.banmoon.queues.consumer;

import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the direct-exchange test queue.
 *
 * @author banmoon
 * @date 2024/02/27 12:08:12
 */
@Slf4j
@Component
public class TestDirectConsumer {

    /**
     * Logs each message received from the queue named by RabbitmqConstant.DIRECT_TEST_QUEUE.
     */
    @RabbitListener(queues = RabbitmqConstant.DIRECT_TEST_QUEUE)
    public void test(String message) {
        log.info("直连测试队列消费者:{}", message);
    }

}

四、最后

关于上面几种交换机类型,以及TTL死信队列、TXL延迟队列,都做了配置示例。

除直连交换机外,其余类型的生产者、消费者没有给出代码示例,相信大家都知道怎么写了。

那个,关于生产者的那个抽象类AbstractProducer.java,有一个地方一直没有调通:如何解析SpEL表达式,获取配置文件中的配置信息。只能退而求其次,使用@Value注解来进行获取了。相信注解能获取的,一定有对应的解析器,这边也一定可以的。又要看源码喽!还有那个开发环境队列隔离的问题:有些公司的开发使用的是同一个配置文件,这样会导致前缀都是同一个,那样设置前缀就没有意义了。其实可以这样:如果使用的是nacos的远端配置,可以创建自己的命名空间,修改前缀;如果配置在本地resources文件夹里面,可以使用maven编译后替换变量的那个功能,参见:如何读取到maven中profile设置的参数 | 半月无霜 (banmoon.top)。上面两种方法,都是可以实现的。

我是半月,你我一同共勉!!!

0 人点赞