点击上方“芋道源码”,选择“设为星标”
管她前浪,还是后浪?
能浪的浪,才是好浪!
每天 10:33 更新文章,每天掉亿点点头发...
源码精品专栏
- 原创 | Java 2021 超神之路,很肝~
- 中文详细注释的开源项目
- RPC 框架 Dubbo 源码解析
- 网络应用框架 Netty 源码解析
- 消息中间件 RocketMQ 源码解析
- 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析
- 作业调度中间件 Elastic-Job 源码解析
- 分布式事务中间件 TCC-Transaction 源码解析
- Eureka 和 Hystrix 源码解析
- Java 并发源码
来源:writing-bugs.blog.csdn.net/
article/details/103701101
- 一、前言
- 二、准备
- 2.1、依赖引入
- 2.2、连接yml的配置
- 2.3、config注入配置
- 2.4、消费者的配置
- 2.5、消息生产者
- 三、ack配置和测试
- 3.1、模拟消费者二出问题
- 四、分析几个回执方法
- 4.1、确认消息
- 4.2、拒绝消息
- 4.3、拒绝消息
- 五、总结
一、前言
前几天我研究了关于springboot整合简单消息队列,实现springboot推送消息至队列中,消费者成功消费。同时也加了消息转发器,对消息转发器各种类型的配置等做了总结。
但是,主要还有一点,我一直存在疑问:如何确保消息成功被消费者消费?
说到这里,我相信很多人会说使用ack啊,关闭队列自动删除啊什么的。主要是道理大家都懂,我要实际的代码,网上找了半天,和我设想的有很大差异,还是自己做研究总结吧。
基于 Spring Boot MyBatis Plus Vue & Element 实现的后台管理系统 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
- 视频教程:https://doc.iocoder.cn/video/
二、准备
本次写案例,就按照最简单的方式,direct方式进行配置吧,实际流程如下所示:
- 消息转发器类型: direct直连方式。
- 消息队列: 暂时采取公平分发方式。
- 实现流程: 消息生产者生产的消息发送至队列中,由两个消费者获取并消费,消费完成后,清楚消息队列中的消息。
所以我们接下来先写配置和demo。
2.1、依赖引入
再一般的springboot 2.1.4项目中,添加一个pom依赖。
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2、连接yml的配置
我们这边暂时只有一个rabbitmq,所以连接操作,基本rabbitmq的信息配置问题直接再yml中编写就可以了。
代码语言:javascript复制spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: xiangjiao
password: bunana
virtual-host: /xiangjiao
publisher-confirms: true #开启发送确认
publisher-returns: true #开启发送失败回退
#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual #采取手动应答
#concurrency: 1 # 指定最小的消费者数量
#max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true # 是否支持重试
2.3、config注入配置
我们根据图示
知道我们必须配置以下东西:
- 一个消息转发器,我们取名
directExchangeTx
。 - 一个消息队列,取名
directQueueTx
,并将其绑定至指定的消息转发器上。
所以我们的配置文件需要这么写:
代码语言:javascript复制import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 直连交换机,发送指定队列信息,但这个队列后有两个消费者同时进行消费
* @author 7651
*
*/
@Configuration
public class DirectExchangeTxQueueConfig {
@Bean(name="getDirectExchangeTx")
public DirectExchange getDirectExchangeTx(){
return new DirectExchange("directExchangeTx", true, false);
}
@Bean(name="getQueueTx")
public Queue getQueueTx(){
return new Queue("directQueueTx", true, false, false);
}
@Bean
public Binding getDirectExchangeQueueTx(
@Qualifier(value="getDirectExchangeTx") DirectExchange getDirectExchangeTx,
@Qualifier(value="getQueueTx") Queue getQueueTx){
return BindingBuilder.bind(getQueueTx).to(getDirectExchangeTx).with("directQueueTxRoutingKey");
}
}
2.4、消费者的配置
有了队列和消息转发器,消息当然需要去消费啊,所以我们接下来配置消息消费者。
从图中,我们看出,我们需要配置两个消息消费者,同时监听一个队列,所以我们的配置类为:
消费者一:
代码语言:javascript复制import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="directQueueTx")
public class Consumer1 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*1);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
/**
* 确认一条消息:<br>
* channel.basicAck(deliveryTag, false); <br>
* deliveryTag:该消息的index <br>
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息 <br>
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg1 success msg = " msg);
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
/**
* 拒绝确认消息:<br>
* channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
* deliveryTag:该消息的index<br>
* multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。<br>
* requeue:被拒绝的是否重新入队列 <br>
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg1 failed msg = " msg);
/**
* 拒绝一条消息:<br>
* channel.basicReject(long deliveryTag, boolean requeue);<br>
* deliveryTag:该消息的index<br>
* requeue:被拒绝的是否重新入队列
*/
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
消息消费者二:
代码语言:javascript复制import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*3);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg2 success msg = " msg);
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg2 failed msg = " msg);
}
}
}
两个消费者之间唯一的区别在于两者获取消息后,延迟时间不一致。
2.5、消息生产者
有了消息消费者,我们需要有一个方式提供消息并将消息推送到消息队列中。
代码语言:javascript复制public interface IMessageServcie {
public void sendMessage(String exchange,String routingKey,Object msg);
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.linkpower.service.IMessageServcie;
@Component
public class MessageServiceImpl implements IMessageServcie,ConfirmCallback,ReturnCallback {
private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(String exchange,String routingKey,Object msg) {
//消息发送失败返回到队列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
//消息消费者确认收到消息后,手动ack回执
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
//发送消息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("---- returnedMessage ----replyCode=" replyCode " replyText=" replyText " ");
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("---- confirm ----ack=" ack " cause=" String.valueOf(cause));
log.info("correlationData -->" correlationData.toString());
if(ack){
log.info("---- confirm ----ack==true cause=" cause);
}else{
log.info("---- confirm ----ack==false cause=" cause);
}
}
}
除了定义好了消息发送的工具服务接口外,我们还需要一个类,实现请求时产生消息,所以我们写一个controller。
代码语言:javascript复制import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.linkpower.service.IMessageServcie;
@Controller
public class SendMessageTx {
@Autowired
private IMessageServcie messageServiceImpl;
@RequestMapping("/sendMoreMsgTx")
@ResponseBody
public String sendMoreMsgTx(){
//发送10条消息
for (int i = 0; i < 10; i ) {
String msg = "msg" i;
System.out.println("发送消息 msg:" msg);
messageServiceImpl.sendMessage("directExchangeTx", "directQueueTxRoutingKey", msg);
//每两秒发送一次
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "send ok";
}
}
运行springboot项目,访问指定的url,是可以观察到消息产生和消费的。
有些人会问,写到这里就够了吗,你这和之前博客相比,和没写一样啊,都是教我们如何配置,如何生产消息,如何消费消息。
所以接下来的才是重点了,我们一起研究一个事,当我们配置的消费者二出现消费消息时,出问题了,你如何能够保证像之前那样,消费者一处理剩下的消息?
基于 Spring Cloud Alibaba Gateway Nacos RocketMQ Vue & Element 实现的后台管理系统 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://gitee.com/zhijiantianya/yudao-cloud
- 视频教程:https://doc.iocoder.cn/video/
三、ack配置和测试
3.1、模拟消费者二出问题
我们发送的消息格式都是 msg1、msg2、…
所以,我们不妨这么想,当我消费者二拿到的消息msg后面的数字大于3,表示我不要了。
代码语言:javascript复制import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*3);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
if(!isNull(msg)){
String numstr = msg.substring(3);
Integer num = Integer.parseInt(numstr);
if(num >= 3){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.out.println("get msg2 basicNack msg = " msg);
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg2 basicAck msg = " msg);
}
}
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg2 failed msg = " msg);
}
}
public static boolean isNull(Object obj){
return obj == null || obj == ""||obj == "null";
}
}
再次请求接口,我们统计日志信息打印发现:
发现:
当我们对消息者二进行限制大于等于3时,不接受消息队列传递来的消息时,消息队列会随机重发那条消息,直至消息发送至完好的消费者一时,才会把消息消费掉。
四、分析几个回执方法
4.1、确认消息
代码语言:javascript复制channel.basicAck(long deliveryTag, boolean multiple);
我们一般使用下列方式:
代码语言:javascript复制channel.basicAck(
message.getMessageProperties().getDeliveryTag(),
false);
4.2、拒绝消息
代码语言:javascript复制channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ;
我们接下来还是修改消费者二,将这个方法最后个参数更改为false,看现象是什么?
代码语言:javascript复制import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*3);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
if(!isNull(msg)){
String numstr = msg.substring(3);
Integer num = Integer.parseInt(numstr);
if(num >= 3){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, false);
System.out.println("get msg2 basicNack msg = " msg);
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("get msg2 basicAck msg = " msg);
}
}
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true);
System.err.println("get msg2 failed msg = " msg);
}
}
public static boolean isNull(Object obj){
return obj == null || obj == ""||obj == "null";
}
}
重启项目,重新请求测试接口。
发现,当出现设置参数为false时,也就是如下所示的设置时:
代码语言:javascript复制channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false,
false);
如果此时消费者二出了问题,这条消息不会重新回归队列中重新发送,会丢失这条数据。
并且再消息队列中不会保存:
4.3、拒绝消息
代码语言:javascript复制channel.basicReject(long deliveryTag, boolean requeue);
这个和上面的channel.basicNack
又有什么不同呢?我们还是修改消费者二实验下。
请求测试接口,查看日志信息。
发现,此时的日志信息配置
代码语言:javascript复制channel.basicReject(
message.getMessageProperties().getDeliveryTag(),
true);
和
代码语言:javascript复制channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false, true);
实现的效果是一样的,都是将信息拒绝接收,由于设置的requeue为true,所以都会将拒绝的消息重新入队列中,重新进行消息分配并消费。
五、总结
这一篇博客,我们总结了相关的配置,三个确认(或回执)信息的方法,并区别了他们的各项属性,也知道了当消息再一个消费者中处理失败了,如何不丢失消息重新进行消息的分配消费问题。
但是这个只是队列和消费者之间的消息确认机制,使用手动ACK方式确保消息队列中的消息都能在消费者中成功消费。那么,消息转发器和消息队列之间呢?消息生产者和消息转发器之间呢?
当然,差点忘了一个小问题。
我们思考一个问题,如果消息队列对应的消费者只有一个,并且那个消费者炸了,会出现什么问题呢??
欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢:
已在知识星球更新源码解析如下:
最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。
提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。
获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。
代码语言:javascript复制文章有帮助的话,在看,转发吧。谢谢支持哟 (*^__^*)