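RabbitMQ message reliability has two main parts. One is a retry mechanism on the consumer side (implemented with @Retryable, which lets you set the number of retries and the retry interval, provided the handler is idempotent). The other is reliable delivery on the producer side (which also relies on an idempotent consumer). This article focuses on reliable delivery from the producer.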
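The send flow of rabbitTemplate is as follows:
1 Send the data and return (without confirming that the RabbitMQ server has received it)
2 Asynchronously receive the ack confirmation returned by RabbitMQ
3 After the ack is received, invoke the confirmCallback function
Note: the original message is not available inside confirmCallback, so it cannot be resent from that function; confirmCallback only serves as a notification. As a result, if the connection is cut at any point during steps 2 or 3, we cannot tell whether the data was actually delivered, which can lead to message loss.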
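There is only one fully reliable solution: RabbitMQ's transaction mechanism. In that mode, however, RabbitMQ is very slow, handling only a few hundred messages per second, which makes it impractical.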
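The second approach is a synchronous send: the client sends the data, RabbitMQ returns an ack once it has received it, and the send function returns only after that ack arrives. The code looks roughly like this:
    create channel
    send message
    wait for ack (or timeout)
    close channel
    return success or failure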
Likewise, because every send has to set up a new channel and block until the ack arrives, throughput is poor.
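The blocking behaviour of this second approach can be modeled without a broker. The sketch below is a JDK-only simulation, not RabbitMQ client code: `FakeBroker`, `SyncSender`, and `sendAndWait` are hypothetical names, and the broker's ack is represented by a `CompletableFuture` that the sender waits on with a timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a broker: it either acks asynchronously or never acks. */
final class FakeBroker {
    private final boolean willAck;

    FakeBroker(boolean willAck) {
        this.willAck = willAck;
    }

    /** Accepts a message and returns a future that completes when the "ack" arrives. */
    CompletableFuture<Boolean> publish(String message) {
        if (!willAck) {
            return new CompletableFuture<>(); // never completed: models a lost ack
        }
        return CompletableFuture.supplyAsync(() -> true);
    }
}

/** Sends one message and blocks until the ack arrives or the timeout expires. */
final class SyncSender {
    private final FakeBroker broker;

    SyncSender(FakeBroker broker) {
        this.broker = broker;
    }

    /** Returns true only when the ack was received within timeoutMillis. */
    boolean sendAndWait(String message, long timeoutMillis) {
        try {
            return broker.publish(message).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return false; // no ack in time, or the publish itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(new SyncSender(new FakeBroker(true)).sendAndWait("hello", 1000));
        System.out.println(new SyncSender(new FakeBroker(false)).sendAndWait("hello", 100));
    }
}
```

The caller pays the full round trip (or the full timeout) on every message, which is where the throughput cost of the synchronous approach comes from.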
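Based on the analysis above, we use a different approach to avoid losing data.
Building on rabbitTemplate's asynchronous confirms:
1 Cache each sent message in Redis
2 When confirmCallback reports an ack, delete the confirmed message from the cache
3 Periodically scan the cached messages and resend any that have gone unconfirmed for longer than a set time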
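Before looking at the Spring and Redis code, the three steps can be sketched with plain JDK collections. This is only an illustration: `PendingStore` and its method names are hypothetical, and a `ConcurrentHashMap` stands in for Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the Redis cache of unconfirmed messages (hypothetical). */
final class PendingStore {

    /** A cached payload plus the time it was sent, in epoch milliseconds. */
    static final class Entry {
        final String payload;
        final long sentAt;

        Entry(String payload, long sentAt) {
            this.payload = payload;
            this.sentAt = sentAt;
        }
    }

    private final Map<String, Entry> pending = new ConcurrentHashMap<>();

    /** Step 1: remember the message before handing it to the broker. */
    void beforeSend(String id, String payload, long now) {
        pending.put(id, new Entry(payload, now));
    }

    /** Step 2: an ack removes the entry; a nack leaves it for the scanner. */
    void onConfirm(String id, boolean ack) {
        if (ack) {
            pending.remove(id);
        }
    }

    /** Step 3: ids of messages that have been unconfirmed for longer than maxAgeMillis. */
    List<String> dueForRetry(long now, long maxAgeMillis) {
        List<String> due = new ArrayList<>();
        pending.forEach((id, entry) -> {
            if (now - entry.sentAt > maxAgeMillis) {
                due.add(id);
            }
        });
        return due;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingStore store = new PendingStore();
        store.beforeSend("a", "first", 0L);
        store.beforeSend("b", "second", 0L);
        store.onConfirm("a", true);   // confirmed, removed
        store.onConfirm("b", false);  // not confirmed, kept
        System.out.println(store.dueForRetry(61_000L, 60_000L));
    }
}
```

Because the confirm callback only carries an id, keeping the payload in the store is what makes a resend possible at all.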
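Of course, this approach has a drawback of its own. Consider this scenario: RabbitMQ receives the message, but the network drops while it is sending the ack, so the client never receives the ack and resends the message. (Compared with lost messages, duplicates are much easier to deal with: the consumer can be made idempotent.) The automatic retry code is as follows: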
package cn.chinotan.service.reliabletransmission;

/**
 * @program: test
 * @description: RabbitMQ constants
 * @author: xingcheng
 * @create: 2018-08-12 12:30
 **/
public class MyConstant {
    public static final String MY_EXCHANGE = "my_exchange";
    public static final String ERROR_EXCHANGE = "error_exchange";
    public static final String MY_QUEUE_THREE = "my_queue_three";
    public static final String KEY_PREFIX = "test:rabbitMq:";
    /**
     * Wait time after a consumer failure (milliseconds)
     */
    public static final int ONE_MINUTE = 1 * 60 * 1000;
    /**
     * Interval between MQ message retry scans
     */
    public static final int RETRY_TIME_INTERVAL = ONE_MINUTE;
    /**
     * How long an MQ message may stay unconfirmed before it is resent
     */
    public static final int VALID_TIME = ONE_MINUTE;
}
package cn.chinotan.service.reliabletransmission;

import java.io.Serializable;

/**
 * @program: test
 * @description: message wrapper (id, send time, JSON payload)
 * @author: xingcheng
 * @create: 2018-09-24 15:32
 **/
public class MessageWithTime implements Serializable {
    private String id;
    private long time;
    private String message;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
package cn.chinotan.service.reliabletransmission;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 */
@Configuration
public class ReliableRabbitConfig {
    // Durable, non-auto-delete direct exchange
    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange(MyConstant.MY_EXCHANGE, true, false);
    }

    // Durable queue
    @Bean
    public Queue myQueueOne() {
        return new Queue(MyConstant.MY_QUEUE_THREE, true);
    }

    // Bind the queue to the exchange, using the queue name as the routing key
    @Bean
    public Binding queueOneBinding() {
        return BindingBuilder.bind(myQueueOne()).to(myExchange()).withQueueName();
    }
}
package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.UUID;

/**
 * @program: test
 * @description: rabbitService
 * @author: xingcheng
 * @create: 2018-09-24 14:28
 **/
@Service
public class RabbitMQService {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQService.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Register the asynchronous confirm callback once at startup. A RabbitTemplate
     * supports only one ConfirmCallback, so setting a new lambda on every send
     * fails from the second call onward.
     */
    @PostConstruct
    public void initConfirmCallback() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData == null) {
                // Sent without correlation data: nothing to look up in the cache
                return;
            }
            if (ack) {
                logger.info("message send success--id:[{}]", correlationData.getId());
                // Confirmed by the broker: delete the cached copy from Redis
                redisTemplate.delete(correlationData.getId());
            } else {
                // Not confirmed: log it and leave the cached copy for RetryCache to resend
                logger.error("message send fail--id:[{}], cause:[{}]", correlationData.getId(), cause);
            }
        });
    }

    /** First send of a message: cache it in Redis, then publish it. */
    public Boolean send(String exchange, String routingKey, Object message) {
        try {
            String key = StringUtils.join(MyConstant.KEY_PREFIX, UUID.randomUUID().toString().replace("-", "").toLowerCase());
            // Before sending, cache the message, its send time, and its id in Redis
            MessageWithTime messageWithTime = new MessageWithTime();
            messageWithTime.setId(key);
            messageWithTime.setMessage(JSONObject.toJSONString(message));
            messageWithTime.setTime(System.currentTimeMillis());
            redisTemplate.opsForValue().set(key, JSONObject.toJSONString(messageWithTime));
            CorrelationData correlationData = new CorrelationData(key);
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            // Pass the exception as the last argument so SLF4J logs the stack trace
            logger.error("Failed to send message", e);
            return false;
        }
        return true;
    }

    /** Resend of a cached message: reuse its id so the ack deletes the same Redis key. */
    Boolean send(String exchange, String routingKey, MessageWithTime message) {
        try {
            CorrelationData correlationData = new CorrelationData(message.getId());
            Map map = JSON.parseObject(message.getMessage(), Map.class);
            rabbitTemplate.convertAndSend(exchange, routingKey, map, correlationData);
        } catch (Exception e) {
            logger.error("Failed to resend message", e);
            return false;
        }
        return true;
    }
}
package cn.chinotan.service.reliabletransmission;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * Producer
 */
@Service
public class ReliableProducr {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReliableProducr.class);

    @Autowired
    private RabbitMQService rabbitMQService;

    // First send of a new message
    public Boolean send(Map msg) {
        return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
    }

    // Resend of a cached message (used by RetryCache)
    public Boolean send(MessageWithTime msg) {
        return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
    }
}
package cn.chinotan.service.reliabletransmission;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

/**
 * @program: test
 * @description: reliable delivery listener
 * @author: xingcheng
 * @create: 2018-09-24 16:05
 **/
@WebListener
public class ReliableTransContextListener implements ServletContextListener {
    private static final Logger logger = LoggerFactory.getLogger(ReliableTransContextListener.class);

    private WebApplicationContext springContext;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        logger.info("ReliableTransContextListener init start...........");
        springContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
        if (springContext != null) {
            RetryCache retryCache = (RetryCache) springContext.getBean("retryCache");
            // Run the retry scan loop on its own thread so startup is not blocked
            new Thread(() -> retryCache.startRetry()).start();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}
package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;

/**
 * @program: test
 * @description: retry from cache
 * @author: xingcheng
 * @create: 2018-09-24 16:12
 **/
@Component("retryCache")
public class RetryCache {
    private static final Logger logger = LoggerFactory.getLogger(RetryCache.class);
    private static final String STAR = "*";

    // volatile so that a change made by another thread is seen by the scan loop
    private volatile boolean stop = false;

    @Autowired
    private ReliableProducr producr;

    @Autowired
    private StringRedisTemplate redisTemplate;

    public void startRetry() {
        while (!stop) {
            try {
                Thread.sleep(MyConstant.RETRY_TIME_INTERVAL);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop scanning
                Thread.currentThread().interrupt();
                return;
            }
            long now = System.currentTimeMillis();
            // KEYS walks the whole keyspace: fine for a demo, costly on a large Redis instance
            Set<String> keys = redisTemplate.keys(StringUtils.join(MyConstant.KEY_PREFIX, STAR));
            if (keys == null || keys.isEmpty()) {
                continue;
            }
            List<String> list = redisTemplate.opsForValue().multiGet(keys);
            if (list == null) {
                continue;
            }
            for (String value : list) {
                MessageWithTime messageWithTime = JSON.parseObject(value, MessageWithTime.class);
                if (messageWithTime == null) {
                    continue;
                }
                if (messageWithTime.getTime() + 3 * MyConstant.VALID_TIME < now) {
                    // Unconfirmed for more than three validity periods: give up and drop it
                    logger.error("send message {} failed after 3 min ", messageWithTime.getId());
                    redisTemplate.delete(messageWithTime.getId());
                } else if (messageWithTime.getTime() + MyConstant.VALID_TIME < now) {
                    // Unconfirmed for more than one validity period: resend it
                    Boolean send = producr.send(messageWithTime);
                    logger.info("redelivering message {}", messageWithTime.getId());
                    if (!send) {
                        logger.error("retry send message failed {}", messageWithTime.getId());
                    }
                }
            }
        }
    }
}
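The two age checks in the scan loop are easy to get wrong, so it can help to pull them into a pure function that is testable without Redis or RabbitMQ. The sketch below is one way to do that; `RetryDecision`, `Action`, and `decide` are hypothetical names, and `VALID_TIME_MILLIS` is assumed to mirror `MyConstant.VALID_TIME`.

```java
/** Pure decision logic for one cached message, separated from Redis and RabbitMQ. */
final class RetryDecision {

    enum Action { WAIT, RETRY, DROP }

    // Assumed to mirror MyConstant.VALID_TIME (one minute in milliseconds)
    static final long VALID_TIME_MILLIS = 60_000L;

    /**
     * Decides what to do with a message sent at sentAt, evaluated at time now
     * (both in epoch milliseconds).
     */
    static Action decide(long sentAt, long now) {
        long age = now - sentAt;
        if (age > 3 * VALID_TIME_MILLIS) {
            return Action.DROP;   // unconfirmed for too long: give up
        }
        if (age > VALID_TIME_MILLIS) {
            return Action.RETRY;  // overdue: resend
        }
        return Action.WAIT;       // still within the validity window
    }

    public static void main(String[] args) {
        System.out.println(decide(0L, 30_000L));
        System.out.println(decide(0L, 90_000L));
        System.out.println(decide(0L, 200_000L));
    }
}
```

With one-minute scans and these thresholds, a message that is never confirmed gets at most a couple of resend attempts before it is dropped, so anything that must not be lost would need a fallback such as an alert or a dead-letter store.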
package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * queueThree consumer
 */
@Component
public class MyQueueThreeConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueThreeConsumer.class);

    /**
     * The consumer must be idempotent, because the producer may resend a message
     *
     * @param content
     */
    @RabbitListener(queues = MyConstant.MY_QUEUE_THREE)
    @RabbitHandler
    public void process(Map content) {
        LOGGER.info("consumer, queueThree started at {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        LOGGER.info("consumer, queueThree received: [{}]", JSON.toJSONString(content));
    }
}
import cn.chinotan.service.reliabletransmission.MyConstant;
import cn.chinotan.service.reliabletransmission.RabbitMQService;
import cn.chinotan.service.reliabletransmission.ReliableProducr;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * @program: test
 * @description: reliable delivery test
 * @author: xingcheng
 * @create: 2018-09-24 15:57
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class)
public class ReliableTransmissionTest {
    @Autowired
    private ReliableProducr producr;

    @Autowired
    private RabbitMQService rabbitMQService;

    /**
     * Normal case
     * @throws Exception
     */
    @Test
    public void reliableTransmissionTest() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("name", "xingheng");
        producr.send(map);
    }

    /**
     * Failure case: publish to an exchange that does not exist
     * @throws Exception
     */
    @Test
    public void reliableTransmissionFailTest() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("name", "xingheng");
        rabbitMQService.send(MyConstant.ERROR_EXCHANGE, MyConstant.MY_QUEUE_THREE, map);
    }
}
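Notes:
1. Publisher confirms must be enabled in the configuration, like this (this property name applies to Spring Boot versions before 2.2; later versions use publisher-confirm-type: correlated instead):
    spring:
      rabbitmq:
        publisher-confirms: true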
2. To test the failure case, simply send the message to an exchange that does not exist
3. Make sure the consumer is idempotent
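Note 3 is left abstract in the code above, so here is a minimal sketch of one common way to make a consumer idempotent: remember the ids that have already been processed and skip repeats. It rests on assumptions that the article's code does not make: the producer would have to put a unique message id into the message itself (for example as a field or header), and `IdempotentConsumer`, `handle`, and `processedCount` are hypothetical names. An in-memory set is used only for illustration; a shared store would be needed across consumer instances or restarts.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Skips messages whose id has already been handled (illustrative, in-memory only). */
final class IdempotentConsumer {
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger processed = new AtomicInteger();

    /**
     * Handles a message once per id.
     *
     * @return true if the message was processed, false if it was a duplicate
     */
    boolean handle(String messageId, String payload) {
        // Set.add returns false when the id is already present, so a resend is skipped
        if (!seenIds.add(messageId)) {
            return false;
        }
        processed.incrementAndGet(); // stands in for the real business logic
        return true;
    }

    int processedCount() {
        return processed.get();
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        System.out.println(consumer.handle("id-1", "{\"name\":\"xingheng\"}"));
        System.out.println(consumer.handle("id-1", "{\"name\":\"xingheng\"}")); // resend of the same message
        System.out.println(consumer.processedCount());
    }
}
```

Marking the id as seen before running the business logic means a crash in between would lose that message; a production version would typically record the id and apply the side effect atomically, for example in one database transaction.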
Simple test result:
After one retry, the message is sent to the correct exchange, so the send succeeds.