abbitMq 技术文档
AMQP简介
AMQP即Advanced Message Queuing Protocol,高级消息队列协议,是面向消息中间件设计的应用层协议的一个开放标准。它的主要特点是面向消息、队列、路由(包括点对点和发布/订阅)]、可靠性和安全。 AMQP允许来自不同供应商的消息生产者和消费者实现真正的互操作扩展,就如同SMTP、HTTP、FTP等协议采用的方式一样。而此前对于消息中间件的标准化努力则集中在API层面上(比如JMS),且没有提供互操作性的途径。不同于JMS的仅仅定义API,AMQP是一个线路级的协议——它描述了通过网络传输的字节流的数据格式。因此,遵从这个协议的任何语言编写的工具均可以操作AMQP消息。 AMQP模型
AMQP的实现
1)OpenAMQ AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS。
2)Apache Qpid Apache的开源项目,支持C 、Ruby、Java、JMS、Python和.NET。
3)Redhat Enterprise MRG 实现了AMQP的最新版本0-10,提供了丰富的特征集,比如完全管理、联合、Active-Active集群,有Web控制台,还有许多企业级特征,客户端支持C 、Ruby、Java、JMS、Python和.NET。
4)RabbitMQ 一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台。
5)AMQP Infrastructure Linux下,包括Broker、管理工具、Agent和客户端。
6)Zyre 是一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP的能力。
RabbitMQ简介
RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。
概念说明
代码语言:javascript复制 Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程
代码语言:javascript复制 (1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为"abc",那么客户端提交的消息,只有设置了key为"abc"的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号"#"匹配一个或多个词,符号"*"匹配正好一个词。例如"abc.#"匹配"abc.def.ghi","abc.*"只匹配"abc.def"。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ的特性
代码语言:javascript复制 可靠性:包括消息持久化,消费者和生产者的消息确认。
灵活路由:遵循AMQP协议,支持多种Exchange类型实现不同路由策略
分布式:集群的支持,包括本地网络与远程网络
高可用性:支持主从备份与镜像队列
多语言支持:支持多语言的客户端
WEB界面管理:可以管理用户权限,exhange,queue,binding,与实时监控
访问控制:基于vhosts实现访问控制
调试追踪:支持tracing,方便调试
RabbitMQ使用向导
官网提供的几种工作方式(教程)
Hello World
工作队列
发布/订阅
路由选择 (Routing)
主题(Topic)
RPC
消息的可靠传递
连接失败的处理
RabbitMQ不支持连接的failover,所以需要客户端自己实现失败重连。
服务器的可靠性
为保证消息的可靠传递,服务器使用持久化保证消息不丢失。包括exchange与queue必须定义为持久的,同时发送消息时,也要设置消息为持久消息。 在代码中可以通过以下语句设置发送持久消息:
代码语言:javascript复制 channel.basicPublish(exchangeName, routeKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg)
或者:
BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build(); // deliveryMode为1是非持久
channel.basicPublish(exchangeName, routeKey, basicProperties, msg)
生产者的可靠性
生产者的消息确认叫做confirm,confirm确保消息已经发送到MQ中。当connection或channel异常时,会重新发送消息,如果消息是持久的,并不能一定保证消息持久化到磁盘中,因为消息可能存在与磁盘的缓存中。为进一步提高可靠性,可以使用事务。Confirm与事务不能同时使用。当生产者收不到confirm时,消息可能会重复,所以如果消息不允许重复,则消费者需要自己实现消息去重。 使用以下代码打开confirm,默认是关闭的
代码语言:javascript复制 channel.confirmSelect();
消费者的可靠性
消费者的消息确认叫做Acknowledgements,Acknowledgements确保消费者已经处理了消息,如果收不到消费者的Acknowledgements,MQ会重新发送消息。默认Acknowledgements是自动确认,如需客户端控制,在消费者的代码中设置:
代码语言:javascript复制 channel.basicConsume(queueName,false,consumer);//声明队列时,设置autoack为false
。。。
//消息处理代码
。。。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //发送确认
同样,MQ也可能收不到消费者的Acknowledgements,就会重复发送消息,若要避免,消费者需要自己实现消息去重。
分布式
RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。 Rabbitmq集群大概分为二种方式:
代码语言:javascript复制 (1)普通模式:默认的集群模式。
(2)镜像模式:把需要的队列做成镜像队列。
集群中有两种节点:
代码语言:javascript复制 (1)内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到磁盘)
(2)磁盘节点:保存状态到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。
良好的设计架构可以如下:在一个集群里,有3台以上机器,其中1台使用磁盘模式,其它使用内存模式。其它几台为内存模式的节点,无疑速度更快,因此客户端(consumer、producer)连接访问它们。而磁盘模式的节点,由于磁盘IO相对较慢,因此仅作数据备份使用。
普通模式
默认的集群模式,queue创建之后,如果没有其它策略,则queue就会按照普通模式集群。对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构,但队列的元数据仅保存有一份,即创建该队列的rabbitmq节点(A节点),当A节点宕机,你可以去其B节点查看,./rabbitmqctl list_queues 发现该队列已经丢失,但声明的exchange还存在。当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer,所以consumer应平均连接每一个节点,从中取消息。该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。如果做了队列持久化或消息持久化,那么得等A节点恢复,然后才可被消费,并且在A节点恢复之前其它节点不能再创建A节点已经创建过的持久队列;如果没有持久化的话,消息就会失丢。这种模式更适合非持久化队列,只有该队列是非持久的,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。
为什么RabbitMQ不将队列复制到集群里每个节点呢?这与它的集群的设计本意相冲突,集群的设计目的就是增加更多节点时,能线性的增加性能(CPU、内存)和容量(内存、磁盘),理由如下:
代码语言:javascript复制 1。存储空间:如果每个集群节点每个队列的一个完整副本,增加节点需要更多的存储容量。例如,如果一个节点可以存储1 gb的消息,添加两个节点需要两份相同的1gb的消息
2。性能:发布消息需要将这些信息复制到每个集群节点。对持久消息,要求为每条消息触发磁盘活动在所有节点上。每次添加一个节点都会带来 网络和磁盘的负载。
当然RabbitMQ新版本集群也支持队列复制(有个选项可以配置)。比如在有五个节点的集群里,可以指定某个队列的内容在2个节点上进行存储,从而在性能与高可用性之间取得一个平衡(应该就是指镜像模式)。
镜像模式
把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案(镜像模式是在普通模式的基础上,增加一些镜像策略)。 该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用,一个队列想做成镜像队列,需要先设置策略,然后客户端创建队列的时候,rabbitmq集群根据“队列名称”自动设置是普通集群模式或镜像队列。 具体如下: 队列通过策略能实现镜像。策略能在任何时刻改变,rabbitmq队列也近可能的将队列随着策略变化而变化;非镜像队列和镜像队列之间是有区别的,前者缺乏额外的镜像基础设施,没有任何追随者,因此会运行得更快。为了使队列称为镜像队列,你将会创建一个策略来匹配队列,设置策略有两个键“ha-mode和 ha-params(可选)”。ha-params根据ha-mode设置不同的值,下面表格说明这些key的选项:
集群部署
镜像模式配置
流量控制
基于连接的流量控制
当生产者发送消息的速率大于消息被路由到queue的速率时,会触发流量控制,发送速率受到限制,但不会完全阻塞。
基于内存的流量控制
当内存使用达到vm_memory_high_watermark的值时,会触发流量控制,生产者被阻塞。vm_memory_high_watermark的默认值是系统内存的40%,这个值可以在配置文件中修改。 [{rabbit, [{vm_memory_high_watermark, 0.4}]}].或者在运行时通过命令rabbitmqctlset_vm_memory_high_watermark fraction修改,修改立即生效,但下次重启后恢复。所以要永久修改,必须同时修改配置文件。
基于磁盘的流量控制
当磁盘剩余空间小于disk_free_limit的值时,触发流量控制,生产者被阻塞。 disk_free_limit的默认值是1GB,可在配置文件中修改。[{rabbit, [{disk_free_limit, 25000000000}]}].
内存使用
配置管理
RabbitMQ的默认配置在大部分情况下是最佳配置,如果服务运行良好,不需要修改。 RabbitMQ支持3种方式修改配置:环境变量、配置文件、运行时参数与策略。 环境变量可以配置到shell环境变量中,也可以在RabbitMQ的环境变量中配置。例如:配置服务绑定IP,可以在shell环境变量里配置RABBITMQ_NODE_IP_ADDRESS的值,也可以在RabbitMQ的环境变量中配置NODE_IP_ADDRESS的值,即RabbitMQ的环境变量中变量名称要去掉RABBITMQ_。RabbitMQ的环境变量文件在$RABBITMQ_HOME/sbin/rabbitmq-env。 配置的优先级为shell环境变量优先于RabbitMQ的环境变量,RabbitMQ的环境变量优先于RabbitMQ默认的环境变量。 通过配置文件配置,要先在环境变量中指定配置文件路径,例如:
代码语言:javascript复制 CONFIG_FILE=/etc/rabbitmq/rabbitmq.config
然后添加配置,例如:
[
{mnesia, [{dump_log_write_threshold, 1000}]},
{rabbit, [{tcp_listeners, [5673]}]}
].
通过rabbitmqctl命令可以在运行时修改配置,例如修改vm_memory_high_watermark。还有些配置,比如镜像队列,是通过管理界面或命令配置策略实现的。 详细的配置项请参考 http://www.rabbitmq.com/configure.html
RabbitMq
RabbitMq.. 1
1. AMQP简介... 3
2. AMQP的实现... 3
3. RabbitMQ简介... 4
3.1 概念说明... 4
3.2 消息队列的使用过程... 5
3.3 RabbitMQ的特性... 5
4. RabbitMQ使用向导... 5
4.1 RabbitMQ的安装... 5
4.1.1 安装前准备... 5
4.1.2 安装Erlang. 6
4.1.3 安装RabbitMQ.. 7
4.2 RabbitMQ服务... 7
4.3 RabbitMQ页面监控系统... 7
5 使用客户端程序发送与接收消息... 8
5.1 Hello World. 8
5.2工作队列... 11
5.2.1 初级版本... 11
5.2.2 消息应答(message acknowledgments)... 14
5.2.3 消息持久化(Message durability)... 17
5.2.4 公平转发(Fair dispatch)... 17
5.2.5 最终版本... 19
5.3发布/订阅... 21
5.3.1转发器(Exchanges)... 22
5.3.2匿名转发器(nameless exchange)... 24
5.3.3临时队列(Temporary queues)... 24
5.3.4绑定(Bindings)... 24
5.3.5完整的例子... 25
5.4 路由选择 (Routing)29
5.4.1 绑定(Bindings)... 30
5.4.2 直接转发(Direct exchange)... 30
5.4.3 多重绑定(multiple bindings)... 31
5.4.4 发送日志(Emittinglogs)... 31
5.4.5 订阅... 31
5.4.6 完整的实例... 32
5.6 主题(Topic)... 35
5.6.1 主题转发(Topic Exchange)... 35
5.6.2 图解... 36
5.6.3 完整的例子... 36
5.7 RPC. 40
5.7.1 RPC 工作流程... 41
5.7.2 AMQP协议为消息预定义了14种属性... 41
5.7.3 实例... 42
6.消息的可靠传递... 46
6.1连接失败的处理... 46
6.2服务器的可靠性... 46
6.3生产者的可靠性... 46
6.4消费者的可靠性... 46
7. 分布式... 47
7.1 普通模式... 47
7.2 镜像模式... 48
7.2.1 语法讲解... 49
7.2.2 “nodes”策略和迁移master. 49
7.2.3 创建策略例子... 49
7.3 集群部署... 50
7.3.1 3台机器如下(通过/etc/sysconfig/network修改主机名):... 51
7.3.2 部署RabbitMq,并可以正常启动... 52
7.3.3 设置每个节点Cookie. 52
7.3.4 使用detached参数独立运行启动服务... 52
7.3.5 设置内存节点及内存节点连接磁盘节点... 52
7.3.6 运行cluster_status命令查看集群状态... 53
7.3.7 测试... 53
7.4 镜像模式配置... 53
7.4.1 增加负载均衡器... 53
7.4.2 配置策略... 55
8. 流量控制... 56
8.1基于连接的流量控制... 56
8.2基于内存的流量控制... 57
8.3基于磁盘的流量控制... 57
9. 内存使用... 57
10.配置管理... 57
11.性能... 58
11.1性能测试... 58
11.2队列的性能... 59
11.3 类似产品对比... 59
1. AMQP简介
AMQP即Advanced Message Queuing Protocol,高级消息队列协议,是面向消息中间件设计的应用层协议的一个开放标准。它的主要特点是面向消息、队列、路由(包括点对点和发布/订阅)]、可靠性和安全。
AMQP允许来自不同供应商的消息生产者和消费者实现真正的互操作扩展,就如同SMTP、HTTP、FTP等协议采用的方式一样。而此前对于消息中间件的标准化努力则集中在API层面上(比如JMS),且没有提供互操作性的途径。不同于JMS的仅仅定义API,AMQP是一个线路级的协议——它描述了通过网络传输的字节流的数据格式。因此,遵从这个协议的任何语言编写的工具均可以操作AMQP消息。
AMQP模型
2. AMQP的实现
1)OpenAMQ
AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS。
2)Apache Qpid
Apache的开源项目,支持C 、Ruby、Java、JMS、Python和.NET。
3)Redhat Enterprise MRG
实现了AMQP的最新版本0-10,提供了丰富的特征集,比如完全管理、联合、Active-Active集群,有Web控制台,还有许多企业级特征,客户端支持C 、Ruby、Java、JMS、Python和.NET。
4)RabbitMQ
一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台。
5)AMQP Infrastructure
Linux下,包括Broker、管理工具、Agent和客户端。
6)Zyre
是一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP 的能力。
3. RabbitMQ简介
RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。
3.1 概念说明
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
3.2 消息队列的使用过程
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为"abc",那么客户端提交的消息,只有设置了key为"abc"的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号"#"匹配一个或多个词,符号"*"匹配正好一个词。例如"abc.#"匹配"abc.def.ghi","abc.*"只匹配"abc.def"。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
3.3 RabbitMQ的特性
可靠性:包括消息持久化,消费者和生产者的消息确认。
灵活路由:遵循AMQP协议,支持多种Exchange类型实现不同路由策略
分布式:集群的支持,包括本地网络与远程网络
高可用性:支持主从备份与镜像队列
多语言支持:支持多语言的客户端
WEB界面管理:可以管理用户权限,exhange,queue,binding,与实时监控
访问控制:基于vhosts实现访问控制
调试追踪:支持tracing,方便调试
4. RabbitMQ使用向导
4.1 RabbitMQ的安装
4.1.1 安装前准备
Linux系统:centos-6.3
RabbitMQ下载页:http://www.rabbitmq.com/download.html
说明:开始选择的是直接下载对应的版本进行安装,但是出现依赖包的问题没有安装成功。最后选择RabbitMQ Server 下的Installation Guids下的Fedora/RHEL的安装提示页面进行安装的。其他linux系统的可以选择对应的链接页面。参考页面:http://www.rabbitmq.com/install-rpm.html
4.1.2 安装Erlang
根据页面提示说明,在安装RabbitMQ之前需要安装依赖包Erlang,然后在进行安装RabbitMQ
本人选择的是第二个:Install Erlang from Erlang Solutions or
根据说明,需要安装Erlang Solutins 和esl-erlang-compat两个软件包。可以分别点进相应的链接,根据提示进行安装。
安装方法总结如下:
Ⅰ安装Erlang Solutions:
1. # rpm --import http://binaries.erlang-solutions.com/debian/erlang_solutions.asc
2. # wget /etc/yum.repos.d/ http://binaries.erlang-solutions.com/rpm/centos/erlang_solutions.repo
3. # yum install esl-erlang
Ⅱ安装esl-erlang-compat:
1. # cd /tmp/
2. # wget https://raw.github.com/jasonmcintosh/esl-erlang-compat/master/rpmbuild/RPMS/noarch/esl-erlang-compat-R14B-1.el6.noarch.rpm
3. # yum install esl-erlang-compat-R14B-1.el6.noarch.rpm
4.1.3 安装RabbitMQ
1. # cd/tmp
2. # wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.3/rabbitmq-server-3.1.3-1.noarch.rpm
3. # yum install rabbitmq-server-3.1.3-1.noarch.rpm
到此,RabbitMQ基本安装完成了。
4.2 RabbitMQ服务
开启:# service rabbitmq-server start
关闭:# service rabbitmq-server stop
重启:# service rabbitmq-server restart
4.3 RabbitMQ页面监控系统
RabbitMQ提供了一个web的监控页面系统,这个系统是以Plugin的方式进行调用的。
在Documentation下的Server下的Management是关于配置这个插件的。地址:http://www.rabbitmq.com/management.html
这个管理插件是包含在RabbitMQ发行包里的,所以只需激活即可。
命令:# rabbitmq-plugins enable rabbitmq_management
浏览器地址栏输入:http://localhost:15672
默认用户名:guest ,密码:guest
登陆后如下图:
5 使用客户端程序发送与接收消息
5.1 Hello World
一个producer发送消息,一个接收者接收消息,并在控制台打印出来。如下图:
1) RabbitMQ是用Erlang,对于主要的编程语言都有驱动或者客户端。我们这里要用的是Java,所以先要获得Java客户端。。下面是Java客户端的maven依赖的配置。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
2)发送端:Send.java 连接到RabbitMQ(此时服务需要启动),发送一条数据,然后退出。
- package com.zhy.rabbit._01;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class Send
- {
- //队列名称
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] argv) throws java.io.IOException
- {
- /**
- * 创建连接连接到MabbitMQ
- */
- ConnectionFactory factory = new ConnectionFactory();
- //设置MabbitMQ所在主机ip或者主机名
- factory.setHost("localhost");
- //创建一个连接
- Connection connection = factory.newConnection();
- //创建一个频道
- Channel channel = connection.createChannel();
- //指定一个队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //发送的消息
- String message = "hello world!";
- //往队列中发出一条消息
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" message "'");
- //关闭频道和连接
- channel.close();
- connection.close();
- }
- }
值得注意的是队列只会在它不存在的时候创建,多次声明并不会重复创建。信息的内容是字节数组,也就意味着你可以传递任何数据。
3) 接收端:Recv.java 不断等待服务器推送消息,然后在控制台输出。
- package com.zhy.rabbit._01;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Recv
- {
- //队列名称
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- //打开连接和创建频道,与发送端一样
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL C");
- //创建队列消费者
- QueueingConsumer consumer = new QueueingConsumer(channel);
- //指定消费队列
- channel.basicConsume(QUEUE_NAME, true, consumer);
- while (true)
- {
- //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" message "'");
- }
- }
- }
分别运行Send.java和Recv.java 顺序无所谓。前提RabbitMQ服务开启。
运行结果:
[x]Sent 'hello world!'
----------------------------------------
[*] Waiting for messages. To exitpress CTRL C
[x] Received 'hello world!'
5.2工作队列
工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行。这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。
5.2.1 初级版本
我们使用Thread.sleep来模拟耗时的任务。我们在发送到队列的消息的末尾添加一定数量的点,每个点代表在工作线程中需要耗时1秒,例如hello…将会需要等待3秒。
发送端:
- package com.zhy.rabbit._02_workqueue;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class NewTask
- {
- //队列名称
- private final static String QUEUE_NAME = "workqueue";
- public static void main(String[] args) throws IOException
- {
- //创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //发送10条消息,依次在消息后面附加1-10个点
- for (int i = 0; i < 10; i )
- {
- String dots = "";
- for (int j = 0; j <= i; j )
- {
- dots = ".";
- }
- String message = "helloworld" dots dots.length();
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" message "'");
- }
- //关闭频道和资源
- channel.close();
- connection.close();
- }
- }
接收端:
- package com.zhy.rabbit._02_workqueue;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Work
- {
- //队列名称
- private final static String QUEUE_NAME = "workqueue";
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- //区分不同工作进程的输出
- int hashCode = Work.class.hashCode();
- //创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(hashCode
- " [*] Waiting for messages. To exit press CTRL C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定消费队列
- channel.basicConsume(QUEUE_NAME, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(hashCode " [x] Received '" message "'");
- doWork(message);
- System.out.println(hashCode " [x] Done");
- }
- }
- /**
- * 每个点耗时1s
- * @param task
- * @throws InterruptedException
- */
- private static void doWork(String task) throws InterruptedException
- {
- for (char ch : task.toCharArray())
- {
- if (ch == '.')
- Thread.sleep(1000);
- }
- }
- }
Round-robin 转发 使用任务队列的好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。 下面我们先运行3个工作者(Work.java)实例,然后运行NewTask.java,3个工作者实例都会得到信息。但是如何分配呢?让我们来看输出结果:
[x] Sent 'helloworld.1' [x] Sent 'helloworld..2' [x] Sent 'helloworld...3' [x] Sent 'helloworld....4' [x] Sent 'helloworld.....5' [x] Sent 'helloworld......6' [x] Sent 'helloworld.......7' [x] Sent 'helloworld........8' [x] Sent 'helloworld.........9' [x] Sent 'helloworld..........10'
工作者1: 605645 [*] Waiting for messages. To exit press CTRL C 605645 [x] Received 'helloworld.1' 605645 [x] Done 605645 [x] Received 'helloworld....4' 605645 [x] Done 605645 [x] Received 'helloworld.......7' 605645 [x] Done 605645 [x] Received 'helloworld..........10' 605645 [x] Done
工作者2: 18019860 [*] Waiting for messages. To exit press CTRL C 18019860 [x] Received 'helloworld..2' 18019860 [x] Done 18019860 [x] Received 'helloworld.....5' 18019860 [x] Done 18019860 [x] Received 'helloworld........8' 18019860 [x] Done
工作者3: 18019860 [*] Waiting for messages. To exit press CTRL C 18019860 [x] Received 'helloworld...3' 18019860 [x] Done 18019860 [x] Received 'helloworld......6' 18019860 [x] Done 18019860 [x] Received 'helloworld.........9' 18019860 [x] Done 可以看到,默认的,RabbitMQ会一个一个的发送信息给下一个消费者(consumer),而不考虑每个任务的时长等等,且是一次性分配,并非一个一个分配。平均的每个消费者将会获得相等数量的消息。这样分发消息的方式叫做round-robin。
5.2.2 消息应答(message acknowledgments)
执行一个任务需要花费几秒钟。你可能会担心当一个工作者在执行任务时发生中断。我们上面的代码,一旦RabbItMQ交付了一个信息给消费者,会马上从内存中移除这个信息。在这种情况下,如果杀死正在执行任务的某个工作者,我们会丢失它正在处理的信息。我们也会丢失已经转发给这个工作者且它还未执行的消息。 上面的例子,我们首先开启两个任务,然后执行发送任务的代码,然后立即关闭第二个任务,结果为: 工作者2:
31054905 [*] Waiting for messages. To exit press CTRL C 31054905 [x] Received 'helloworld..2' 31054905 [x] Done 31054905 [x] Received 'helloworld....4'
工作者1: 18019860 [*] Waiting for messages. To exit press CTRL C 18019860 [x] Received 'helloworld.1' 18019860 [x] Done 18019860 [x] Received 'helloworld...3' 18019860 [x] Done 18019860 [x] Received 'helloworld.....5' 18019860 [x] Done 18019860 [x] Received 'helloworld.......7' 18019860 [x] Done 18019860 [x] Received 'helloworld.........9' 18019860 [x] Done 可以看到,第二个工作者至少丢失了6,8,10号任务,且4号任务未完成。
但是,我们不希望丢失任何任务(信息)。当某个工作者(接收者)被杀死时,我们希望将任务传递给另一个工作者。为了保证消息永远不会丢失,RabbitMQ支持消息应答(message acknowledgments)。消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。如果消费者被杀死而没有发送应答,RabbitMQ会认为该信息没有被完全的处理,然后将会重新转发给别的消费者。通过这种方式,你可以确认信息不会被丢失,即使消者偶尔被杀死。 这种机制并没有超时时间这么一说,RabbitMQ只有在消费者连接断开是重新转发此信息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。 消息应答默认是打开的。上面的代码中我们通过显示的设置autoAsk=true关闭了这种机制。下面我们修改代码(Work.java):
- boolean ack = false ; //打开应答机制
- channel.basicConsume(QUEUE_NAME, ack, consumer);
- //另外需要在每次处理完成一个消息后,手动发送一次应答。
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
完整修改后的Work.java
- package com.zhy.rabbit._02_workqueue.ack;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Work
- {
- //队列名称
- private final static String QUEUE_NAME = "workqueue";
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- //区分不同工作进程的输出
- int hashCode = Work.class.hashCode();
- //创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(hashCode
- " [*] Waiting for messages. To exit press CTRL C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定消费队列
- boolean ack = false ; //打开应答机制
- channel.basicConsume(QUEUE_NAME, ack, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(hashCode " [x] Received '" message "'");
- doWork(message);
- System.out.println(hashCode " [x] Done");
- //发送应答
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }
- }
测试: 我们把消息数量改为5,然后先打开两个消费者(Work.java),然后发送任务(NewTask.java),立即关闭一个消费者,观察输出: [x] Sent 'helloworld.1' [x] Sent 'helloworld..2' [x] Sent 'helloworld...3' [x] Sent 'helloworld....4' [x] Sent 'helloworld.....5'
工作者2 18019860 [*] Waiting for messages. To exit press CTRL C 18019860 [x] Received 'helloworld..2' 18019860 [x] Done 18019860 [x] Received 'helloworld....4'
工作者1 31054905 [*] Waiting for messages. To exit press CTRL C 31054905 [x] Received 'helloworld.1' 31054905 [x] Done 31054905 [x] Received 'helloworld...3' 31054905 [x] Done 31054905 [x] Received 'helloworld.....5' 31054905 [x] Done 31054905 [x] Received 'helloworld....4' 31054905 [x] Done
可以看到工作者2没有完成的任务4,重新转发给工作者1进行完成了。
5.2.3 消息持久化(Message durability)
我们已经学习了即使消费者被杀死,消息也不会被丢失。但是如果此时RabbitMQ服务被停止,我们的消息仍然会丢失。当RabbitMQ退出或者异常退出,将会丢失所有的队列和信息,除非你告诉它不要丢失。我们需要做两件事来确保信息不会被丢失:我们需要给所有的队列和消息设置持久化的标志。 第一,我们需要确认RabbitMQ永远不会丢失我们的队列。为了这样,我们需要声明它为持久化的。 boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null); 注:RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性。 第二,我们需要标识我们的信息为持久化的。通过设置MessageProperties(implements BasicProperties)值为PERSISTENT_TEXT_PLAIN。 channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); 现在你可以执行一个发送消息的程序,然后关闭服务,再重新启动服务,运行消费者程序做下实验。
5.2.4 公平转发(Fair dispatch)
或许会发现,目前的消息转发机制(Round-robin)并非是我们想要的。例如,这样一种情况,对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。 造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。 为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。换句话说,只有在消费者空闲的时候会发送下一条信息。
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
注:如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。 测试:改变发送消息的代码,将消息末尾点数改为6-2个,然后首先开启两个工作者,接着发送消息:
[x] Sent 'helloworld......6' [x] Sent 'helloworld.....5' [x] Sent 'helloworld....4' [x] Sent 'helloworld...3' [x] Sent 'helloworld..2'
工作者1: 18019860 [*] Waiting for messages. To exit press CTRL C 18019860 [x] Received 'helloworld......6' 18019860 [x] Done 18019860 [x] Received 'helloworld...3' 18019860 [x] Done
工作者2: 31054905 [*] Waiting for messages. To exit press CTRL C 31054905 [x] Received 'helloworld.....5' 31054905 [x] Done 31054905 [x] Received 'helloworld....4' 31054905 [x] Done 31054905 [x] Received 'helloworld..2' 31054905 [x] Done
可以看出此时并没有按照之前的Round-robin机制进行转发消息,而是当消费者不忙时进行转发。且这种模式下支持动态增加消费者,因为消息并没有发送出去,动态增加了消费者马上投入工作。而默认的转发机制会造成,即使动态增加了消费者,此时的消息已经分配完毕,无法立即加入工作,即使有很多未完成的任务。
5.2.5 最终版本
NewTask.java
- package com.zhy.rabbit._02_workqueue.ackandpersistence;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- public class NewTask
- {
- // 队列名称
- private final static String QUEUE_NAME = "workqueue_persistence";
- public static void main(String[] args) throws IOException
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明队列
- boolean durable = true;// 1、设置队列持久化
- channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
- // 发送10条消息,依次在消息后面附加1-10个点
- for (int i = 5; i > 0; i--)
- {
- String dots = "";
- for (int j = 0; j <= i; j )
- {
- dots = ".";
- }
- String message = "helloworld" dots dots.length();
- // MessageProperties 2、设置消息持久化
- channel.basicPublish("", QUEUE_NAME,
- MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
- System.out.println(" [x] Sent '" message "'");
- }
- // 关闭频道和资源
- channel.close();
- connection.close();
- }
- }
Work.java
- package com.zhy.rabbit._02_workqueue.ackandpersistence;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Work
- {
- // 队列名称
- private final static String QUEUE_NAME = "workqueue_persistence";
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 区分不同工作进程的输出
- int hashCode = Work.class.hashCode();
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明队列
- boolean durable = true;
- channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
- System.out.println(hashCode
- " [*] Waiting for messages. To exit press CTRL C");
- //设置最大服务转发消息数量
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定消费队列
- boolean ack = false; // 打开应答机制
- channel.basicConsume(QUEUE_NAME, ack, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(hashCode " [x] Received '" message "'");
- doWork(message);
- System.out.println(hashCode " [x] Done");
- //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }
- /**
- * 每个点耗时1s
- *
- * @param task
- * @throws InterruptedException
- */
- private static void doWork(String task) throws InterruptedException
- {
- for (char ch : task.toCharArray())
- {
- if (ch == '.')
- Thread.sleep(1000);
- }
- }
- }
5.3发布/订阅
把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。
为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志。
在我们的日志系统中,每一个运行的接收者程序都会收到日志。然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。
本质上来说,就是发布的日志消息会转发给所有的接收者。
5.3.1转发器(Exchanges)
前面我们主要的介绍都是发送者发送消息给队列,接收者从队列接收消息。下面我们会引入Exchanges,展示RabbitMQ的完整的消息模型。RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。
下面列出一些可用的转发器类型:
Direct
需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。Direct交换机是如何工作的:
Topic、
将路由键和某模式进行匹配。此时队列需要绑定到一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。topic交换机是如何工作的:
Fanout
你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。Fanout 是如何工作的:
目前我们关注最后一个fanout,声明转发器类型的代码:
channel.exchangeDeclare("logs","fanout");
fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的。
5.3.2匿名转发器(nameless exchange)
前面说到生产者只能发送消息给转发器(Exchange),但是我们前两篇博客中的例子并没有使用到转发器,我们仍然可以发送和接收消息。这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。
现在我们可以指定消息发送到的转发器:
channel.basicPublish( "logs","", null, message.getBytes());
5.3.3临时队列(Temporary queues)
前面我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据感兴趣。为了满足我们的需求,需要做两件事:
第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。
String queueName = channel.queueDeclare().getQueue();
一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。
5.3.4绑定(Bindings)
我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。
channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称
5.3.5完整的例子
日志发送端:
- package com.zhy.rabbit._03_bindings_exchanges;
- import java.io.IOException;
- import java.util.Date;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLog
- {
- private final static String EXCHANGE_NAME = "ex_log";
- public static void main(String[] args) throws IOException
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明转发器和类型
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
- String message = new Date().toLocaleString() " : log something";
- // 往转发器上发送消息
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- System.out.println(" [x] Sent '" message "'");
- channel.close();
- connection.close();
- }
- }
没什么太大的改变,声明队列的代码,改为声明转发器了,同样的消息的传递也交给了转发器。
接收端1 :ReceiveLogsToSave.java:
- package com.zhy.rabbit._03_bindings_exchanges;
- import java.io.File;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class ReceiveLogsToSave
- {
- private final static String EXCHANGE_NAME = "ex_log";
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 创建一个非持久的、唯一的且自动删除的队列
- String queueName = channel.queueDeclare().getQueue();
- // 为转发器指定队列,设置binding
- channel.queueBind(queueName, EXCHANGE_NAME, "");
- System.out.println(" [*] Waiting for messages. To exit press CTRL C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定接收者,第二个参数为自动应答,无需手动应答
- channel.basicConsume(queueName, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- print2File(message);
- }
- }
- private static void print2File(String msg)
- {
- try
- {
- String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
- String logFileName = new SimpleDateFormat("yyyy-MM-dd")
- .format(new Date());
- File file = new File(dir, logFileName ".txt");
- FileOutputStream fos = new FileOutputStream(file, true);
- fos.write((msg "rn").getBytes());
- fos.flush();
- fos.close();
- } catch (FileNotFoundException e)
- {
- e.printStackTrace();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后写入日志文件。
接收端2:ReceiveLogsToConsole.java
- package com.zhy.rabbit._03_bindings_exchanges;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class ReceiveLogsToConsole
- {
- private final static String EXCHANGE_NAME = "ex_log";
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 创建一个非持久的、唯一的且自动删除的队列
- String queueName = channel.queueDeclare().getQueue();
- // 为转发器指定队列,设置binding
- channel.queueBind(queueName, EXCHANGE_NAME, "");
- System.out.println(" [*] Waiting for messages. To exit press CTRL C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定接收者,第二个参数为自动应答,无需手动应答
- channel.basicConsume(queueName, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" message "'");
- }
- }
- }
随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后打印到控制台。
现在把两个接收端运行,然后运行3次发送端:
输出结果:
发送端:
[x] Sent '2014-7-10 16:04:54 : log something'
[x] Sent '2014-7-10 16:04:58 : log something'
[x] Sent '2014-7-10 16:05:02 : log something'
接收端1:
接收端2:
[*] Waiting for messages. To exit press CTRL C [x] Received '2014-7-10 16:04:54 : log something' [x] Received '2014-7-10 16:04:58 : log something' [x] Received '2014-7-10 16:05:02 : log something'
这个例子实现了我们文章开头所描述的日志系统,利用了转发器的类型:fanout。
说明了,生产者将消息发送至转发器,转发器决定将消息发送至哪些队列,消费者绑定队列获取消息。
5.4 路由选择 (Routing)
我们准备给日志系统添加新的特性,让日志接收者能够订阅部分消息。例如,我们可以仅仅将致命的错误写入日志文件,然而仍然在控制面板上打印出所有的其他类型的日志消息。
5.4.1 绑定(Bindings)
在前面的例子中我们已经使用过绑定。类似下面的代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定表示转发器与队列之间的关系。我们也可以简单的认为:队列对该转发器上的消息感兴趣。绑定可以附带一个额外的参数routingKey。为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。下面展示如何使用绑定键(binding key)来创建一个绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。
5.4.2 直接转发(Direct exchange)
上面的日志系统广播所有的消息给所有的消费者。我们希望可以对其扩展,来允许根据日志的严重性进行过滤日志。例如:我们可能希望把致命类型的错误写入硬盘,而不把硬盘空间浪费在警告或者消息类型的日志上。之前我们使用fanout类型的转发器,但是并没有给我们带来更多的灵活性:仅仅可以愚蠢的转发。
我们将会使用direct类型的转发器进行替代。direct类型的转发器背后的路由转发算法很简单:消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。
图解:
上图,我们可以看到direct类型的转发器与两个队列绑定。第一个队列与绑定键orange绑定,第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。
这样的话,当一个消息附带一个选择键(routing key) orange发布至转发器将会被导向到队列Q1。消息附带一个选择键(routing key)black或者green将会被导向到Q2.所有的其他的消息将会被丢弃。
5.4.3 多重绑定(multiple bindings)
使用一个绑定键(binding key)绑定多个队列是完全合法的。如上图,一个附带选择键(routing key)的消息将会被转发到Q1和Q2。
5.4.4 发送日志(Emittinglogs)
我们准备将这种模式用于我们的日志系统。我们将消息发送到direct类型的转发器而不是fanout类型。我们将把日志的严重性作为选择键(routing key)。这样的话,接收程序可以根据严重性来选择接收。我们首先关注发送日志的代码:
像以前一样,我们需要先创建一个转发器:
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
然后我们准备发送一条消息:
channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());
为了简化代码,我们假定‘severity’是‘info’,‘warning’,‘error’中的一个。
5.4.5 订阅
接收消息的代码和前面例子中类似,只有一点不同:我们给我们所感兴趣的严重性类型的日志创建一个绑定。
StringqueueName = channel.queueDeclare().getQueue();
for(Stringseverity : argv)
{
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
5.4.6 完整的实例
发送端:EmitLogDirect.java
- package com.zhy.rabbit._04_binding_key;
- import java.util.Random;
- import java.util.UUID;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLogDirect
- {
- private static final String EXCHANGE_NAME = "ex_logs_direct";
- private static final String[] SEVERITIES = { "info", "warning", "error" };
- public static void main(String[] argv) throws java.io.IOException
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明转发器的类型
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- //发送6条消息
- for (int i = 0; i < 6; i )
- {
- String severity = getSeverity();
- String message = severity "_log :" UUID.randomUUID().toString();
- // 发布消息至转发器,指定routingkey
- channel.basicPublish(EXCHANGE_NAME, severity, null, message
- .getBytes());
- System.out.println(" [x] Sent '" message "'");
- }
- channel.close();
- connection.close();
- }
- /**
- * 随机产生一种日志类型
- *
- * @return
- */
- private static String getSeverity()
- {
- Random random = new Random();
- int ranVal = random.nextInt(3);
- return SEVERITIES[ranVal];
- }
- }
随机发送6条随机类型(routing key)的日志给转发器~~
接收端:ReceiveLogsDirect.java
- package com.zhy.rabbit._04_binding_key;
- import java.util.Random;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class ReceiveLogsDirect
- {
- private static final String EXCHANGE_NAME = "ex_logs_direct";
- private static final String[] SEVERITIES = { "info", "warning", "error" };
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明direct类型转发器
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- String queueName = channel.queueDeclare().getQueue();
- String severity = getSeverity();
- // 指定binding_key
- channel.queueBind(queueName, EXCHANGE_NAME, severity);
- System.out.println(" [*] Waiting for " severity " logs. To exit press CTRL C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" message "'");
- }
- }
- /**
- * 随机产生一种日志类型
- *
- * @return
- */
- private static String getSeverity()
- {
- Random random = new Random();
- int ranVal = random.nextInt(3);
- return SEVERITIES[ranVal];
- }
- }
接收端随机设置一个日志严重级别(binding_key)。。。
我开启了3个接收端程序,两个准备接收error类型日志,一个接收info类型日志,然后运行发送端程序
运行结果:
[x] Sent 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c' [x] Sent 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce' [x] Sent 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9' [x] Sent 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3' [x] Sent 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05' [x] Sent 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'
------------------------------------------------------------------------------------
[*] Waiting for error logs. To exit press CTRL C [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c' [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce' [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9' [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
[*] Waiting for error logs. To exit press CTRL C [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c' [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce' [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9' [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
[*] Waiting for info logs. To exit press CTRL C [x] Received 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05' [x] Received 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'
可以看到我们实现了开头所描述的特性,接收者可以自定义自己感兴趣类型的日志。
其实文章这么长就在说:发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。
5.6 主题(Topic)
虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。在我们的日志系统中,我们有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而不是来自’kern’的。为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。
5.6.1 主题转发(Topic Exchange)
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。
一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。
5.6.2 图解
我们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:<speed>.<color>.<species>。
我们创建3个绑定键:Q1与*.orange.*绑定 Q2与*.*.rabbit和lazy.#绑定。
可以简单的认为:
Q1对所有的橙色动物感兴趣。
Q2想要知道关于rabbit的一切以及关于懒惰的动物的一切。
一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,但是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,所以会被丢弃。
如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。
注:主题类型的转发器非常强大,可以实现其他类型的转发器。
当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。
当绑定键中不包含任何#与*时,类似direct类型转发器。
5.6.3 完整的例子
发送端EmitLogTopic.java:
- package com.zhy.rabbit._05_topic_exchange;
- import java.util.UUID;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLogTopic
- {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] argv) throws Exception
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- String[] routing_keys = new String[] { "kernal.info", "cron.warning",
- "auth.info", "kernel.critical" };
- for (String routing_key : routing_keys)
- {
- String msg = UUID.randomUUID().toString();
- channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
- .getBytes());
- System.out.println(" [x] Sent routingKey = " routing_key " ,msg = " msg ".");
- }
- channel.close();
- connection.close();
- }
- }
我们发送了4条消息,分别设置了不同的选择键。
接收端1,ReceiveLogsTopicForKernel.java
- package com.zhy.rabbit._05_topic_exchange;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class ReceiveLogsTopicForKernel
- {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] argv) throws Exception
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明转发器
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 随机生成一个队列
- String queueName = channel.queueDeclare().getQueue();
- //接收所有与kernel相关的消息
- channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
- System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println(" [x] Received routingKey = " routingKey
- ",msg = " message ".");
- }
- }
- }
直接收和Kernel相关的日志消息。
接收端2,ReceiveLogsTopicForCritical.java
- package com.zhy.rabbit._05_topic_exchange;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class ReceiveLogsTopicForCritical
- {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] argv) throws Exception
- {
- // 创建连接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明转发器
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 随机生成一个队列
- String queueName = channel.queueDeclare().getQueue();
- // 接收所有与kernel相关的消息
- channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
- System.out
- .println(" [*] Waiting for critical messages. To exit press CTRL C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println(" [x] Received routingKey = " routingKey
- ",msg = " message ".");
- }
- }
- }
只接收致命错误的日志消息。
运行结果:
[x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7. [x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281. [x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a. [x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.
--------------------------------------------------------------------------------------------------------------------
[*] Waiting for messages about kernel. To exit press CTRL C [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.
--------------------------------------------------------------------------------------------------------------------
[*] Waiting for critical messages. To exit press CTRL C [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.
可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。
5.7 RPC
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
5.7.1 RPC 工作流程
RabbitMQ中实现RPC的机制是:
1)、客户端启动时,创建了一个匿名的回调队列。 2)、在一个RPC请求中,客户端发送一个消息,它有两个属性:1.REPLYTO,用来设置回调队列名;2.correlationId,对于每个请求都被设置成唯一的值。 3)、请求被发送到rpc_queue队列. 4)、RPC工作者(又名:服务器)等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定的。 5)、客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。
5.7.2 AMQP协议为消息预定义了14种属性
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
大部分的属性很少使用,除了下面几个:
deliveryMode: 将消息标记为持久(值为2)或瞬态(任何其他值)。你可能记得在第二个教程中使用了这个属性。
contentType:用来设置mime类型。例如经常使用的JSON格式数据,就需要将此属性设置为:application/json。
replyTo: 通常用来命名一个回调队列.
correlationId: 用来关联RPC请求的响应.
另外需要说明这个correlationId。
其实在上面的代码中我们为每一个RPC请求都创建了一个回调队列。
但这样明显不效率,我们可以为每一个客户端只创建一个回调队列。
但这样我们又需要考虑另一个问题:当我们将收到的消息放到队列时,如何确定该消息是属于哪个请求?
这时我们可以使用correlationId解决这个问题。
我们可以用它来为每一个请求加上标识,获取信息时对比这个标识,以对应请求和响应。
如果我们收到了无法识别的correlationId,即该响应不与任何请求匹配,那么这个消息将会废除。
5.7.3 实例
1、RPC服务器的RPCServer.java,接收消息调用rpc并返回结果
- package cn.slimsmart.rabbitmq.demo.rpc;
- import java.security.MessageDigest;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- //RPC调用服务端
- public class RPCServer {
- private static final String RPC_QUEUE_NAME = "rpc_queue";
- public static void main(String[] args) throws Exception {
- //• 先建立连接、通道,并声明队列
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.36.217");
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setPort(AMQP.PROTOCOL.PORT);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
- //•可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。
- channel.basicQos(1);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- //打开应答机制autoAck=false
- channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
- System.out.println(" [x] Awaiting RPC requests");
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- BasicProperties props = delivery.getProperties();
- BasicProperties replyProps = new BasicProperties.Builder()
- .correlationId(props.getCorrelationId()).build();
- String message = new String(delivery.getBody());
- System.out.println(" [.] getMd5String(" message ")");
- String response = getMd5String(message);
- //返回处理结果队列
- channel.basicPublish("", props.getReplyTo(), replyProps,
- response.getBytes());
- //发送应答
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }
- // 模拟RPC方法 获取MD5字符串
- public static String getMd5String(String str) {
- MessageDigest md5 = null;
- try {
- md5 = MessageDigest.getInstance("MD5");
- } catch (Exception e) {
- System.out.println(e.toString());
- e.printStackTrace();
- return "";
- }
- char[] charArray = str.toCharArray();
- byte[] byteArray = new byte[charArray.length];
- for (int i = 0; i < charArray.length; i )
- byteArray[i] = (byte) charArray[i];
- byte[] md5Bytes = md5.digest(byteArray);
- StringBuffer hexValue = new StringBuffer();
- for (int i = 0; i < md5Bytes.length; i ) {
- int val = ((int) md5Bytes[i]) & 0xff;
- if (val < 16)
- hexValue.append("0");
- hexValue.append(Integer.toHexString(val));
- }
- return hexValue.toString();
- }
- }
2.客户端RPCClient.java,发送rpc调用消息,接收结果
- package cn.slimsmart.rabbitmq.demo.rpc;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.AMQP.BasicProperties;
- //RPC调用客户端
- public class RPCClient {
- private Connection connection;
- private Channel channel;
- private String requestQueueName = "rpc_queue";
- private String replyQueueName;
- private QueueingConsumer consumer;
- public RPCClient() throws Exception {
- //• 先建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.36.217");
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setPort(AMQP.PROTOCOL.PORT);
- connection = factory.newConnection();
- channel = connection.createChannel();
- //• 注册'回调'队列,这样就可以收到RPC响应
- replyQueueName = channel.queueDeclare().getQueue();
- consumer = new QueueingConsumer(channel);
- channel.basicConsume(replyQueueName, true, consumer);
- }
- //发送RPC请求
- public String call(String message) throws Exception {
- String response = null;
- String corrId = java.util.UUID.randomUUID().toString();
- //发送请求消息,消息使用了两个属性:replyto和correlationId
- BasicProperties props = new BasicProperties.Builder()
- .correlationId(corrId).replyTo(replyQueueName).build();
- channel.basicPublish("", requestQueueName, props, message.getBytes());
- //等待接收结果
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- //检查它的correlationId是否是我们所要找的那个
- if (delivery.getProperties().getCorrelationId().equals(corrId)) {
- response = new String(delivery.getBody());
- break;
- }
- }
- return response;
- }
- public void close() throws Exception {
- connection.close();
- }
- }
3、运行client主函数RPCMain.java
- package cn.slimsmart.rabbitmq.demo.rpc;
- public class RPCMain {
- public static void main(String[] args) throws Exception {
- RPCClient rpcClient = new RPCClient();
- System.out.println(" [x] Requesting getMd5String(abc)");
- String response = rpcClient.call("abc");
- System.out.println(" [.] Got '" response "'");
- rpcClient.close();
- }
- }
先运行服务端,再运行RPCMain,发送消息调用RPC。
这里介绍的是该设计不是实现RPC服务的唯一可能,但它有一些重要的优点: 1)如果RPC服务器速度太慢,你可以通过运行多个RPC服务器。尝试在一个新的控制台上运行第二RPCServer。 2)RPC客户端只发送和接收一个消息。不需要queueDeclare那样要求同步调用。因此,RPC客户端只需要在一个网络上发送和接收为一个单一的RPC请求。
6.消息的可靠传递
6.1连接失败的处理
RabbitMQ不支持连接的failover,所以需要客户端自己实现失败重连。
6.2服务器的可靠性
为保证消息的可靠传递,服务器使用持久化保证消息不丢失。包括exchange与queue必须定义为持久的,同时发送消息时,也要设置消息为持久消息。
在代码中可以通过以下语句设置发送持久消息:
channel.basicPublish(exchangeName, routeKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg)
或者:
BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.deliveryMode(2).build(); // deliveryMode为1是非持久
channel.basicPublish(exchangeName, routeKey, basicProperties, msg)
6.3生产者的可靠性
生产者的消息确认叫做confirm,confirm确保消息已经发送到MQ中。当connection或channel异常时,会重新发送消息,如果消息是持久的,并不能一定保证消息持久化到磁盘中,因为消息可能存在与磁盘的缓存中。为进一步提高可靠性,可以使用事务。Confirm与事务不能同时使用。当生产者收不到confirm时,消息可能会重复,所以如果消息不允许重复,则消费者需要自己实现消息去重。
使用以下代码打开confirm,默认是关闭的
channel.confirmSelect();
6.4消费者的可靠性
消费者的消息确认叫做Acknowledgements,Acknowledgements确保消费者已经处理了消息,如果收不到消费者的Acknowledgements,MQ会重新发送消息。默认Acknowledgements是自动确认,如需客户端控制,在消费者的代码中设置:
channel.basicConsume(queueName,false,consumer);//声明队列时,设置autoack为false
。。。
//消息处理代码
。。。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //发送确认
同样,MQ也可能收不到消费者的Acknowledgements,就会重复发送消息,若要避免,消费者需要自己实现消息去重。
7. 分布式
RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。
Rabbitmq集群大概分为二种方式:
(1)普通模式:默认的集群模式。
(2)镜像模式:把需要的队列做成镜像队列。
集群中有两种节点:
(1)内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到磁盘)
(2)磁盘节点:保存状态到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。
良好的设计架构可以如下:在一个集群里,有3台以上机器,其中1台使用磁盘模式,其它使用内存模式。其它几台为内存模式的节点,无疑速度更快,因此客户端(consumer、producer)连接访问它们。而磁盘模式的节点,由于磁盘IO相对较慢,因此仅作数据备份使用。
7.1 普通模式
默认的集群模式,queue创建之后,如果没有其它策略,则queue就会按照普通模式集群。对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构,但队列的元数据仅保存有一份,即创建该队列的rabbitmq节点(A节点),当A节点宕机,你可以去其B节点查看,./rabbitmqctl list_queues 发现该队列已经丢失,但声明的exchange还存在。当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer,所以consumer应平均连接每一个节点,从中取消息。该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。如果做了队列持久化或消息持久化,那么得等A节点恢复,然后才可被消费,并且在A节点恢复之前其它节点不能再创建A节点已经创建过的持久队列;如果没有持久化的话,消息就会失丢。这种模式更适合非持久化队列,只有该队列是非持久的,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。
为什么RabbitMQ不将队列复制到集群里每个节点呢?这与它的集群的设计本意相冲突,集群的设计目的就是增加更多节点时,能线性的增加性能(CPU、内存)和容量(内存、磁盘),理由如下:
1。存储空间:如果每个集群节点每个队列的一个完整副本,增加节点需要更多的存储容量。例如,如果一个节点可以存储1 gb的消息,添加两个节点需要两份相同的1gb的消息
2。性能:发布消息需要将这些信息复制到每个集群节点。对持久消息,要求为每条消息触发磁盘活动在所有节点上。每次添加一个节点都会带来 网络和磁盘的负载。
当然RabbitMQ新版本集群也支持队列复制(有个选项可以配置)。比如在有五个节点的集群里,可以指定某个队列的内容在2个节点上进行存储,从而在性能与高可用性之间取得一个平衡(应该就是指镜像模式)。
7.2 镜像模式
把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案(镜像模式是在普通模式的基础上,增加一些镜像策略)。
该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用,一个队列想做成镜像队列,需要先设置策略,然后客户端创建队列的时候,rabbitmq集群根据“队列名称”自动设置是普通集群模式或镜像队列。
具体如下:
队列通过策略能实现镜像。策略能在任何时刻改变,rabbitmq队列也近可能的将队列随着策略变化而变化;非镜像队列和镜像队列之间是有区别的,前者缺乏额外的镜像基础设施,没有任何追随者,因此会运行得更快。为了使队列称为镜像队列,你将会创建一个策略来匹配队列,设置策略有两个键“ha-mode和 ha-params(可选)”。ha-params根据ha-mode设置不同的值,下面表格说明这些key的选项:
7.2.1 语法讲解
在cluster中任意节点启用策略,策略会自动同步到集群节点
rabbitmqctl set_policy -p hrsystem ha-allqueue"^" '{"ha-mode":"all"}'
这行命令在vhost名称为hrsystem创建了一个策略,策略名称为ha-allqueue,策略模式为 all 即复制到所有节点,包含新增节点,策略正则表达式为 “^” 表示所有匹配所有队列名称。
例如rabbitmqctl set_policy -p hrsystem ha-allqueue "^message" '{"ha-mode":"all"}'
注意:"^message" 这个规则要根据自己修改,这个是指同步"message"开头的队列名称,我们配置时使用的应用于所有队列,所以表达式为"^"
官方set_policy说明参见
set_policy [-p vhostpath] {name} {pattern} {definition} [priority]
(http://www.rabbitmq.com/man/rabbitmqctl.1.man.html)
7.2.2 “nodes”策略和迁移master
需要注意的是设置和修改一个“nodes”策略将不会引起已经存在的master离开,尽管你让其离开。比如:如果一个队列在{A},并且你给它一个节点策略告知它在{B C},它将会在{A B C}。如果节点A那时失败或者停机了,那个节点上的镜像将不回来且队列将继续保持在{B C}(注:当队列已经是镜像队列且同步到其它节点,就算原节点宕机,也不影响其它节点对此队列使用)。
7.2.3 创建策略例子
队列名称以“ha.”开头的队列都是镜像队列,镜像到集群内所有节点:
列名称以“two.”开头的队列,其策略镜像到集群内任何两个节点:
队列同步到指rabbitmq 节点 ,rabbitmqctl:
./rabbitmqctl set_policy sa-specify "^sa.specify." '{"ha-mode":"nodes","ha-params":["rabbit@is137","rabbit@raxtone"]}'
切记,需要把队列同步到的节点都写进去。
7.3 集群部署
我们先搭建一个普通集群模式,在这个模式基础上再配置镜像模式实现高可用,Rabbit集群前增加一个反向代理,生产者、消费者通过反向代理访问RabbitMQ集群。
架构图如下:
设计架构可以如下:在一个集群里,有3台机器,其中1台使用磁盘模式,另2台使用内存模式。
7.3.1 3台机器如下(通过/etc/sysconfig/network修改主机名):
192.168.36.217 M-zhutianwei-A (Rabbit)
192.168.36.102 M-zhutianwei-C (Rabbit)
192.168.36.136 S-zhutianwei-B (Rabbit)
将上面的Rabbit主机解析都加入到每个Rabbit主机的/etc/hosts中
1. <span style="font-family:SimSun;font-size:12px;">127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
2. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
3.
4. 192.168.36.217 M-zhutianwei-A
5. 192.168.36.102 M-zhutianwei-C
6. 192.168.36.136 S-zhutianwei-B</span>
7.3.2 部署RabbitMq,并可以正常启动
7.3.3 设置每个节点Cookie
关闭所有rabbit服务,将M-zhutianwei-A 节点的/root/.erlang.cookie内容复制到其他两台机器。
#chmod 777 /root/.erlang.cookie
复制内容
#chmod 400 /root/.erlang.cookie
保持文件权限一致。
7.3.4 使用detached参数独立运行启动服务
nohup /usr/local/rabbitmq/sbin/rabbitmq-server –detached &
查看节点下的集群:/usr/local/rabbitmq/sbin/rabbitmqctl cluster_status
Cluster status of node 'rabbit@m-zhutianwei-a' ...
[{nodes,[{disc,['rabbit@m-zhutianwei-a']}]},
{running_nodes,['rabbit@m-zhutianwei-a']},
{cluster_name,<<"rabbit@M-zhutianwei-A">>},
{partitions,[]}]
7.3.5 设置内存节点及内存节点连接磁盘节点
将M-zhutianwei-C、S-zhutianwei-B作为内存节点与M-zhutianwei-A连接起来,执行如下命令:
l /usr/local/rabbitmq/sbin/rabbitmqctl stop_app
/usr/local/rabbitmq/sbin/rabbitmqctl join_cluster --ram rabbit@m-zhutianwei-a
/usr/local/rabbitmq/sbin/rabbitmqctl start_app
上述命令先停掉rabbitmq应用,然后调用cluster命令,将M-zhutianwei-C或S-zhutianwei-B连接到,使两者成为一个集群,最后重启rabbitmq应用。在这个cluster命令下,M-zhutianwei-C、S-zhutianwei-B是内存节点,M-zhutianwei-A是磁盘节点(RabbitMQ启动后,默认是磁盘节点)。
如果要使M-zhutianwei-C或S-zhutianwei-B在集群里也是磁盘节点,join_cluster 命令去掉--ram参数即可
#/usr/local/rabbitmq/sbin/rabbitmqctl join_cluster rabbit@queue
只要在节点列表里包含了自己,它就成为一个磁盘节点。在RabbitMQ集群里,必须至少有一个磁盘节点存在。
7.3.6 运行cluster_status命令查看集群状态
/usr/local/rabbitmq/sbin/rabbitmqctl cluster_status
- <span style="font-family:SimSun;font-size:12px;">Cluster status of node 'rabbit@s-zhutianwei-b' ...
- [{nodes,[{disc,['rabbit@m-zhutianwei-a']},
- {ram,['rabbit@s-zhutianwei-b','rabbit@m-zhutianwei-c']}]},
- {running_nodes,['rabbit@m-zhutianwei-c','rabbit@m-zhutianwei-a',
- 'rabbit@s-zhutianwei-b']},
- {cluster_name,<<"rabbit@M-zhutianwei-A">>},
- {partitions,[]}]</span>
7.3.7 测试
往任意一台集群节点里写入消息队列,会复制到另一个节点上,我们看到两个节点的消息队列数一致。
Listing queues ...
helloword 1
这样RabbitMQ集群就正常工作了。这种模式更适合非持久化队列,只有该队列是非持久的,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。
上面配置RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制,虽然该模式解决一部分节点压力,但队列节点宕机直接导致该队列无法使用,只能等待重启,所以要想在队列节点宕机或故障也能正常使用,就要复制队列内容到集群里的每个节点,需要创建镜像队列。下一节我们看看如何镜像模式来解决复制的问题,从而提高可用性 。
7.4 镜像模式配置
7.4.1 增加负载均衡器
关于负载均衡器,商业的比如F5的BIG-IP,Radware的AppDirector,是硬件架构的产品,可以实现很高的处理能力。但这些产品昂贵的价格会让人止步,所以我们还有软件负载均衡方案。互联网公司常用的软件LB一般有LVS、HAProxy、Nginx等。LVS是一个内核层的产品,主要在第四层负责数据包转发,使用较复杂。HAProxy和Nginx是应用层的产品,但Nginx主要用于处理HTTP,所以这里选择HAProxy作为RabbitMQ前端的LB。HAProxy的安装使用非常简单,在Centos下直接yum install haproxy,然后更改/etc/haproxy/haproxy.cfg 文件即可,文件内容大概如下:
- #---------------------------------------------------------------------
- # Global settings
- #---------------------------------------------------------------------
- global
- log 127.0.0.1 local2
- chroot /var/lib/haproxy
- pidfile /var/run/haproxy.pid
- maxconn 4000
- user haproxy
- group haproxy
- daemon
- # turn on stats unix socket
- stats socket /var/lib/haproxy/stats
- #---------------------------------------------------------------------
- # common defaults that all the 'listen' and 'backend' sections will
- # use if not designated in their block
- #---------------------------------------------------------------------
- defaults
- mode http
- log global
- option httplog
- option dontlognull
- option http-server-close
- option forwardfor except 127.0.0.0/8
- option redispatch
- retries 3
- timeout http-request 10s
- timeout queue 1m
- timeout connect 10s
- timeout client 1m
- timeout server 1m
- timeout http-keep-alive 10s
- timeout check 10s
- maxconn 3000
- listen rabbitmq_cluster 0.0.0.0:5672
- mode tcp
- balance roundrobin
- server rqslave1 192.168.36.102:5672 check inter 2000 rise 2 fall 3
- server rqslave2 192.168.36.136:5672 check inter 2000 rise 2 fall 3
- #server rqmaster 192.168.36.217:5672 check inter 2000 rise 2 fall 3
启动:service haproxy start
负载均衡器会监听5672端口,轮询我们的两个内存节点192.168.36.102、192.168.36.136的5672端口,192.168.36.217为磁盘节点,只做备份不提供给生产者、消费者使用,当然如果我们服务器资源充足情况也可以配置多个磁盘节点,这样磁盘节点除了故障也不会影响,除非同时出故障。
7.4.2 配置策略
使用Rabbit镜像功能,需要基于rabbitmq策略来实现,策策是用来控制和修改群集范围的某个vhost队列行为和Exchange行为在cluster中任意节点启用策略,策略会自动同步到集群节点
# rabbitmqctl set_policy -p hrsystem ha-allqueue"^" '{"ha-mode":"all"}'
这行命令在vhost名称为hrsystem创建了一个策略,策略名称为ha-allqueue,策略模式为 all 即复制到所有节点,包含新增节点,
策略正则表达式为 “^” 表示所有匹配所有队列名称。
例如rabbitmqctl set_policy -p hrsystem ha-allqueue "^message" '{"ha-mode":"all"}'
注意:"^message" 这个规则要根据自己修改,这个是指同步"message"开头的队列名称,我们配置时使用的应用于所有队列,所以表达式为"^"
set_policy说明参见
set_policy [-p vhostpath] {name} {pattern} {definition} [priority] #ha-mode:all、exactly、nodes
也可以通过rabbit控制台添加
下面我们来添加一个queues队列来看看效果,这里只是测试结果,其它的先不填写
在这里边添加的时候你是可以指定Node选项也就是把这个queues放在哪个node节点上.
3.创建队列时需要指定ha 参数,如果不指定x-ha-prolicy 的话将无法复制
4.客户端使用负载服务器192.168.36.127发送消息,队列会被复制到所有节点,当然策略也可以配置制定某几个节点,这时任何节点故障 、或者重启将不会影响我们正常使用某个队列,到这里我们完成了高可用配置(所有节点都宕机那没有办法了)。
5.使用rabbitmq管理端可以看到集群镜像模式中对列状态
8. 流量控制
8.1基于连接的流量控制
当生产者发送消息的速率大于消息被路由到queue的速率时,会触发流量控制,发送速率受到限制,但不会完全阻塞。
8.2基于内存的流量控制
当内存使用达到vm_memory_high_watermark的值时,会触发流量控制,生产者被阻塞。vm_memory_high_watermark的默认值是系统内存的40%,这个值可以在配置文件中修改。
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
或者在运行时通过命令rabbitmqctlset_vm_memory_high_watermark fraction修改,修改立即生效,但下次重启后恢复。所以要永久修改,必须同时修改配置文件。
8.3基于磁盘的流量控制
当磁盘剩余空间小于disk_free_limit的值时,触发流量控制,生产者被阻塞。
disk_free_limit的默认值是1GB,可在配置文件中修改。
[{rabbit, [{disk_free_limit, 25000000000}]}].
9. 内存使用
通过命令rabbitmqctl status可以查看内存使用状态,或者在WEB管理界面中点击节点后查看。
其中Queues表示队列中消息占用的内存
Mnesia表示MQ中定义的exchange,queue,bindings,用户及权限占用的内存
详细说明请参考http://www.rabbitmq.com/memory-use.html
10.配置管理
RabbitMQ的默认配置在大部分情况下是最佳配置,如果服务运行良好,不需要修改。RabbitMQ支持3种方式修改配置:环境变量、配置文件、运行时参数与策略。
环境变量可以配置到shell环境变量中,也可以在RabbitMQ的环境变量中配置。例如:配置服务绑定IP,可以在shell环境变量里配置RABBITMQ_NODE_IP_ADDRESS的值,也可以在RabbitMQ的环境变量中配置NODE_IP_ADDRESS的值,即RabbitMQ的环境变量中变量名称要去掉RABBITMQ_。RabbitMQ的环境变量文件在$RABBITMQ_HOME/sbin/rabbitmq-env。配置的优先级为shell环境变量优先于RabbitMQ的环境变量,RabbitMQ的环境变量优先于RabbitMQ默认的环境变量。
通过配置文件配置,要先在环境变量中指定配置文件路径,例如:
CONFIG_FILE=/etc/rabbitmq/rabbitmq.config
然后添加配置,例如:
[
{mnesia, [{dump_log_write_threshold, 1000}]},
{rabbit, [{tcp_listeners, [5673]}]}
].
通过rabbitmqctl命令可以在运行时修改配置,例如修改vm_memory_high_watermark。还有些配置,比如镜像队列,是通过管理界面或命令配置策略实现的。
详细的配置项请参考http://www.rabbitmq.com/configure.html
11.性能
11.1性能测试
RabbitMQ的JAVA客户端中附带了性能测试脚本,以下数据都由此脚本测试得到。
以下是发送0.5KB大小消息的测试结果:
producer | consumer | confirm(max unconfirmed publishes 100) | ack | persistent | throughput (msg/s) |
---|---|---|---|---|---|
1 | 1 | N | N | N | 17650 |
1 | 1 | Y | N | N | 15640 |
1 | 1 | N | Y | N | 17100 |
1 | 1 | N | N | Y | 17368 |
1 | 1 | Y | N | Y | 15635 |
1 | 1 | N | Y | Y | 9154 |
1 | 1 | Y | Y | N | 15266 |
1 | 1 | Y | Y | Y | 6111 |
max unconfirmed publishes的值对于吞吐量的影响较大.
在发送持久消息与打开消费者的acknowledgements时,吞吐量变化明显。
关于性能,请参考以下文章:
http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/
http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
11.2队列的性能
RabbitMQ中的队列性能是一个值得关注的地方。在设计方案时就应该考虑到。队列只有在保持队列中不积压消息时,性能才是最佳的,队列中积压的消息越多,性能下降越多。
例如生产者发送消息的速度是600msg/s,消费者接收的速度是1200msg/s,正常情况下,是没有性能问题的。这时如果停止消费者一段时间,让消息在队列中积压,然后在打开消费者。按理消费者的速度大于生产者速度,可以转发新消息,并把老消息也取走,最终队列又回到为空的状态。但实际情况则不是,队列中的消息会继续积压,而且会继续变多,而这时消费者的速度就不如之前的了。
RabbitMQ中的队列,在实现上又分为多个小的队列,每个队列里存储着不同状态的消息。当消息不积压时,消息由交换器到达队列,就会被直接发送给消费者。而当消息堆积时,由于占用较多内存,RabbitMQ会把消息放入更深层次的队列,例如将内存中的消息换出到磁盘上(不管消息是否持久化),而这些操作会消耗更多的CPU等系统资源,从而导致影响队列中消息的发送。
为了不使消息积压,可以采取两种方法:
1)停止向队列发送消息
停止发送消息,让系统资源都集中到向消费者发送消息,队列中的消息逐渐减少,队列最终会恢复至为空状态。
2)转移负载
有些时候不能停止生产者,这时可以改变绑定,让新消息发送到新的队列,新队列必须位于新的机器上。当然也需要新的消费者来连接。这样可以让老队列中的消息慢慢取走,也不影响新消息的发送。
11.3 类似产品对比
1百万条1K的消息