消息重复消费 消息重复消费的问题 第一种情况是发送时消息重复,当一条消息已被成功发送到服务端并完成持久化,此时出现了网络抖动或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
第二种情况是投递时消息重复,消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,tMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
第三种情况是负载均衡时消息重复,比如网络抖动、Broker 重启以及订阅方应用重启,当MQ的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。
那么怎么解决消息重复消费的问题呢?就是对消息进行幂等性处理。
在MQ中,是无法保证每个消息只被投递一次的,因为网络抖动或者客户端宕机等其他因素,基本都会配置重试机制,所以要在消费者端的业务上做消费幂等处理,MQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,业务上可以用这个MessageId加上业务的唯一标识来作为判断幂等的关键依据,例如订单ID。而这个业务标识可以使用Message的Key来进行传递。消费者获取到消息后先根据id去查询redis/db是否存在该消息,如果不存在,则正常消费,消费完后写入redis/db。如果存在,则证明消息被消费过,直接丢弃。
消息顺序 消息顺序的问题,如果发送端配置了重试机制,mq不会等之前那条消息完全发送成功,才去发送下一条消息,这样可能会出现发送了1,2,3条消息,但是第1条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。RocketMQ消息有序要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的分区队列,而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。而Broker中一个队列内的消息是可以保证有序的。在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据,默认不超过32条。因此也无法保证消息有序。RocketMQ 在默认情况下不保证顺序,要保证全局顺序,需要把 Topic 的读写队列数设置为 1,然后生产者和消费者的并发设置也是 1,不能使用多线程。所以这样的话高并发,高吞吐量的功能完全用不上。全局有序就是无论发的是不是同一个分区,我都可以按照你生产的顺序来消费。分区有序就只针对发到同一个分区的消息可以顺序消费。kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。RabbitMq没有属性设置消息的顺序性,不过我们可以通过拆分为多个queue,每个queue由一个consumer消费。或者一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理,保证消息的顺序性。