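Analysis of a Distributed Transaction Framework
Why transactions need to be distributed
- What is a transaction ◆ A transaction is a series of business operations that must all succeed or all fail together ◆ Traditional transactions have four main properties: atomicity, consistency, isolation, and durability
- Challenges brought by microservices ◆ In a traditional monolithic application, a transaction completes locally ◆ As back-end architectures move to microservices, a transaction can no longer complete locally ◆ So transactions need to become "distributed"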
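Theoretical background for transactions
- Distributed framework theory: ACID, the four basic elements for a transaction to execute correctly ◆ Atomicity ◆ Consistency ◆ Isolation ◆ Durability
- Distributed framework theory: CAP, consistency, availability, and partition tolerance cannot all be satisfied at once ◆ Consistency ◆ Availability ◆ Partition tolerance
- Distributed framework theory: BASE, proposed from engineering practice because the three CAP properties cannot all be satisfied ◆ Basically Available ◆ Soft state ◆ Eventually consistent
Trade-offs in distributed transactions
◆ ACID mostly targets traditional local transactions; a distributed transaction cannot satisfy atomicity and isolation, so the traditional ACID model has to be given up
◆ Under BASE, business state does not need to be strongly consistent across the microservice system
◆ Under BASE, it is enough for the order state to become eventually consistent
◆ To achieve eventual consistency, messages must not be lost, the send and processing flows need a retry mechanism, and repeated retry failures must raise an alert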
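The last point can be illustrated with a minimal sketch in plain Java. It is not part of the framework: `RetryThenAlertDemo`, `runWithRetry`, and the in-memory `alerts` list are hypothetical names, used only to show the shape of "retry a bounded number of times, then alert".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical illustration; not part of the moodymq package.
class RetryThenAlertDemo {
    // Alerts raised so far; a real system would notify an operator instead.
    static final List<String> alerts = new ArrayList<>();

    /**
     * Runs the action up to maxAttempts times.
     * Returns the number of attempts used and records an alert if all fail.
     */
    static int runWithRetry(String name, int maxAttempts, BooleanSupplier action) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (action.getAsBoolean()) {
                return attempt; // succeeded, so the state can converge
            }
        }
        alerts.add(name + " failed after " + maxAttempts + " attempts");
        return maxAttempts;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        int used = runWithRetry("deliver-order-message", 5, () -> ++calls[0] >= 3);
        System.out.println("attempts used: " + used);
        // Never succeeds, so an alert is recorded.
        runWithRetry("always-failing", 5, () -> false);
        System.out.println("alerts: " + alerts);
    }
}
```

The framework described below follows the same idea, but keeps the attempt count in a database row so that retries survive a restart.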
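Design of the distributed transaction framework
Based on the analysis above, the framework should contain the following parts
◆ Retry on send failure
◆ Retry on consume failure
◆ Dead-letter alerts
Table design (the trans_message table is defined in step 1 below)
Building the distributed transaction framework
Techniques involved:
◆ Declare ConnectionFactory, RabbitAdmin, RabbitListenerContainerFactory, and RabbitTemplate
◆ Declare the enum and PO, and develop the DAO layer
◆ Declare the scheduled task
Notes on the distributed transaction flow
- Retry on send failure ◆ Persist the message before sending it ◆ Delete the message once it is sent successfully ◆ Periodically scan for messages that were not sent successfully and resend them
- Retry on consume failure ◆ Persist the message as soon as it is received ◆ If processing succeeds, the consumer acknowledges (ACK) and deletes the message ◆ If processing fails, wait, negatively acknowledge (NACK) the message, and record the attempt count ◆ Process the message again
- Dead-letter alerts ◆ Declare the dead-letter queue, exchange, and binding ◆ Add the dead-letter settings to the ordinary queues ◆ On receiving a dead letter, persist it and raise an alert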
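The send-side flow in the first bullet can be modelled in a few lines of plain Java. This is only an in-memory sketch under stated assumptions: `OutboxSketch`, its `pending` map (standing in for the trans_message table), and the `brokerUp` flag (standing in for a reachable RabbitMQ broker) are all hypothetical and do not appear in the framework code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; the real framework stores rows in
// trans_message and relies on RabbitMQ publisher confirms.
class OutboxSketch {
    // Stands in for the trans_message table: message id -> payload.
    final Map<String, String> pending = new LinkedHashMap<>();
    // Stands in for the broker: ids of messages it has accepted.
    final List<String> delivered = new ArrayList<>();
    // Toggle to simulate the broker being unreachable.
    boolean brokerUp = true;
    private int nextId = 0;

    // Step 1: persist the message, then attempt to publish it.
    String send(String payload) {
        String id = "msg-" + (nextId++);
        pending.put(id, payload);
        publish(id);
        return id;
    }

    // Step 2: a confirmed publish deletes the stored copy; a failed one keeps it.
    private void publish(String id) {
        if (brokerUp) {
            delivered.add(id);
            pending.remove(id);
        }
    }

    // Step 3: the periodic scan re-publishes whatever is still stored.
    void resendPending() {
        for (String id : new ArrayList<>(pending.keySet())) {
            publish(id);
        }
    }

    public static void main(String[] args) {
        OutboxSketch outbox = new OutboxSketch();
        outbox.brokerUp = false;
        String id = outbox.send("{\"orderId\":1}");
        System.out.println("pending after failed send: " + outbox.pending.keySet());
        outbox.brokerUp = true;
        outbox.resendPending();
        System.out.println("delivered after scan: " + outbox.delivered + ", id=" + id);
    }
}
```

Because the message is stored before the first publish attempt, a crash or broker outage between the two steps leaves a row behind for the scan to pick up.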
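Steps
The send-retry, consume-retry, and dead-letter-alert features are written in a single package so that they can be reused later. The package is named moodymq.
The directory structure is as follows (reconstructed from the package declarations in the code below):
moodymq
    config: DlxConfig.java, MoodyRabbitConfig.java
    dao: TransMessageDao.java
    enummeration: TransMessageType.java
    listener: AbstractMessageListener.java, DlxListener.java
    po: TransMessagePO.java
    sender: TransMessageSender.java
    service: TransMessageService.java, TransMessageServiceImpl.java
    task: ResendTask.java
The code is added on top of the source from RabbitMQ Study Notes (4): Adapting RabbitMQ to Spring Boot.
1. Create the table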
DROP TABLE IF EXISTS `trans_message`;
CREATE TABLE `trans_message` (
  `id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'Message ID',
  `service` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'Service name',
  `type` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Message type',
  `exchange` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Exchange',
  `routing_key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Routing key',
  `queue` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'Queue',
  `sequence` int(11) NULL DEFAULT NULL COMMENT 'Sequence (retry count)',
  `payload` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT 'Message body',
  `date` datetime(0) NULL DEFAULT NULL COMMENT 'Time',
  PRIMARY KEY (`id`, `service`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
2. Modify the configuration file
Add the moodymq settings to application.properties
# Order microservice configuration
server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT+8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
rabbitmq.host=192.168.137.138
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.exchange=exchange.food
spring.rabbitmq.addresses=192.168.137.138
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Automatic ack
spring.rabbitmq.listener.direct.acknowledge-mode=auto
moodymq.service=orderService
moodymq.host=192.168.137.138
moodymq.port=5672
moodymq.username=guest
moodymq.password=guest
moodymq.vhost=/
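# Maximum number of retries
moodymq.resendTimes=5
# Interval between resend scans, in milliseconds
moodymq.resendFreq=5000
# Enables DlxConfig and DlxListener, which are guarded by @ConditionalOnProperty("moodymq.dlxEnabled")
moodymq.dlxEnabled=true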
3. Create the status enum, PO, and DAO layer
TransMessageType.java
package cn.kt.food.orderservicemanager.moodymq.enummeration;

public enum TransMessageType {
    SEND,
    RECEIVE,
    DEAD;
}
TransMessagePO.java
package cn.kt.food.orderservicemanager.moodymq.po;

import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.util.Date;

@Getter
@Setter
@ToString
public class TransMessagePO {
    private String id;
    private String service;
    private TransMessageType type;
    private String exchange;
    private String routingKey;
    private String queue;
    private Integer sequence;
    private String payload;
    private Date date;
}
TransMessageDao.java
package cn.kt.food.orderservicemanager.moodymq.dao;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;

@Mapper
@Repository
public interface TransMessageDao {

    @Insert("INSERT INTO trans_message (id, type, service, " +
            "exchange, routing_key, queue, sequence, payload," +
            "date) " +
            "VALUES(#{id}, #{type}, #{service},#{exchange}," +
            "#{routingKey},#{queue},#{sequence}, #{payload},#{date})")
    void insert(TransMessagePO transMessagePO);

    @Update("UPDATE trans_message set type=#{type}, " +
            "service=#{service}, exchange =#{exchange}," +
            "routing_key =#{routingKey}, queue =#{queue}, " +
            "sequence =#{sequence}, payload =#{payload}, " +
            "date =#{date} " +
            "where id=#{id} and service=#{service}")
    void update(TransMessagePO transMessagePO);

    @Select("SELECT id, type, service, exchange, " +
            "routing_key routingKey, queue, sequence, " +
            "payload, date " +
            "FROM trans_message " +
            "where id=#{id} and service=#{service}")
    TransMessagePO selectByIdAndService(@Param("id") String id,
                                        @Param("service") String service);

    @Select("SELECT id, type, service, exchange, " +
            "routing_key routingKey, queue, sequence, " +
            "payload, date " +
            "FROM trans_message " +
            "WHERE type = #{type} and service = #{service}")
    List<TransMessagePO> selectByTypeAndService(
            @Param("type") String type,
            @Param("service") String service);

    @Delete("DELETE FROM trans_message " +
            "where id=#{id} and service=#{service}")
    void delete(@Param("id") String id,
                @Param("service") String service);
}
4. Wrap message sending in send
TransMessageSender.java
package cn.kt.food.orderservicemanager.moodymq.sender;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TransMessageSender {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Autowired
    TransMessageService transMessageService;

    // Wrapper for sending a message
    public void send(String exchange, String routingKey, Object payload) {
        log.info("send(): exchange:{} routingKey:{} payload:{}",
                exchange, routingKey, payload);
        try {
            ObjectMapper mapper = new ObjectMapper();
            String payloadStr = mapper.writeValueAsString(payload);
            // Persist the message before sending it
            TransMessagePO transMessagePO =
                    transMessageService.messageSendReady(
                            exchange,
                            routingKey,
                            payloadStr
                    );
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            // Build the message; the message ID is the ID of the persisted row
            Message message = new Message(payloadStr.getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(transMessagePO.getId());
            // Send the message; CorrelationData carries the same ID to the confirm callback
            rabbitTemplate.convertAndSend(exchange, routingKey, message,
                    new CorrelationData(transMessagePO.getId()));
            log.info("message sent, ID:{}", transMessagePO.getId());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
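5. Create the service layer and its implementation
TransMessageService.java
package cn.kt.food.orderservicemanager.moodymq.service;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import java.util.List;

public interface TransMessageService {
    /**
     * Persists a message before it is sent.
     *
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param body       body
     * @return TransMessagePO
     */
    TransMessagePO messageSendReady(String exchange, String routingKey, String body);

    /**
     * Marks a message as sent successfully; the stored message is deleted.
     *
     * @param id message ID
     */
    void messageSendSuccess(String id);

    /**
     * Handles a returned (unroutable) message by persisting it.
     *
     * @param id         id
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param body       body
     * @return TransMessagePO
     */
    TransMessagePO messageSendReturn(
            String id, String exchange, String routingKey, String body);

    /**
     * Lists messages that should have been sent but were not (earlier sends
     * failed; they still need resending and have not reached the alert threshold).
     *
     * @return List<TransMessagePO>
     */
    List<TransMessagePO> listReadyMessages();

    /**
     * Records one more send attempt for the message.
     *
     * @param id id
     */
    void messageResend(String id);

    /**
     * Gives up on a message after it has been resent too many times.
     *
     * @param id id
     */
    void messageDead(String id);

    /**
     * Saves a dead-letter message received by the dead-letter listener.
     *
     * @param id         message ID
     * @param exchange   exchange the message was received from
     * @param routingKey routing key the message was received with
     * @param queue      queue the message was consumed from
     * @param body       message body
     */
    void messageDead(String id, String exchange,
                     String routingKey, String queue,
                     String body);

    /**
     * Persists a message before it is consumed.
     *
     * @param id         message ID
     * @param exchange   exchange the message was received from
     * @param routingKey routing key the message was received with
     * @param queue      queue the message was consumed from
     * @param body       message body
     * @return the stored record, including the current consumption count
     */
    TransMessagePO messageReceiveReady(
            String id,
            String exchange,
            String routingKey,
            String queue,
            String body);

    /**
     * Marks a message as consumed successfully.
     *
     * @param id message ID
     */
    void messageReceiveSuccess(String id);
}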
TransMessageServiceImpl.java
package cn.kt.food.orderservicemanager.moodymq.service;

import cn.kt.food.orderservicemanager.moodymq.dao.TransMessageDao;
import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.UUID;

@Service
public class TransMessageServiceImpl implements TransMessageService {
    @Autowired
    TransMessageDao transMessageDao;
    @Value("${moodymq.service}")
    String serviceName;

    @Override
    public TransMessagePO messageSendReady(String exchange, String routingKey, String body) {
        final String messageId = UUID.randomUUID().toString();
        TransMessagePO transMessagePO = new TransMessagePO();
        transMessagePO.setId(messageId);
        transMessagePO.setService(serviceName);
        transMessagePO.setExchange(exchange);
        transMessagePO.setRoutingKey(routingKey);
        transMessagePO.setPayload(body);
        transMessagePO.setDate(new Date());
        transMessagePO.setSequence(0);
        transMessagePO.setType(TransMessageType.SEND);
        transMessageDao.insert(transMessagePO);
        return transMessagePO;
    }

    @Override
    public void messageSendSuccess(String id) {
        transMessageDao.delete(id, serviceName);
    }

    @Override
    public TransMessagePO messageSendReturn(String id, String exchange, String routingKey, String body) {
        TransMessagePO selectByIdAndService = transMessageDao.selectByIdAndService(id, serviceName);
        if (selectByIdAndService == null) {
            TransMessagePO transMessagePO = new TransMessagePO();
            transMessagePO.setId(id);
            transMessagePO.setService(serviceName);
            transMessagePO.setExchange(exchange);
            transMessagePO.setRoutingKey(routingKey);
            transMessagePO.setPayload(body);
            transMessagePO.setDate(new Date());
            transMessagePO.setSequence(0);
            transMessagePO.setType(TransMessageType.SEND);
            transMessageDao.insert(transMessagePO);
            return transMessagePO;
        } else {
            return selectByIdAndService;
        }
    }

    @Override
    public List<TransMessagePO> listReadyMessages() {
        return transMessageDao.selectByTypeAndService(
                TransMessageType.SEND.toString(), serviceName
        );
    }

    @Override
    public void messageResend(String id) {
        TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName);
        if (transMessagePO == null) {
            // The row may already have been deleted by the confirm callback
            return;
        }
        transMessagePO.setSequence(transMessagePO.getSequence() + 1);
        transMessageDao.update(transMessagePO);
    }

    @Override
    public void messageDead(String id) {
        TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName);
        if (transMessagePO == null) {
            // The row may already have been deleted by the confirm callback
            return;
        }
        transMessagePO.setType(TransMessageType.DEAD);
        transMessageDao.update(transMessagePO);
    }

    @Override
    public void messageDead(String id, String exchange, String routingKey, String queue, String body) {
        TransMessagePO transMessagePO = new TransMessagePO();
        transMessagePO.setId(id);
        transMessagePO.setService(serviceName);
        transMessagePO.setExchange(exchange);
        transMessagePO.setRoutingKey(routingKey);
        transMessagePO.setQueue(queue);
        transMessagePO.setPayload(body);
        transMessagePO.setDate(new Date());
        transMessagePO.setSequence(0);
        transMessagePO.setType(TransMessageType.DEAD);
        transMessageDao.insert(transMessagePO);
    }

    @Override
    public TransMessagePO messageReceiveReady(
            String id, String exchange,
            String routingKey, String queue, String body) {
        TransMessagePO transMessagePO =
                transMessageDao.selectByIdAndService(id, serviceName);
        if (null == transMessagePO) {
            // No record yet, so this is the first time the message is consumed
            transMessagePO = new TransMessagePO();
            transMessagePO.setId(id);
            transMessagePO.setService(serviceName);
            transMessagePO.setExchange(exchange);
            transMessagePO.setRoutingKey(routingKey);
            transMessagePO.setQueue(queue);
            transMessagePO.setPayload(body);
            transMessagePO.setDate(new Date());
            transMessagePO.setSequence(0);
            transMessagePO.setType(TransMessageType.RECEIVE);
            transMessageDao.insert(transMessagePO);
        } else {
            // Otherwise increment the consumption count by 1
            transMessagePO.setSequence(transMessagePO.getSequence() + 1);
            transMessageDao.update(transMessagePO);
        }
        return transMessagePO;
    }

    @Override
    public void messageReceiveSuccess(String id) {
        // Delete the message after it has been consumed successfully
        transMessageDao.delete(id, serviceName);
    }
}
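The counting behaviour of messageReceiveReady and messageReceiveSuccess can be checked without a database. The following is a minimal in-memory sketch, not the author's code: `ReceiveCounterSketch` and its methods are hypothetical stand-ins, and the map key imitates the composite primary key (id, service) of trans_message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the receive-side bookkeeping.
class ReceiveCounterSketch {
    // Key imitates the composite primary key (id, service).
    private final Map<String, Integer> sequenceByKey = new HashMap<>();
    private final String serviceName;

    ReceiveCounterSketch(String serviceName) {
        this.serviceName = serviceName;
    }

    private String key(String id) {
        return id + "|" + serviceName;
    }

    // First delivery stores sequence 0; every redelivery adds 1.
    int receiveReady(String id) {
        return sequenceByKey.merge(key(id), 0, (old, ignored) -> old + 1);
    }

    // Successful consumption deletes the record.
    void receiveSuccess(String id) {
        sequenceByKey.remove(key(id));
    }

    boolean isTracked(String id) {
        return sequenceByKey.containsKey(key(id));
    }

    public static void main(String[] args) {
        ReceiveCounterSketch counter = new ReceiveCounterSketch("orderService");
        System.out.println("first delivery: " + counter.receiveReady("m1"));
        System.out.println("redelivery: " + counter.receiveReady("m1"));
        counter.receiveSuccess("m1");
        System.out.println("tracked after success: " + counter.isTracked("m1"));
    }
}
```

Keying on the service name as well as the message ID lets a sender and a receiver that share one database each keep their own row for the same message.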
6. Create a config class for RabbitAdmin and RabbitTemplate that implements message listening and confirmation
Message listening uses manual ack
With publisher confirms enabled, a message that fails to reach the exchange is kept and resent
MoodyRabbitConfig.java
package cn.kt.food.orderservicemanager.moodymq.config;

import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class MoodyRabbitConfig {
    @Autowired
    TransMessageService transMessageService;
    @Value("${moodymq.host}")
    String host;
    @Value("${moodymq.port}")
    int port;
    @Value("${moodymq.username}")
    String username;
    @Value("${moodymq.password}")
    String password;
    @Value("${moodymq.vhost}")
    String vhost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        // CORRELATED: confirms are asynchronous, so each one must carry an ID that says which message
        // was confirmed. The sender supplies it with new CorrelationData(transMessagePO.getId());
        // the same ID is also set as the message ID, which the return callback reads.
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // Enable returns so that unroutable messages are handed back
        connectionFactory.setPublisherReturns(true);
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.setAutoStartup(true);
        return admin;
    }

    /* Configure the consumer-side listener container */
    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        // Manual ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /* Publisher confirm and return handling */
    @Bean
    public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        // Confirm callback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{}, ack:{}, cause:{}",
                    correlationData, ack, cause);
            // The broker confirmed the message
            if (ack && null != correlationData) {
                String messageId = correlationData.getId();
                log.info("message delivered to the exchange, id:{}", messageId);
                transMessageService.messageSendSuccess(messageId);
            } else {
                // Not confirmed: keep the stored message and wait for the next resend
                log.error("failed to deliver message to the exchange, correlationData:{}", correlationData);
            }
        });
        // Callback invoked when a message is not routed to any queue
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("message could not be routed! message:{}, replyCode:{} replyText:{} exchange:{} routingKey:{}",
                    returnedMessage.getMessage(), returnedMessage.getReplyCode(),
                    returnedMessage.getReplyText(), returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey());
            transMessageService.messageSendReturn(
                    returnedMessage.getMessage().getMessageProperties().getMessageId(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey(),
                    new String(returnedMessage.getMessage().getBody())
            );
        });
        return rabbitTemplate;
    }
}
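Why the confirm callback needs an ID can be seen in a small plain-Java model. This is only a sketch under assumptions: `ConfirmCorrelationSketch`, `beforeSend`, and `onConfirm` are hypothetical names, and the branch inside `onConfirm` imitates the `ack && null != correlationData` check in the ConfirmCallback above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of matching asynchronous confirms to stored messages.
class ConfirmCorrelationSketch {
    // IDs of messages persisted before sending and not yet confirmed.
    final Set<String> stored = new LinkedHashSet<>();

    void beforeSend(String id) {
        stored.add(id);
    }

    // Delete the stored copy only on a positive confirm that carries an ID;
    // a nack or a confirm without correlation data leaves it for the resend scan.
    void onConfirm(String correlationId, boolean ack) {
        if (ack && correlationId != null) {
            stored.remove(correlationId);
        }
    }

    public static void main(String[] args) {
        ConfirmCorrelationSketch sketch = new ConfirmCorrelationSketch();
        sketch.beforeSend("a");
        sketch.beforeSend("b");
        sketch.beforeSend("c");
        // Confirms may arrive in a different order than the sends.
        sketch.onConfirm("c", true);
        sketch.onConfirm("a", false);
        sketch.onConfirm("b", true);
        System.out.println("still stored: " + sketch.stored);
    }
}
```

Without the ID, a confirm arriving out of order could not be tied back to the row it should delete.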
7. Create an abstract class that implements ChannelAwareMessageListener to handle incoming messages
It records each received message and watches whether the business logic throws; if processing fails, the message is returned to the queue
AbstractMessageListener.java
package cn.kt.food.orderservicemanager.moodymq.listener;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.io.IOException;

@Slf4j
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {
    @Autowired
    TransMessageService transMessageService;
    @Value("${moodymq.resendTimes}")
    Integer resendTimes;

    public abstract void receviceMessage(Message message) throws JsonProcessingException;

    @Override
    public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
        MessageProperties messageProperties = message.getMessageProperties();
        // deliveryTag: the number used to acknowledge this delivery
        long deliveryTag = messageProperties.getDeliveryTag();
        // Persist the received message
        log.info("received message body: {}", new String(message.getBody()));
        TransMessagePO transMessagePO =
                transMessageService.messageReceiveReady(
                        messageProperties.getMessageId(),
                        messageProperties.getReceivedExchange(),
                        messageProperties.getReceivedRoutingKey(),
                        messageProperties.getConsumerQueue(),
                        new String(message.getBody())
                );
        log.info("received message {}, consumption count {}",
                messageProperties.getMessageId(), transMessagePO.getSequence());
        try {
            // The business code runs here; exceptions are caught below
            receviceMessage(message);
            // Processing finished
            channel.basicAck(deliveryTag, false);
            transMessageService.messageReceiveSuccess(messageProperties.getMessageId());
        } catch (Exception e) {
            // Processing failed
            log.error(e.getMessage(), e);
            // Check how many times this message has been consumed
            if (transMessagePO.getSequence() >= resendTimes) {
                // Retry limit reached: reject the message without requeueing
                channel.basicReject(deliveryTag, false);
            } else {
                // Wait with exponential backoff, then return the message to the queue
                Thread.sleep((long) (Math.pow(2, transMessagePO.getSequence())) * 1000);
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
}
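The retry decision in the catch block can be isolated as a pure function to see the resulting schedule. This is a sketch, not the author's code: `ConsumeRetryPolicy` and `delayBeforeRequeueMillis` are hypothetical names, and returning -1 to mean "reject" is a convention chosen for this example only.

```java
// Hypothetical helper that mirrors the reject-or-requeue decision in onMessage.
class ConsumeRetryPolicy {
    /**
     * Returns how long to wait (in milliseconds) before requeueing a failed
     * message, or -1 when the retry limit is reached and it should be rejected.
     */
    static long delayBeforeRequeueMillis(int sequence, int resendTimes) {
        if (sequence >= resendTimes) {
            return -1; // same condition as the basicReject branch
        }
        // Same formula as the Thread.sleep call: 2^sequence seconds
        return (long) Math.pow(2, sequence) * 1000;
    }

    public static void main(String[] args) {
        int resendTimes = 5; // value of moodymq.resendTimes in the configuration
        for (int sequence = 0; sequence <= resendTimes; sequence++) {
            long delay = delayBeforeRequeueMillis(sequence, resendTimes);
            System.out.println("sequence " + sequence + " -> "
                    + (delay < 0 ? "reject" : delay + " ms"));
        }
    }
}
```

With resendTimes set to 5, failed deliveries wait 1, 2, 4, 8, and 16 seconds (31 seconds in total) before the sixth failure is rejected. Note that the wait happens on the consumer thread, so that consumer handles no other message in the meantime.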
8. Configure dead-letter alerts
Declare the dead-letter exchange, queue, and binding
DlxConfig.java
package cn.kt.food.orderservicemanager.moodymq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty("moodymq.dlxEnabled")
public class DlxConfig {
    /*
     * Declare the dead-letter exchange, queue, and binding
     */
    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange("exchange.dlx");
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue("queue.dlx",
                true,
                false,
                false);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("#");
    }
}
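The notes earlier mention adding dead-letter settings to the ordinary queues, but that code is not shown in this article. A minimal sketch of the queue arguments involved follows; `DeadLetterArgsSketch` and `deadLetterArgs` are hypothetical names, while `x-dead-letter-exchange` is the standard RabbitMQ queue argument. In Spring AMQP such a map would typically be passed as the last argument of `new Queue(name, durable, exclusive, autoDelete, arguments)` when declaring the business queue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; builds the arguments for a business queue that should
// forward rejected messages to the dead-letter exchange declared in DlxConfig.
class DeadLetterArgsSketch {
    static Map<String, Object> deadLetterArgs(String deadLetterExchange) {
        Map<String, Object> arguments = new HashMap<>();
        // Standard RabbitMQ argument: exchange that receives dead-lettered messages
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        return arguments;
    }

    public static void main(String[] args) {
        // "exchange.dlx" is the exchange name used in DlxConfig
        System.out.println(deadLetterArgs("exchange.dlx"));
    }
}
```

With this argument set on a queue, a message rejected with requeue set to false (as in the basicReject branch of AbstractMessageListener) is republished to exchange.dlx and, through the "#" binding, reaches queue.dlx.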
Listen for dead-letter messages
DlxListener.java
package cn.kt.food.orderservicemanager.moodymq.listener;

import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@Slf4j
// Controls whether dead letters are listened for: with no havingValue set, @ConditionalOnProperty
// enables this bean when moodymq.dlxEnabled is present in the configuration and is not "false".
@ConditionalOnProperty("moodymq.dlxEnabled")
public class DlxListener implements ChannelAwareMessageListener {
    @Autowired
    TransMessageService transMessageService;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String messageBody = new String(message.getBody());
        log.error("dead letter! message:{}", message);
        // Send an email, make a phone call, send an SMS
        //XXXXX()
        MessageProperties messageProperties = message.getMessageProperties();
        transMessageService.messageDead(
                messageProperties.getMessageId(),
                messageProperties.getReceivedExchange(),
                messageProperties.getReceivedRoutingKey(),
                messageProperties.getConsumerQueue(),
                messageBody
        );
        // Acknowledge this single message
        channel.basicAck(messageProperties.getDeliveryTag(), false);
    }
}
9. Configure the scheduled task
Scan for unsent messages every 5 seconds
ResendTask.java
package cn.kt.food.orderservicemanager.moodymq.task;

import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;

@EnableScheduling
@Configuration
@Component
@Slf4j
public class ResendTask {
    @Autowired
    TransMessageService transMessageService;
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Value("${moodymq.resendTimes}")
    Integer resendTimes;

    // The interval is read from the configuration file
    @Scheduled(fixedDelayString = "${moodymq.resendFreq}")
    public void resendMessage() {
        log.info("resendMessage() invoked.");
        List<TransMessagePO> messagePOS =
                transMessageService.listReadyMessages();
        log.info("resendMessage(): messagepos:{}", messagePOS);
        for (TransMessagePO po : messagePOS) {
            log.info("resendMessage(): po:{}", po);
            // Mark messages that exceeded the resend limit as DEAD and skip them
            if (po.getSequence() > resendTimes) {
                log.error("resend too many times!");
                transMessageService.messageDead(po.getId());
                continue;
            }
            // Build and send the message
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            Message message = new Message(po.getPayload().getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(po.getId());
            rabbitTemplate.convertAndSend(
                    po.getExchange(),
                    po.getRoutingKey(),
                    message,
                    new CorrelationData(po.getId()));
            log.info("message sent, ID:{}", po.getId());
            // The message was resent: increment its send count by 1
            transMessageService.messageResend(po.getId());
        }
    }
}
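How many resends the `po.getSequence() > resendTimes` check allows can be worked out with a small simulation. This is a sketch under the assumption that the message is never confirmed; `ResendScanSketch` and `resendsBeforeDead` are hypothetical names.

```java
// Hypothetical simulation of successive scans over one never-confirmed message.
class ResendScanSketch {
    // Returns how many times the message is resent before it is marked DEAD.
    static int resendsBeforeDead(int resendTimes) {
        int sequence = 0; // value stored by messageSendReady
        int resends = 0;
        while (true) {
            if (sequence > resendTimes) {
                return resends; // same check as in ResendTask: mark DEAD and stop
            }
            resends++;  // corresponds to rabbitTemplate.convertAndSend(...)
            sequence++; // corresponds to transMessageService.messageResend(...)
        }
    }

    public static void main(String[] args) {
        System.out.println("resends with resendTimes=5: " + resendsBeforeDead(5));
    }
}
```

With moodymq.resendTimes set to 5 this yields six resends after the initial send, because a row whose sequence equals the limit still passes the `>` check. If exactly five resends are wanted, comparing with `>=` would be one way to achieve that.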
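10. Adapt the business code outside the moodymq package
- Extend the abstract listener class provided by the moodymq package
- Replace the annotation-based exchange and queue bindings with @Bean declarations in the RabbitConfig configuration class
- Move the listener logic from handMessage into an override of the abstract class's receviceMessage method
- Use the send method wrapped inside the package from the business code outside it. See the source code below for implementation details
Source code download
https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_3
Summary
- Retrying failed sends, retrying failed consumption, and alerting on dead letters only guarantee the transactional consistency of RabbitMQ messages; what they solve is the consequences of message failures.
- In a real project, the moodymq package can be moved into a separate project and packaged as a jar, so that multiple microservice modules use it under one common convention.
- This moodymq package does not show a scenario that sends messages to the dead-letter queue queue.dlx. In practice, messages in the DEAD state can be sent to the dead-letter queue periodically to trigger alerts. The alert hook is provided, but the actual alerting logic should be completed to fit the scenario.
- The source code reuses the order microservice example from the RabbitMQ quick-start notes and adapts it to use this distributed transaction framework.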