一、消息队列介绍
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,具有高扩展性、可恢复性、送达保证、顺序保证等特点,可以实现高性能、高可用、可伸缩和最终一致性架构。目前在生产环境,使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ、Redis等。
二、消息队列应用场景
2.1 异步处理
串行方式
将注册信息写入数据库成功后,发送注册邮件,然后发送注册短信,而所有任务执行完成后,返回信息给客户端。
串行
并行方式
将注册信息写入数据库成功后,同时进行发送注册邮件和发送注册短信的操作。而所有任务执行完成后,返回信息给客户端。同串行方式相比,并行方式可以提高执行效率,减少执行时间。
并行
上面的比较可以发现,假设三个操作均需要50ms的执行时间,排除网络因素,则最终执行完成,串行方式需要150ms,而并行方式需要100ms。
因为cpu在单位时间内处理的请求数量是一致的,假设:CPU每1秒吞吐量是100次,则串行方式1秒内可执行的请求量为1000/150,不到7次;并行方式1秒内可执行的请求量为1000/100,为10次。
引入消息队列,异步处理
根据上述的流程,用户的响应时间基本相当于将用户数据写入数据库的时间,发送注册邮件、发送注册短信的消息在写入消息队列后,即可返回执行结果,写入消息队列的时间很快,几乎可以忽略,也有此可以将系统吞吐量提升至20QPS,比串行方式提升近3倍,比并行方式提升2倍。
2.2 应用解耦
传统的做法为:订单系统调用库存系统的接口。如下图所示:
传统方式
传统方式具有如下缺点:
1.假设库存系统访问失败,则订单减少库存失败,导致订单创建失败
2.订单系统同库存系统过度耦合
引入消息队列,应用解耦
订单系统:用户下单后,订单系统进行数据持久化处理,然后将消息写入消息队列,返回订单创建成功
库存系统:使用拉/推的方式,获取下单信息,库存系统根据订单信息,进行库存操作。
假如在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其后续操作了。由此实现了订单系统与库存系统的应用解耦。
2.3 流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
1、可以控制参与活动的人数;
2、可以缓解短时间内高流量对应用的巨大压力。
流量削锋处理方式系统图如下:
引入消息队列,流量削锋
1、服务器在接收到用户请求后,首先写入消息队列。这时如果消息队列中消息数量超过最大数量,则直接拒绝用户请求或返回跳转到错误页面;
2、秒杀业务根据秒杀规则读取消息队列中的请求信息,进行后续处理。
2.4 日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:
引入消息队列,日志处理
日志采集客户端:负责日志数据采集,定时写受写入Kafka队列; Kafka消息队列:负责日志数据的接收,存储和转发; 日志处理应用:订阅并消费kafka队列中的日志数据;
推荐案例:
http://cloud.51cto.com/art/201507/484338.htm
使用了Kafka,E(Elasticsearch)L(Logstash)K(Kibana)
2.5 消息通讯
点对点通讯
点对点通讯架构设计
在点对点通讯架构设计中,客户端A和客户端B共用一个消息队列,即可实现消息通讯功能。
聊天室通讯
聊天室架构设计
客户端A、客户端B、直至客户端N订阅同一消息队列,进行消息的发布与接收,即可实现聊天通讯方案架构设计。
三、 Kafka
Kafka是一种分布式的,基于发布/订阅的消息系统。
主要设计目标
1、以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能
2、高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
3、支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输
4、同时支持离线数据处理和实时数据处理
Kafka架构:
kafka架构图
如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。
安装配置(windows,java 8):
由于Kafka依赖zookeeper,需要先安装配置zookeeper
1、zookeeper安装配置
安装包地址:http://apache.fayea.com/zookeeper/current/
下载解压
zookeeper解压目录
进入conf目录配置zoo.cfg文件
zooker配置目录
clientPort:zookeeper服务端口 dataDir:数据保存路径(先创建目录)
进入bin目录,双击zkServer.cmd运行zookeeper
2、kafka安装配置
安装包地址:http://kafka.apache.org/downloads.html
下载解压
kafka解压目录
进入config目录,配置server.properties
zookeeper.connect:zookeep 服务地址 listeners:kafka服务端口 advertised.listeners:Kafka安装以后默认只能Localhost访问,外网访问需要在kafka的配置文件中加上:'advertised.listeners=PLAINTEXT://IP:PORT',IP是服务器的公网IP log.dirs:日志地址 num.partitions: partition数量
cmd执行
.binwindowskafka-server-start.bat.configserver.properties
基本概念知识:
Topic:Kafaka中的消息分类是以Toptic来分类的,一个Consumer只能接收到相同Toptic发送的消息并进行处理 Group:任何一个Consumer都有一个组,一个Toptic的消息会发送到连接Kafaka的所有的组里面,相当于消息是广播 对于一条消息,同一个组里面的Consumer只有一个能够同时接收处理。相当于队列和容灾。Group的机制能够完成广播、队列以及广播队列组合的高可用方案 SpringBoot配置:'spring.kafka.consumer.auto-offset-reset=earliest'保证一个组服务器down掉了,重启后也能接收并处理在down掉期间发送给kafka的消息,否则只能接收正常服务期间发送的消息
ConsumerRebalance
Kafka保证同一consumer group中只有一个consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。
如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据 如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据(如果某个consumer down掉,则至少有一个consumer会消费down掉的consumer的数据) 而如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。(这也是单机一个Partition的kafka测试多个Consumer,只有一个能接收消息的原因),这种情况下,多余的consumer只能作为容灾,达不到负载均衡的效果 综上,consumer如果需要集群提高负载,则采用2的方式进行部署,既可以达到容灾,也可以达到一定的负载均衡