消息队列(一)

2020-11-25 15:10:10 浏览数 (1)

消息队列,即MQ,是典型的生产者、消费者模型。生产者不断生成消息添加到队列中,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

为什么使用消息队列?

主要用在三个方面:异步解耦削峰

  • 异步:例如A系统要发送一个请求给B系统进行处理,由于B系统处理这个请求需要很长时间,那么A系统就要等待B系统处理完毕后再发送下个请求,造成A系统资源浪费。使用消息队列后,A系统生产完消息后直接丢进消息队列就完成了请求,继续处理下个请求,B系统只要从消息队列里获取消息进行处理即可。
  • 解耦:假如现在的业务场景是A系统发送数据到B、C、D三个系统,通过接口调用发送。如果现在又有E系统也需要A系统发送数据,那么A系统就要修改代码,刚修改完,C系统因需求变更又不需要A发送数据了,A又要修改,这种错综复杂的系统严重耦合,A系统还要时刻考虑数据接收方是否是健康的系统,如果挂掉了要怎么办等等问题。如果使用MQ,A系统生成一条数据,发送到MQ里面去,哪个系统需要数据就自己从MQ里获取数据即可,如果新增系统需要数据,就可以直接从MQ里消费;如果哪个系统因业务变更不需要了,直接取消对MQ的消费即可。
  • 削峰:例如一个系统每天大部分时间的请求只有每秒50个,但是每天高峰期可以达到每秒10000个请求,系统每秒最多只能,处理1000个请求,如果直接访问会导致系统崩溃,用户也不能再使用系统。这就造成了低峰期毫无压力,高峰期系统扛不住。如果使用MQ,把所有的请求写入MQ,系统再从MQ里慢慢拉取请求,只要拉取的速度不超过自己最大处理能力即可,这样哪怕高峰期也不会挂掉。

消息队列的优缺点:

优点:异步、解耦、削峰

缺点:

  1. 系统的可用性降低,系统引入外部的依赖越多,系统越复杂也导致越容易挂掉。
  2. 系统的复杂性,加入了MQ,还要保证消息不能重复消费,消息不能丢失,已经消息传递的顺序等等一系列问题都要考虑。
  3. 一致性问题,一般发送消息到消息队列就直接返回请求成功了,如果消息堆积,或者消息在处理时出现了问题,这就造成了数据的不一致性,也要有相应 处理。

AMQP和JMS

AMQP和JMS是学习消息队列很难绕开的,先看一下它们是什么意思。

  • AMQP:即Advanced Message Queuing Protocol ,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开发标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品、不同的开发语言等条件的限制。
  • JMS:通常讲的 JMS(Java MessageService)实际上是JMS API。由sun公司早期提出的消息标准,是为了Java应用提供统一的消息操作,包括create、send、receive等。

两者的区别:

  1. JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式。
  2. JMS限定了必须使用Java语言;AMQP只是协议,不规定实现的语言,因此是跨语言的。
  3. JMS规定了两种消息模型;AMQP的消息模型更加丰富。

常见的几个MQ:

  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里开发,目前由Apache基金会维护
  • Kafka:分布式消息系统,高吞吐量

常见的几个MQ的对比:

  • 吞吐量:ActiveMQ和RabbitMQ的吞吐量是万级的,RocketMQ和Kafka是十万级的。
  • 可用性:都可以实现高可用,ActiveMQ和RabbitMQ是基于主从架构实现高可用性。RocketMQ基于分布式架构 。Kafka也是分布式的,一个数据多个副本,少数宕机不会丢失数据。
  • 时效性:RabbitMQ的性能极好,延时很低可以达到微秒级,其他三个都是毫秒级
  • 功能支持:除了Kafka,其他三个都功能完备,Kafka功能比较简单,只支持主要的MQ功能。
  • 消息丢失:ActiveMQ和RabbitMQ丢失的可能性非常低,RocketMQ和Kafka理论上不会丢失

如何保证消息不被重复消费:

要结合具体的业务进行处理,一般有三个思路:

  1. 如果是将数据写库,可以先根据主键查,如果查到了就不做插入操作,做一个update操作
  2. 如果写的是写入redis,可以直接操作,因为redis每次都是set
  3. 如果不是上述两种,要做复杂操作,在每个生产者生成的消息中加入一个全局唯一的id,每次消息到了之后,先根据这个id去redis里查一次,如果消费过了就不做处理,如果没有消费过就处理,然后将这个id写入redis。

处理消息丢失问题:

可能出现消息丢失的三个地方分别是生产者、消息队列、消费者。以RabbitMQ为例来说明。

  • 生产者丢失数据,主要是发生在将消息发送给RabbitMQ的时候,可能在发送的过程中出现网络问题导致消息丢失。RabbitMQ提供了两种方式处理这个问题,分别是RabbitMQ事务和confirm机制。先说RabbitMQ事务,就说在生产者发送消息之前开启事务,然后发送消息,如果RabbitMQ没有接收到,生产者就会报异常回滚事务,然后重新发送,如果收到了消息就提交事务,这样做会导致吞吐量下降,对性能的消耗也比较大,一般不用这种方式。再说confirm模式,在生产者那里设置confirm模式后,RabbitMQ会回传一个ack消息,告诉这个消息接收到了,如果没有接收到了会回调一个nck接口,告诉消息接收失败,可以再次重试。还可以结合这个机制在内存中维护每个消息id的状态,如果超过一定时间没有接收到这个消息的回调可以重发。

  事务机制和confirm机制的区别在于事务机制是同步的,开启了事务会阻塞,但是confirm是异步的,对下一个消息的发送没有影响,所以一般在生产者这块防止数据丢失,都是使用confirm机制

  • RabbitMQ丢失消息,一般RabbitMQ有消息持久化机制,可以将消息写入磁盘,哪怕RabbitMQ挂了也可以读取磁盘中存储的消息然后恢复。很少见的是还没持久化的时候RabbitMQ挂了,导致少量数据丢失。设置持久化有两步操作,第一步,创建queue的时候将其设置为持久化,这样可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里的数据;第二步,发送消息的时候将deliveryMode设置为2,就是将消息设置为持久化,RabbitMQ就会将消息持久化到磁盘上。同时设置两个持久化可以保证哪怕RabbitMQ挂了重新启动也可以恢复queue,恢复queue里的数据。持久化可以结合confirm机制,只有消息被持久化之后才会通知生产者ack,保证消息一定可以写入磁盘。
  • 消费者丢失了消息,就是消费者刚从消息队列中获取消息,还未处理,消费者挂了,这种情况可以关闭RabbitMQ的自动ack机制,通过手动维护,在消息处理完后的时候,在程序里ack。这样如果还没处理完消费者挂了,RabbitMQ会将这个消息重新分发。

0 人点赞