RabbitAdmin
RabbitAdmin类可以很好的操作RabbitMQ,在spring中直接进行注入即可
代码语言:javascript复制@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartUp(true);
return rabbitAdmin;
}
- 注意:
autoStartUp必须设置为true
,否则Spring容器不会加载RabbitAdmin类 - RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean方式的声明
- 然后使用RabbitTemplate的execute方法指定对应的声明、修改、删除等一系列RabbitMQ基础功能操作。
- 例如:添加一个交换机、删除一个绑定、清空一个队列的消息等等就要使用的RabbitAdmin
实例:
- 添加maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
- 编写RabbitMQConfig类
package com.pyy.spring;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.pyy.spring.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.43.113:5672");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
- 编写测试类
@Test
public void testAdmin() {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>()));
rabbitAdmin.declareBinding(BindingBuilder.bind(
new Queue("test.topic.queue1", false)) // 直接创建队列
.to(new TopicExchange("test.topic", false, false))// 直接创建交换机建立关系
.with("user.#"));// 直接指定路由键
rabbitAdmin.declareBinding(BindingBuilder.bind(
new Queue("test.topic.queue1", false)) // 直接创建队列
.to(new FanoutExchange("test.topic", false, false)));// 直接创建交换机建立关系
}
SpringAMQP-RabbitMQ声明式配置使用
- 在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列:
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false,null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingkey);
- 使用SpringAMQP的声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式:
@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange", true, false);
}
@Bean
public Queue queue() {
return new Queue("queue", true);
}
@Bean
public Binding binding() {
return new BindingBuilder.bind(queue()).to(exchange()).with("spring.*");
}
SpringAMQP消息模板组件-RabbitTemplate实战
- RabbitTemplate, 即消息模板 我们在与SpringAMQP整合的时候进行发送消息的关键类
- 该类提供了丰富的发送消息方法,包括
可靠性消息投递方法
、回调监听消息接口ConfirmCallback
、返回值确认接口ReturnCallback
等等。同样我们需要进行注入到Spring容器中,然后直接使用。 - 在与Spring整合时需要实例化,当时在与SpringBoot整合时,在配置文件中添加配置即可。
配置注入:
代码语言:javascript复制 @Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//rabbitTemplate.setConfirmCallback(null);
// rabbitTemplate.setReturnCallback(null);
return rabbitTemplate;
}
编写测试类:
代码语言:javascript复制 @Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
// 1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述:。。。");
messageProperties.getHeaders().put("type", "自定义消息类型");
Message message = new Message("hello Rabbitmq".getBytes(), messageProperties);
// 2 发送消息
rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("------添加额外设置--------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的描述");
return message;
}
});
}
@Test
public void testSendMessage2() {
// 1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("消息1234".getBytes(), messageProperties);
// 2 发送消息
rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message);
rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", "hello object message send");
rabbitTemplate.convertAndSend("test.topic.exchange", "user.abc", "12234");
rabbitTemplate.send("test.topic.exchange", "user.#", message);
}
SpringAMQP消息容器-SimpleMessageListenerContainer详解
简单消息监听容器
- 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
- 监听队列(多个队列)、自动启动、自动声明功能
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
- 设置消费者数量、最小最大数量、批量消费配置
- 设置消息确认(签收)和自动确认模式、是否重回队列、异常捕获handler函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器等等。
注意:
SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态修改其消费者数量大小、接收消息的模式等。
很多基于RabbitMQ的自定制的后端管控台在进行动态设置的时候,也是根据这一特性实现的。所有可以看出SpringAMQP非常强大。
代码语言:javascript复制@Bean
public Queue queue001() {
Queue queue = new Queue("queue001", true);
return queue;
}
@Bean
public Queue queue002() {
Queue queue = new Queue("queue002", true);
return queue;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue "_" UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("---消费者---" msg);
}
});
return container;
}
思考问题:
SimpleMessageListenerContainer为什么可以动态感知配置变更?
SpringAMQP消息适配器-MessageListenerAdapter使用
MessageListenerAdapter 即消息监听适配器
MessageDelegate:
代码语言:javascript复制package com.pyy.spring;
public class MessageDelegate {
/**
* 方法名称固定、参数类型固定
* @param messageBody
*/
public void hadnleMessage(byte[] messageBody) {
System.out.println("默认方法,消息内容:" new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.out.println("字节数组方法,消息内容:" new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.out.println("字符串方法,消息内容:" new String(messageBody));
}
}
TextConverter:
代码语言:javascript复制package com.pyy.spring;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
RabbitConfig:
代码语言:javascript复制@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue "_" UUID.randomUUID().toString();
}
});
// container.setMessageListener(new ChannelAwareMessageListener() {
// @Override
// public void onMessage(Message message, Channel channel) throws Exception {
// String msg = new String(message.getBody());
// System.out.println("---消费者---" msg);
// }
// });
// 适配器方式,默认方法名称:handleMessage
// 可以自定义方法名称:
// 也可以添加一个转换器:从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");// 设置默认监听方法名称
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
- 通过messgeListenerAdapter的代码我们可以看出如下核心属性:
-
defaultListenerMethod
默认监听方法名称:用于设置监听方法名称 -
Delegate
委托对象:实际真是的委托对象,用于处理消息
SpringAMQP消息转换器-MessageConverter
- 我们在进行发送消息时候,正常情况下消息体为二进制数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter
- 自定义常用转换器:
MessageConverter
,一般来讲都需要实现这个接口 重写下面两个方法:toMessage
: java对象转换为MessageformMessage
: Message对象转换为java对象 - Json转换器:
Jackson2JsonMessageConverter
:可以进行java对象的转换功能 DefaultJackson2JavaTypeMapper
映射器:可以进行java对象的映射关系- 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体