整合RabbitMQ&Spring

2022-04-13 15:34:12 浏览数 (1)

RabbitAdmin

RabbitAdmin类可以很好的操作RabbitMQ,在spring中直接进行注入即可

代码语言:javascript复制
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  rabbitAdmin.setAutoStartUp(true);
  return rabbitAdmin;
}
  • 注意: autoStartUp必须设置为true,否则Spring容器不会加载RabbitAdmin类
  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean方式的声明
  • 然后使用RabbitTemplate的execute方法指定对应的声明、修改、删除等一系列RabbitMQ基础功能操作。
  • 例如:添加一个交换机、删除一个绑定、清空一个队列的消息等等就要使用的RabbitAdmin

实例:

  1. 添加maven依赖
代码语言:javascript复制
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
  1. 编写RabbitMQConfig类
代码语言:javascript复制
package com.pyy.spring;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({"com.pyy.spring.*"})
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.43.113:5672");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
  1. 编写测试类
代码语言:javascript复制
@Test
    public void testAdmin() {
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>()));


        rabbitAdmin.declareBinding(BindingBuilder.bind(
                new Queue("test.topic.queue1", false))  // 直接创建队列
                .to(new TopicExchange("test.topic", false, false))// 直接创建交换机建立关系
                .with("user.#"));// 直接指定路由键

        rabbitAdmin.declareBinding(BindingBuilder.bind(
                new Queue("test.topic.queue1", false))  // 直接创建队列
                .to(new FanoutExchange("test.topic", false, false)));// 直接创建交换机建立关系
    }

SpringAMQP-RabbitMQ声明式配置使用

  • 在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列: channel.exchangeDeclare(exchangeName, exchangeType, true, false, false,null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingkey);
  • 使用SpringAMQP的声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式:
代码语言:javascript复制
@Bean
public TopicExchange exchange() {
  return new TopicExchange("topicExchange", true, false);
}

@Bean
public Queue queue() {
  return new Queue("queue", true);
}

@Bean
public Binding binding() {
  return new BindingBuilder.bind(queue()).to(exchange()).with("spring.*");
}

SpringAMQP消息模板组件-RabbitTemplate实战

  • RabbitTemplate, 即消息模板 我们在与SpringAMQP整合的时候进行发送消息的关键类
  • 该类提供了丰富的发送消息方法,包括可靠性消息投递方法回调监听消息接口ConfirmCallback返回值确认接口ReturnCallback等等。同样我们需要进行注入到Spring容器中,然后直接使用。
  • 在与Spring整合时需要实例化,当时在与SpringBoot整合时,在配置文件中添加配置即可。

配置注入:

代码语言:javascript复制
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //rabbitTemplate.setConfirmCallback(null);
        // rabbitTemplate.setReturnCallback(null);
        return rabbitTemplate;
    }

编写测试类:

代码语言:javascript复制
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() {
        // 1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述:。。。");
        messageProperties.getHeaders().put("type", "自定义消息类型");

        Message message = new Message("hello Rabbitmq".getBytes(), messageProperties);

        // 2 发送消息
        rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("------添加额外设置--------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的描述");
                return message;
            }
        });
    }

    @Test
    public void testSendMessage2() {
        // 1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        
        Message message = new Message("消息1234".getBytes(), messageProperties);

        // 2 发送消息
        rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message);

        rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", "hello object message send");

        rabbitTemplate.convertAndSend("test.topic.exchange", "user.abc", "12234");
        
        rabbitTemplate.send("test.topic.exchange", "user.#", message);
    }

SpringAMQP消息容器-SimpleMessageListenerContainer详解

简单消息监听容器

  • 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费配置
  • 设置消息确认(签收)和自动确认模式、是否重回队列、异常捕获handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等。

注意:SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态修改其消费者数量大小、接收消息的模式等。

很多基于RabbitMQ的自定制的后端管控台在进行动态设置的时候,也是根据这一特性实现的。所有可以看出SpringAMQP非常强大。

代码语言:javascript复制
@Bean
    public Queue queue001() {
        Queue queue = new Queue("queue001", true);
        return queue;
    }

    @Bean
    public Queue queue002() {
        Queue queue = new Queue("queue002", true);
        return queue;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue   "_"   UUID.randomUUID().toString();
            }
        });
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("---消费者---"   msg);
            }
        });
        return container;
    }

思考问题:SimpleMessageListenerContainer为什么可以动态感知配置变更?

SpringAMQP消息适配器-MessageListenerAdapter使用

MessageListenerAdapter 即消息监听适配器

MessageDelegate:

代码语言:javascript复制
package com.pyy.spring;
public class MessageDelegate {

    /**
     * 方法名称固定、参数类型固定
     * @param messageBody
     */
    public void hadnleMessage(byte[] messageBody) {
        System.out.println("默认方法,消息内容:"   new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.out.println("字节数组方法,消息内容:"   new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.out.println("字符串方法,消息内容:"   new String(messageBody));
    }
}

TextConverter:

代码语言:javascript复制
package com.pyy.spring;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TextMessageConverter implements MessageConverter {


    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if(null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}

RabbitConfig:

代码语言:javascript复制
@Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue   "_"   UUID.randomUUID().toString();
            }
        });
//        container.setMessageListener(new ChannelAwareMessageListener() {
//            @Override
//            public void onMessage(Message message, Channel channel) throws Exception {
//                String msg = new String(message.getBody());
//                System.out.println("---消费者---"   msg);
//            }
//        });

        // 适配器方式,默认方法名称:handleMessage
        // 可以自定义方法名称:
        // 也可以添加一个转换器:从字节数组转换为String
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");// 设置默认监听方法名称
        adapter.setMessageConverter(new TextMessageConverter());

        container.setMessageListener(adapter);
        return container;
    }
  • 通过messgeListenerAdapter的代码我们可以看出如下核心属性:
  • defaultListenerMethod默认监听方法名称:用于设置监听方法名称
  • Delegate委托对象:实际真是的委托对象,用于处理消息

SpringAMQP消息转换器-MessageConverter

  • 我们在进行发送消息时候,正常情况下消息体为二进制数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter
  • 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口 重写下面两个方法: toMessage: java对象转换为Message formMessage: Message对象转换为java对象
  • Json转换器:Jackson2JsonMessageConverter:可以进行java对象的转换功能
  • DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系
  • 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体

0 人点赞