Spring AMQP特性
◆ 异步消息监听容器 ◆ 原生提供RabbitTemplate,方便收发消息 ◆ 原生提供RabbitAdmin,方便队列、交换机声明 ◆ Spring Boot Config原生支持RabbitMQ
1. 异步消息监听容器
◆ 原始实现:自己实现线程池、回调方法,并注册回调方法 ◆ SpringBoot:自动实现可配置的线程池,并自动注册回调方法,只需实现回调方法
2. RabbitTemplate
相比basicPublish,功能更加强大,能自动实现消息转换等功能.
3. RabbitAdmin
◆ 声明式提供队列、交换机、绑定关系的注册方法 ◆ 甚至不需要显式的注册代码
4. Spring Boot Config
◆ 充分发挥Spring Boot约定大于配置的特性 ◆ 可以隐式建立Connection、Channel
利用RebbitAdmin快速配置Rabbit服务
在RabbitConfig中配置RabbitAdmin用来管理RabbitMQ
创建方法: ConnectionFactory connectionFactory = new CachingConnectionFactory() ; RabbitAdmin rabbitAdmin = new RabbitAdmin (connectionFactory) ;
RabbitAdmin功能 declareExchange: 创建交换机 deleteExchange: 删除交换机 declareQueue: 创建队列 deleteQueue: 删除队列 purgeQueue: 清空队列 declareBinding: 新建绑定关系 removeBinding: 删除绑定关系 getQueueProperties: 查询队列属性
代码实践
使用springboot的amqp包在配置类中init RabbitMQ配置
代码语言:javascript复制package cn.kt.food.orderservicemanager.config;
import cn.kt.food.orderservicemanager.service.OrderMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 22:58
* 概要: 自动执行OrderMessageService中handleMessage方法(配置了RabbitMQ的交换机等)
*/
@Slf4j
@Configuration
public class RabbitConfig {
@Value("${rabbitmq.host}")
public String host;
@Autowired
OrderMessageService orderMessageService;
//配置类中的@Autowired方法会被自动调用
@Autowired
public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
log.info("【自动执行了-handleMessage】");
// orderMessageService.handleMessage();
}
//配置类中的@Autowired方法会被自动调用
@Autowired
public void initRabbit() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
/* *********************** 声明交换机 *********************** */
/*---------------------restaurant微服务(声明)---------------------*/
// 声明交换机
Exchange exchange = new DirectExchange("exchange.order.restaurant");
rabbitAdmin.declareExchange(exchange);
// 声明队列
Queue queue = new Queue("queue.order");
rabbitAdmin.declareQueue(queue);
// 绑定交换机和队列
Binding binding = new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.restaurant",
"key.order",
null
);
rabbitAdmin.declareBinding(binding);
/*---------------------deliveryman微服务---------------------*/
exchange = new DirectExchange("exchange.order.deliveryman");
rabbitAdmin.declareExchange(exchange);
// 绑定交换机和队列
binding = new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.deliveryman",
"key.order",
null
);
rabbitAdmin.declareBinding(binding);
/*---------------------settlement微服务---------------------*/
exchange = new FanoutExchange("exchange.order.settlement");
rabbitAdmin.declareExchange(exchange);
exchange = new FanoutExchange("exchange.settlement.order");
rabbitAdmin.declareExchange(exchange);
// 绑定交换机和队列
binding = new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.settlement.order",
"key.order",
null
);
rabbitAdmin.declareBinding(binding);
/*---------------------settlement微服务---------------------*/
exchange = new TopicExchange("exchange.order.reward");
rabbitAdmin.declareExchange(exchange);
// 绑定交换机和队列
binding = new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.reward",
"key.order",
null
);
rabbitAdmin.declareBinding(binding);
}
}
利用RabbitAdmin简化配置Rabbit服务流程
RabbitAdmin声明式配置
◆ 将Exchange、Queue、 Binding声明为Bean ◆ 再将RabbitAdmin声明为Bean ◆ Exchange、Queue、 Binding即可 自动创建
RabbitAdmin声明式配置的优点
◆ 将声明和创建工作分开,解耦多人工作 ◆ 不需显式声明,减少代码量,减少Bug
代码实践
使用springboot的amqp包在配置类中使用@Bean容器进行配置
代码语言:javascript复制package cn.kt.food.orderservicemanager.config;
import cn.kt.food.orderservicemanager.service.OrderMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 22:58
* 概要: 自动执行OrderMessageService中handleMessage方法(配置了RabbitMQ的交换机等)
*/
@Slf4j
@Configuration
public class RabbitConfig {
@Value("${rabbitmq.host}")
public String host;
@Autowired
OrderMessageService orderMessageService;
//配置类中的@Autowired方法会被自动调用
@Autowired
public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
log.info("【自动执行了-handleMessage】");
orderMessageService.handleMessage();
}
/*---------------------restaurant微服务(声明)---------------------*/
@Bean
public Exchange exchange1() {
return new DirectExchange("exchange.order.restaurant");
}
@Bean
public Queue queue1() {
return new Queue("queue.order");
}
@Bean
public Binding binding1() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.restaurant",
"key.order",
null
);
}
/*---------------------deliveryman微服务---------------------*/
@Bean
public Exchange exchange2() {
return new DirectExchange("exchange.order.deliveryman");
}
@Bean
public Binding binding2() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.deliveryman",
"key.order",
null
);
}
/*---------------------settlement微服务---------------------*/
@Bean
public Exchange exchange3() {
return new FanoutExchange("exchange.order.settlement");
}
@Bean
public Exchange exchange4() {
return new FanoutExchange("exchange.settlement.order");
}
@Bean
public Binding binding3() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.settlement",
"key.order",
null
);
}
/*---------------------reward---------------------*/
@Bean
public Exchange exchange5() {
return new TopicExchange("exchange.order.reward");
}
@Bean
public Binding binding4() {
return new Binding(
"queue.order",
Binding.DestinationType.QUEUE,
"exchange.order.reward",
"key.order",
null
);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// RabbitAdmin设置自动执行
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
利用RabbitTemplate快速消息发送
RabbitTemplate特点
◆ RabbitTemplate与RestTemplate类似,使用了模板方法设计模式 ◆ RabbitTemplate提供 了丰富的功能,方便消息收发 ◆ RabbitTemplate可以显式传入配置也可以隐式声明配置
代码实践
步骤:
- 配置类中声明RabbitTemplate,并且设置消息返回时回调和确认消息收到回调的方法
- 在ConnectionFactory中开启RabbitTemplate发送者确认,消息返回机制
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 开启RabbitTemplate发送者确认,消息返回机制
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
// 立即执行
connectionFactory.createConnection();
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// RabbitAdmin设置自动执行
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 托管:callback消息返回必须把托管打开
rabbitTemplate.setMandatory(true);
// 消息返回时回调的方法
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{},",
returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey());
});
// 确认消息收到回调
rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
log.info("correlationData:{},ack:{},cause:{}", correlationData, b, s);
});
return rabbitTemplate;
}
- 在业务逻辑代码中使用RabbitTemplate发送消息
package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:13
* 概要: 处理用户关于订单的业务请求
*/
@Slf4j
@Service
public class OrderService {
@Autowired
private OrderDetailDao orderDetailDao;
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
private ObjectMapper objectMapper = new ObjectMapper();
// 创建订单
public void createOrder(OrderCreateVO orderCreateVO) throws IOException, TimeoutException, InterruptedException {
log.info("createOrder:orderCreateVO:{}", orderCreateVO);
OrderDetailPO orderPO = new OrderDetailPO();
orderPO.setAddress(orderCreateVO.getAddress());
orderPO.setAccountId(orderCreateVO.getAccountId());
orderPO.setProductId(orderCreateVO.getProductId());
orderPO.setStatus(OrderStatusEnum.ORDER_CREATING);
orderPO.setDate(new Date());
// 会返回数据库自动生成的数据
orderDetailDao.insert(orderPO);
OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
orderMessageDTO.setOrderId(orderPO.getId());
orderMessageDTO.setProductId(orderPO.getProductId());
orderMessageDTO.setAccountId(orderCreateVO.getAccountId());
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
MessageProperties messageProperties = new MessageProperties();
// 设置过期时间等
messageProperties.setExpiration("15000");
/* -------------- 使用RabbitTemplate发送消息 ----------------- */
Message message = new Message(messageToSend.getBytes(), messageProperties);
// 设置发送消息的对应关系信息
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderPO.getId().toString());
rabbitTemplate.send(
"exchange.order.restaurant",
"key.restaurant",
message,
correlationData
);
// 消息转换并发送
/*rabbitTemplate.convertAndSend(
"exchange.order.restaurant",
"key.restaurant",
messageToSend
);*/
// 也可以使用原生
/*rabbitTemplate.execute(channel -> {
// 使用channel做原生的操作(@FunctionalInterface函数式编程)
});*/
log.info("message send");
Thread.sleep(1000);
}
}
利用SimpleMessageListenerContainer高效监听消息
SimpleMessageListenerContainer特征:
◆ 设置同时监听多个队列、自动启动、自动配置RabbitMQ ◆ 设置消费者数量(最大数量、最小数量、批量消费) ◆ 设置消息确认模式、是否重回队列、异常捕获 ◆ 设置是否独占、其他消费者属性等 ◆ 设置具体的监听器、消息转换器等 ◆ 支持动态设置,运行中修改监听器配置
实践代码
代码语言:javascript复制 @Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueueNames("queue.order");
// 相当于设置并发消费者线程线程数:一般给3
simpleMessageListenerContainer.setConcurrentConsumers(3);
// 最大线程数
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
// 消息确认的方式
// 消费者自动确认
/*simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
simpleMessageListenerContainer.setMessageListener(message -> {
log.info("message:{}", message);
});*/
// 消费者手动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
// 可以拿到channel
@Override
public void onMessage(Message message, Channel channel) throws Exception {
log.info("message:{}", message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
});
// 设置消费端限流
simpleMessageListenerContainer.setPrefetchCount(1);
return simpleMessageListenerContainer;
}
利用MessageListenerAdapter(消息监听适配器)自定义消息监听
MessageListenerAdapter作用
◆ 适配器设计模式 ◆ 解决业务逻辑代码无法修改的问题
使用方法
◆ 简单模式:实现handleMessage方法 ◆ 高阶模式:自定义"队列名-→方法名"映射关系
实战步骤
- 改造原生的DeliverCallback被异步方法handleMessage()调用 原生使用方法参考步骤7: https://blog.csdn.net/qq_42038623/article/details/124292280
改造后:
代码语言:javascript复制package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:15
* 概要:消息处理相关业务逻辑
*/
@Slf4j
@Service
public class OrderMessageService {
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
@Autowired
private OrderDetailDao orderDetailDao;
ObjectMapper objectMapper = new ObjectMapper();
/* 处理消息的业务代码默认方法名是handMessage(源码使用了反射) */
public void handMessage(byte[] messageBody) throws IOException {
// String messageBody = new String(message.getBody());
log.info("handMessage:messageBody:{}", new String(messageBody));
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
try {
// 将消息体反序列化成DTO
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
// 读取数据库中的PO
OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());
switch (orderPO.getStatus()) {
case ORDER_CREATING:
// 修改订单状态
if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
orderPO.setStatus(OrderStatusEnum.RESTAURANT_CONFIRMED);
orderPO.setPrice(orderMessageDTO.getPrice());
orderDetailDao.update(orderPO);
// 订单状态更新后给骑手发消息
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.deliveryman",
"key.deliveryman",
null,
messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
case RESTAURANT_CONFIRMED:
if (null != orderMessageDTO.getDeliverymanId()) {
orderPO.setStatus(OrderStatusEnum.DELIVERYMAN_CONFIRMED);
orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
orderDetailDao.update(orderPO);
// 发消息
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.settlement",
"key.settlement",
null,
messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
case DELIVERYMAN_CONFIRMED:
if (null != orderMessageDTO.getSettlementId()) {
orderPO.setStatus(OrderStatusEnum.SETTLEMENT_CONFIRMED);
orderPO.setSettlementId(orderMessageDTO.getSettlementId());
orderDetailDao.update(orderPO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.reward", "key.reward", null, messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
case SETTLEMENT_CONFIRMED: // 订单创建完成
if (null != orderMessageDTO.getRewardId()) {
orderPO.setStatus(OrderStatusEnum.ORDER_CREATED);
orderPO.setRewardId(orderMessageDTO.getRewardId());
orderDetailDao.update(orderPO);
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
}
} catch (JsonProcessingException | TimeoutException e) {
e.printStackTrace();
}
};
}
- 在RabbitConfig配置类中实现自动调用消息监听时业务处理的适配方法 在消息监听配置SimpleMessageListenerContainer方法中调用消息监听后的业务处理方法 可以在设置监听后的onMessage方法中调用,但是这个做法不优雅,有缺陷,所以更推荐使用:new MessageListenerAdapter(); 注意: ◆ 如果使用默认的handleMessage方法作为业务处理方法,可直接使用即可: simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(orderMessageService)); ◆ 如果没有使用默认的handleMessage方法作为业务处理方法,需要配置一个map实现自定义队列和业务处理方法的映射关系 ◆ 当然也可以通过在map中配置多个对应关系实现多个自定义队列和业务处理方法的映射关系。
实现代码如下:
代码语言:javascript复制@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueueNames("queue.order");
// 相当于设置并发消费者线程线程数:一般给3
simpleMessageListenerContainer.setConcurrentConsumers(3);
// 最大线程数
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
// 消息确认的方式
// 消费者自动确认
/*simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
simpleMessageListenerContainer.setMessageListener(message -> {
log.info("message:{}", message);
});*/
// 消费者手动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
// 可以拿到channel
@Override
public void onMessage(Message message, Channel channel) throws Exception {
log.info("message:{}", message);
//调用自定义消息监听(不优雅)
// orderMessageService.handMessage(message.getBody());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
});
// 设置消费端限流
simpleMessageListenerContainer.setPrefetchCount(1);
// 调用自定义消息监听后的业务处理方法(orderMessageService中的调用方法为:handMessage(默认))
// simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(orderMessageService));
// 调用自定义消息监听(高级用法,没有使用默认方法名)
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(orderMessageService);
Map<String, String> methodMap = new HashMap<>(8);
methodMap.put("queue.order", "handMessage");
// 可以设置多个队列多个handMessage2方法去处理
// methodMap.put("queue.order2", "handMessage2");
messageListenerAdapter.setQueueOrTagToMethodName(methodMap);
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
}
利用MessageConverter高效处理消息
MessageConverter(接口)作用
之前收发消息时,使用了Byte[]数组作为消息体,编写业务逻辑时,需要使用Java对象,MessageConverter用来在收发消息时自动转换消息
MessageConverter是一个接口,需要进行实现。 sping对MessageConverter的实现类:Jackson2JsonMessageConverter ◆ 最常用的MessageConverter,用来转换Json格式消息 ◆ 配合ClassMapper可以直接转换为POJO对象
自定义MessageConverter ◆ 实现MessageConverter接口 ◆ 重写toMessage、 fromMessage方法
代码实现:
- 消息监听时业务处理的适配方法中给simpleMessageListenerContainer添加messageConverter
// MessageConverter用来在收发消息时自动转换消息
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
messageConverter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> aClass, MessageProperties messageProperties) {
}
@Override
public Class<?> toClass(MessageProperties messageProperties) {
// OrderMessageDTO为接收消息的类型
return OrderMessageDTO.class;
}
});
// 根据消息里面的属性判断属于哪个java类(不推荐:因为各个微服务之间可能不都是java,也可能是go语言)
/*Jackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
messageConverter.setJavaTypeMapper(javaTypeMapper);*/
messageListenerAdapter.setMessageConverter(messageConverter);
- 在业务逻辑代码中直接使用对象接收
public void handMessage(OrderMessageDTO orderMessageDTO) throws IOException {
log.info("handMessage:orderMessageDTO:{}", orderMessageDTO);
}
利用RabbitListener快速实现消息处理器
RabbitListener是什么
◆ RabbitListener是 SpringBoot架构中监听消息的"终极方案” ◆ RabbitListener使用注解声明,对业务代码无侵入 ◆ RabbitListener可以在SpringBoot配置文件中进行配置
@RabbitListener注解
◆ @RabbitListener是一个组合注解,可以嵌套以下注解: ◆ @Exchange:自动声明Exchange ◆ @Queue:自动声明队列 ◆ @QueueBinding:自动声明绑定关系
代码实践
- 在RabbitConfig配置类中配置RabbitListenerContainerFactory(之前的SimpleMessageListenerContainer就不需要了)
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
- 在业务代码中使用注解 用法一: 在类上设置@RabbitListener
@Slf4j
@Service
// 设置该类为消息监听器
@RabbitListener(containerFactory = "rabbitListenerContainerFactory",
queues = "queue.order"
)
public class OrderMessageService {}
在方法上设置@RabbitHandler
代码语言:javascript复制// 设置该方法为默认调用方法(注意无法使用MessageConverter接收消息)
@RabbitHandler(isDefault = true)
public void handMessage(@Payload Message message) throws IOException {
String messageBody = new String(message.getBody());
// log.info("handMessage:orderMessageDTO:{}", orderMessageDTO);
log.info("handMessage:message:{}", new String(message.getBody()));
}
用法二: 直接在方法上设置@RabbitListener(对整体代码入侵小)
代码语言:javascript复制 // 设置该方法为默认调用方法(注意无法使用MessageConverter接收消息)
@RabbitListener(containerFactory = "rabbitListenerContainerFactory",
queues = "queue.order"
)
public void handMessage(@Payload Message message) throws IOException {
String messageBody = new String(message.getBody());
// log.info("handMessage:orderMessageDTO:{}", orderMessageDTO);
log.info("handMessage:message:{}", new String(message.getBody()));
}
@RabbitListener是一个组合注解,可以嵌套以下注解:比如
代码语言:javascript复制@RabbitListener(
containerFactory = "rabbitListenerContainerFactory",
admin = "rabbitAdmin",
bindings = {
@QueueBinding(
value = @Queue(
name = "queue.order"
/*arguments = {
@Argument(
name = "x-message-ttl",
value = "1000",
type = "java.lang.Integer"
),
@Argument(
name = "x-dead-letter-exchange",
value = "exchange.dlx"
)
}*/
),
exchange = @Exchange(name = "exchange.order.restaurant", type = ExchangeTypes.DIRECT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.deliveryman", type = ExchangeTypes.DIRECT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.settlement", type = ExchangeTypes.FANOUT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.reward", type = ExchangeTypes.TOPIC),
key = "key.order"
),
}
)
public void handMessage(@Payload Message message) throws IOException {
String messageBody = new String(message.getBody());
// log.info("handMessage:orderMessageDTO:{}", orderMessageDTO);
log.info("handMessage:message:{}", new String(message.getBody()));
}
约定大于配置
去除所有的RabbiyConfig中的@Bean配置,在application.properties中配置RabbitMQ
代码语言:javascript复制spring.rabbitmq.addresses=192.168.149.134
spring.rabbitmq.host=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 自动ack
spring.rabbitmq.listener.direct.acknowledge-mode=auto
## 还有很对spring.rabbitmq配置
这时候就可以几乎0配置使用RabbiyMQ(containerFactory和admin可以去掉了,应为springboot已经帮我们约定熟成的使用containerFactory和admin) 去除RabbiyConfig配置,增加application.properties配置后,下面代码依然可以生效
代码语言:javascript复制@RabbitListener(
// containerFactory = "rabbitListenerContainerFactory",
// admin = "rabbitAdmin",
bindings = {
@QueueBinding(
value = @Queue(
name = "queue.order"
/*arguments = {
@Argument(
name = "x-message-ttl",
value = "1000",
type = "java.lang.Integer"
),
@Argument(
name = "x-dead-letter-exchange",
value = "exchange.dlx"
)
}*/
),
exchange = @Exchange(name = "exchange.order.restaurant", type = ExchangeTypes.DIRECT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.deliveryman", type = ExchangeTypes.DIRECT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.settlement", type = ExchangeTypes.FANOUT),
key = "key.order"
),
@QueueBinding(
value = @Queue(name = "queue.order"),
exchange = @Exchange(name = "exchange.order.reward", type = ExchangeTypes.TOPIC),
key = "key.order"
),
}
)
/* 处理消息的业务代码默认方法名是handMessage(源码使用了反射) */
public void handMessage(@Payload Message message) throws IOException {
String messageBody = new String(message.getBody());
// log.info("handMessage:orderMessageDTO:{}", orderMessageDTO);
log.info("handMessage:message:{}", new String(message.getBody()));
}
amqp配置文件配置详解
代码语言:javascript复制# base
spring.rabbitmq.host: 服务Host
spring.rabbitmq.port: 服务端口
spring.rabbitmq.username: 登陆用户名
spring.rabbitmq.password: 登陆密码
spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost
spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.publisher-confirms: 是否启用【发布确认】
spring.rabbitmq.publisher-returns: 是否启用【发布返回】
spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时
spring.rabbitmq.parsed-addresses:
# ssl
spring.rabbitmq.ssl.enabled: 是否支持ssl
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1
# cache
spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效
spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION
# listener
spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒
spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用
spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔
spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态
# template
spring.rabbitmq.template.mandatory: 启用强制信息;默认false
spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
spring.rabbitmq.template.retry.enabled: 发送重试是否可用
spring.rabbitmq.template.retry.max-attempts: 最大重试次数
spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔
目前的项目不足之处分析
- RabbitMQ容量不足 ◆ 受制于服务器、虚机、容器的规模,单节RabbitMQ容量受限 ◆ 在业务量庞大时,单节点MQ可能会因为内存不足导致OOM
- RabbitMQ数据无副本 ◆ 单节点RabbitMQ没有备份数据 ◆ 若单节点故障,消息数据丢失.
- RabbitMQ可用性低 ◆ 单节点RabbitMQ不可避免会出现故障 ◆ 单节点故障后,RabbitMQ服务不可用,系统业务崩溃
源码下载
https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_2