RabbitMQ 学习初入门

2021-06-10 10:53:33 浏览数 (1)

RabbitMQ是一个消息队列,我们可以使用RabbitMQ 做消息队列,消息通知的业务功能,而且根据网上的不可靠消息得出,RabbitMQ 的性能水平甚至比 activeMQ 还要好,所以也是我选择认真去学习RabbitMQ的原因,当然我也有做个关于 activeMQ 的一些简单的 Demo 有机会的话可以分享出来~

Rabbit 使用 Erlang 来编写的AMQP 服务器。 什么是 AMQP 本人已经百度给大家了:

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

哎哟,先从按照说起吧~ 我使用macbook 来进行学习和开发的 顺便提一下 本人是位果粉,别喷~ 使用 MAC OS 安非常简单,还我是建议使用 MAC OS 的同学使用 brew 进行开发,没有别的 就是因为方便。

使用这个命令之前 如果没有安装过 Homebrew 就安装一下吧 有机会我也分享一下安装方式 顺便介绍一下这个让你懒癌晚期的Homebrew。

打开终端输入:

brew install rabbitmq

然后会告诉你 无法获得 /usr/local/sbin 的写入权限。 因为是英文的错误提示,本人会看不会写 所以直接写中文给各位看了~ 还是那句 别喷~ 谢谢

好,接下来让他有权限,其实这个目录是没有的sbin。

mkdir /usr/local/sbin 创建目录

chmod 777 /usr/local/sbin 开权限,全开别烦

brew link rabbitmq link 一下

然后就没有然后了

启动 rabbitMQ

MacBook-Pro:~ Tony$ /usr/local/sbin/rabbitmq-server

RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc.

## ## Licensed under the MPL. See http://www.rabbitmq.com/

## ##

########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log

###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log

##########

Starting broker... completed with 10 plugins.

目前为止已经安装完 rabbitMQ 并且启动起来 了 简单到飞

开始废话 说说概念,别方 我也是抄书的~

消费者和生产者,由于书本的篇幅太长,就抄了!简单明了告诉你们,消息的产生者就是生产者,获得消息的称为消息消费者。我QQ M了你一下 我就是消息的生产者了,你在QQ看到我的消息了,你就是消息的消费者了,这才是正常写书方式嘛··· 举个开发的业务背景的例子,我支付了一张订单,这个业务产生了几个次要的业务,例如积分业务,我购买了订单,订单核心功能处理订单的业务,然后生产了一个消息并通过rabbitMQ 发送了一个消息,内容为产生了一个订单,然后订阅了消息的消息消费者就会消费消息,然后根据消息处理自己的业务,该干嘛干嘛去~ 积分业务就去加你的积分等等。当然这个只是举例,别吐槽 可以在台服务器或者一个业务方法上实现。 上面的举例对于一些分布式的微服务上是非常常见的方式了~

几个重要的概念:

信道:在连接到rabbitMQ 之后,你将可以创建并获得一个AMQP信道,信道是创建在“真实的”TCP连接内的虚拟链接。所有的AMQP命令都是通过 AMQP 信道发出去的,每条信道会被指派一个唯一的ID。所有发布消息还是订阅都是通过AMQP 信道去处理的。这个概念非常像端口号【但是不是端口啊 ,别说我混淆你】,而且他的英文名字可以理解到他的意思channel 有使用NIO 的开发者已经比较了解~ 其实使用AMQP信道的原因非常简单,建立TCP连接的性能代价是非常高的,当你在不同的线程当中去创建TCP连接的方式去和RabbitMQ进行通信的代价是十分高的,但是在多个不同的线程当中通过channel 我们就可以通过同一个TCP连接进行通信了。好像写得好乱不知道他们理不理解我写什么,唉 算了 本人的水平有限,况且大多数情况下都是我自己看而已,放弃~ 抽象图如下 丑,不过先将就~

交换器 与 路由键: 这个好难解释啊~ 直接说工作流程吧~ 哎哟其实应该把 队列写到这里来,算了吧 不改了。 先记住,交换器当中里面有N个队列,而路由键 是让交换器知道消息应该放到哪个队列当中去。别废话举例: 产生消息 要告诉 RabbitMQ 这个消息是放到那个交换器上,【注意:交换器是由开发者去创建的你可以搞N个交换器出来,没有管你】

上代码:

this.channel.basicPublish(“hello-exchange”,”hola”,properties, SerializationUtils.serialize(msg));

首先 hello-exchange 是交换器的名字, hola 是路由键 这个让hello-exchange 去判断应该分发到那个队列当中,SerializationUtils.serialize(msg) 将一个消息序列化。 channel 就不用说啦上面说了,是一个AMQP信道。

交换器有很多不同的类型一会会一一介绍。交换器的类型不同也注定了消息会分发到哪个队列当中,所有尤其重要。

工作流程:

1、消费者订阅消息,先创建一个交换器,如果交换器已经存在即返回一个交换器。当然你可以passive 为 true 如果 passive 为 true , 交换器已经存在返回一个已经存在的交换器,否则报错。我暂时不知道用在哪里,所以 先记住一般情况下 创建一个交换器,如果存在就返回一个已经存在的交换器,如果不存在则创建并返回。

JAVA 代码如下【直接给源码的方法定义,是不是很贴心】:

备注是自己翻译的别喷

/**

* Declare an exchange, via an interface that allows the complete set of

* arguments.

* @see com.rabbitmq.client.AMQP.Exchange.Declare

* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk

* @param exchange the name of the exchange 【交换器名称】

* @param type the exchange type 【交换器类型 一会说 别着急】

* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 【是否持久化交换器,这个后面说,还是简单说一下,纠结帝~ 就是交换器中的消息会持久化,就是会存在硬盘当中,宕机或重启服务时会自动恢复当时会牺牲性能,除非你的SSD硬盘好快,当我没有说过~】

* @param autoDelete true if the server should delete the exchange when it is no longer in use 【当这个交换器 已经没有人使用的时候 会自动删除,longer 长时期 当时我不知道长时期是什么时候,所以我会记住是 没人用就自己删掉 自动消失】

* @param internal true if the exchange is internal, i.e. can't be directly

* published to by a client. 【不知道什么鬼意思,如果是内部 不会直接发送到客户端,我一般写false ,真心不知道他什么意思,后面学下去应该明白】

* @param arguments other properties (construction arguments) for the exchange 【其他参数 暂时没有用到】

* @return a declaration-confirm method to indicate the exchange was successfully declared

* @throws java.io.IOException if an error is encountered 【会抛出IO异常】

*/

Exchange.DeclareOk exchangeDeclare(String exchange,String type, boolean durable, boolean autoDelete, boolean internal,Map arguments) throws IOException;

2、消费者创建队列,并绑定队列到交换器当中,上写路由键,让交换器知道这个路由键的消息是跑到这个队列当中的。

JAVA 代码如下【直接给源码的方法定义】:

/**

* Declare a queue

* @see com.rabbitmq.client.AMQP.Queue.Declare

* @see com.rabbitmq.client.AMQP.Queue.DeclareOk

* @param queue the name of the queue 【队列名称】

* @param durable true if we are declaring a durable queue (the queue will survive a server restart) 【持久化,这里特别说一下,如果你想消息是持久化的,必须消息是持久化的,交换器也是持久化的,队列更是持久化的,其中一个不是也无法恢复消息】

* @param exclusive true if we are declaring an exclusive queue (restricted to this connection) 【私有的,独有的。 这个队列之后这个应用可以消费,上面的英文注释是 说restricted to this connection 就是限制在这个连接可以消费,就是说不限制channel信道咯,具体没有试过,但是应该是这样,除非备注骗我,我读得书少,你唔好呃我!!!】

* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【没有人使用自动删除】 注意:如果exclusive为true 最好 autodelete都为true 至于为什么 这么简单自己想~

* @param arguments other properties (construction arguments) for the queue 【其他参数没有玩过】

* @return a declaration-confirm method to indicate the queue was successfully declared

* @throws java.io.IOException if an error is encountered

*/

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException;/**

* Declare a queue

* @see com.rabbitmq.client.AMQP.Queue.Declare

* @see com.rabbitmq.client.AMQP.Queue.DeclareOk

* @param queue the name of the queue

* @param durable true if we are declaring a durable queue (the queue will survive a server restart)

* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)

* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)

* @param arguments other properties (construction arguments) for the queue

* @return a declaration-confirm method to indicate the queue was successfully declared

* @throws java.io.IOException if an error is encountered

*/

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;

绑定队列和交换器 并指定路由键

/**

* Bind a queue to an exchange.

* @see com.rabbitmq.client.AMQP.Queue.Bind

* @see com.rabbitmq.client.AMQP.Queue.BindOk

* @param queue the name of the queue 【队列名称】

* @param exchange the name of the exchange 【交换器名称】

* @param routingKey the routine key to use for the binding 【路由键】

* @param arguments other properties (binding parameters) 【其他参数 还是那句没有玩过】

* @return a binding-confirm method if the binding was successfully created

* @throws java.io.IOException if an error is encountered

*/

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map arguments) throws IOException;

3、然后消费者就去订阅消息咯,注意在JAVA里面的订阅方法会产生线程阻塞的。

JAVA 代码:

订阅消息并消费

/**

* Start a non-nolocal, non-exclusive consumer, with

* a server-generated consumerTag.

* @param queue the name of the queue 【所订阅消费的队列】

* @param autoAck true if the server should consider messages 【是否为自动确定消息,好像TCP的ack syn 啊,可怕的7层模型!!!一般写true就可以】

* acknowledged once delivered; false if the server should expect

* explicit acknowledgements

* @param callback an interface to the consumer object 【回调callback 这个马上上代码看看】

* @return the consumerTag generated by the serve

* @throws java.io.IOException if an error is encountered

* @see com.rabbitmq.client.AMQP.Basic.Consume

* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk

* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)

*/

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

接收到消息之后,马上有回调,来看看回调接口:

/**

* Interface for application callback objects to receive notifications and messages from

* a queue by subscription.

* Most implementations will subclass {@link DefaultConsumer}.

*

* The methods of this interface are invoked in a dispatch

* thread which is separate from the {@link Connection}'s thread. This

* allows {@link Consumer}s to call {@link Channel} or {@link

* Connection} methods without causing a deadlock.

*

* The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more

* dispatch threads. {@link Consumer}s should avoid executing long-running code

* because this will delay dispatch of messages to other {@link Consumer}s on the same

* {@link Channel}.

*

* @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)

* @see Channel#basicCancel

*/

public interface Consumer {

/**

* Called when the consumer is registered by a call to any of the

* {@link Channel#basicConsume} methods.

* @param consumerTag the consumer tag associated with the consume

*/

void handleConsumeOk(String consumerTag);

/**

* Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.

* @param consumerTag the consumer tag associated with the consume

*/

void handleCancelOk(String consumerTag);

/**

* Called when the consumer is cancelled for reasons other than by a call to

* {@link Channel#basicCancel}. For example, the queue has been deleted.

* See {@link #handleCancelOk} for notification of consume

* cancellation due to {@link Channel#basicCancel}.

* @param consumerTag the consumer tag associated with the consume

* @throws IOException

*/

void handleCancel(String consumerTag) throws IOException;

/**

* Called when either the channel or the underlying connection has been shut down.

* @param consumerTag the consumer tag associated with the consume

* @param sig a {@link ShutdownSignalException} indicating the reason for the shut down

*/

void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

/**

* Called when a basic.recover-ok is received

* in reply to a basic.recover. All messages

* received before this is invoked that haven't been ack'ed will be

* re-delivered. All messages received afterwards won't be.

* @param consumerTag the consumer tag associated with the consume

*/

void handleRecoverOk(String consumerTag);

/**

* Called when a basic.deliver is received for this consumer.

* @param consumerTag the consumer tag associated with the consume

* @param envelope packaging data for the message

* @param properties content header data for the message

* @param body the message body (opaque, client-specific byte array)

* @throws IOException if the consumer encounters an I/O error while processing the message

* @see Envelope 宝宝累了 看重点,这个方法就是消息到达的时候回调的方法其他你们喜欢研究 可以认真看看英文备注。

*/

void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body)

throws IOException;

}

注意一下:其实也可以使用其他订阅方式去消费消息的,可以使用get的方式去获得一条消息,然后会在队列当中给你一条消息,但是你会想就不需要使用上面的订阅方法啦,直接跑个循环就可以啦。其实get方法会开启订阅获得一条消息后关闭订阅,这样会产生不必要的性能开销的,除非老板让你搞卡一点,让客户掏点优化费,不然就别这样干了。get的方法这里就先不介绍啦~

4、上面所说的都是消息消费者的工作,这个step开始说消息生产者要做的。开启一个交换器,当然这个交换器应该是存在不用创建的因为 上面所说消费者已经创建过了,但是其实谁去先创建都是可以的,消费者也可以创建,生产者也可以,这个倒是没有什么关系,主要是看你的业务需求【这句话甩锅用】。由于已经贴出过创建交换器的代码所以就不贴了,看上面。

与上面的交换器创建一样。不贴代码了自己拖上去看吧~

5、好像没有什么别的事情了,就是产生一个消息然后发过去。上代码

JAVA代码:

/**

* Publish a message

* @see com.rabbitmq.client.AMQP.Basic.Publish

* @param exchange the exchange to publish the message to 【发送到那个交换器上】

* @param routingKey the routing key 【路由键 决定消息去哪个队列里面混】

* @param props other properties for the message - routing headers etc 【其他消息参数, 哎~ 这个我玩过,一会DEMO时间,表演一下】

* @param body the message body 【消息体,一般把对象序列化一下发过去】

* @throws java.io.IOException if an error is encountered

*/

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

然后就没有然后了 上面 第三步的 订阅方法中的回调 就会有这个消息了。 好又到说概念的时候了。 困的同学先休息。

消息的投递方式,我也困了其实:

【1】 消息MESSAGE_A 到达 queue_A 队列

【2】 RabbitMQ 把 MESSAGE_A 发送到 APPLICATION A(消费者)

【3】 APPLICATION A 确定收到了消息 发个ack 高兴一下

【4】 RabbitMQ 把 消息从 queue_A 队列删除。

上面其实也有说过,autoack参数 有印象吧~ 所以autoack 设置成true 就没错了。这里就可以开展其他话题了,如果多个订阅者怎么办呢,是不是多个订阅者都会收到消息呢,答案是否定的。RabbitMQ的队列会对订阅者做一个循环,例如目前有两个订阅者订阅了同一个队列,serverA 和 serverB 他会循环去发送消息,队列有 ID 1-10 的消息,ID 1消息会由 serverA 处理 ID 2 消息会由 serverB 处理 ID 3 消息会由 serverA 去处理 如此类推。

如果serverA 在发送ACK应答给 RabbitMQ之前 断开连接【就是服务挂了,宕机了】RabbitMQ 会认为这个消息没有处理,即没有消费到这个消息,会把这个消息发送到 serverB 去处理。

而且在消息没有ACK之前,RabbitMQ不会发你第二条消息的,所以如果你想等待消息的任务处理完之后再给第二个消息的话,可以将autoDelete设置成false,这样你就可以在消息处理完之后再去ack。���样也是一个非常通用的场景,以防扎堆处理消息。

其实这样的设计也是非常好的,如果你出现了业务错误,执行消息处理的时候,刚好出现问题了,你可以通过断开连接的方式【这里所指的断开是断开RabbitMQ的connection】让这个消息交给下个订阅者,【这里说一下,如果队列没有消费者去订阅消息的话,消息会存在RabbitMQ当中等待有消费者去订阅再去发送】,当然这也是在你没有ack的情况下。在高级版本的RabbitMQ中可以使用reject命令,让这个消息直接传递到下个订阅者。【在RabbitMQ2.0可以使用】

AMQP信道channel与订阅

如果 一个channel 订阅了一个队列就不能订阅别的队列了,也就是说一个channel只能订阅一个队列。所以你需要取消原来的订阅,并将信道设置为“传输”模式。后面有时间写后面的文章可能会说到。

交换器的类型:

direct :这个简单,只要路由键一模一样就投递到相应的队列当中。

<hr />

fanout :这个也简单,消息会投递到所有绑定到这个交换器的队列当中。

<hr />

topic :这个就复杂一点了,我很客观的简单就简单 复杂就复杂 从不忽悠。但是这个也好常用不能不学,好累。在路由键当中可以使用通配符。怎么说呢,好纠结啊 不知道怎么解释啊 怎么办 在线等。

举个例子吧,绑定队列:

//绑定队列 路由键是 tony.teamA 有路由键为tony.teamA的消息就投递到这里

this.channel.queueBind("queueA","topic_exchangeA","tony.teamA",null);

//绑定队列 路由键是 yan.teamA 有路由键为tony.teamB的消息就投递到这里

this.channel.queueBind("queueB","topic_exchangeA","yan.teamA",null);

//绑定队列 路由键是 *.teamA 有路由键为.teamA结尾的消息就投递到这里

this.channel.queueBind("queueC","topic_exchangeA","*.teamA",null);

//绑定队列 路由键是 chao.teamB 有路由键为chao.teamB的消息就投递到这里

this.channel.queueBind("queueD","topic_exchangeA","chao.teamB",null);

//绑定队列 路由键是 *.teamB 有路由键为.teamB结尾的消息就投递到这里

this.channel.queueBind("queueE","topic_exchangeA","*.teamB",null);

//绑定队列 路由键是 # 所有在这个交换器的消息都投递到这里

this.channel.queueBind("queueF","topic_exchangeA","#",null);

// queueA[路由键:tony.teamA]、queueC[路由键:*.teamA]、queueF[路由键:#]会收到消息

this.channel.basicPublish("topic_exchangeA","tony.teamA",properties,SerializationUtils.serialize(msg));

// queueD[路由键:chao.teamB]、queueE[路由键:*.teamB]、queueF[路由键:#]会收到消息

this.channel.basicPublish("topic_exchangeA","chao.teamB",properties,SerializationUtils.serialize(msg));

然后基本的rabbitMQ 的东西就搞好了。不过这只是开始~

简单说一下虚拟主机和隔离吧,简单来说想MySQL 你可以创建很多个库,里面有很多个表。然后呢 rabbitMQ可以创建很多个虚拟主机,虚拟主机里面有很多个交换器,交换器里面有很多个队列,解释得完美。默认会提供一个 默认虚拟主机 vhost : “/”。后面找时间再说这个 vhost 吧。

重头大戏上DEMO,由于方便我阅读回忆,所以我忽略的封装性,一切以容易快速看懂和回忆为目标。别喷~

生产者:

package com.maxfunner;

import com.maxfunner.mq.EndPoint;

import com.rabbitmq.client.*;

import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

* Produce

*/

public class Producer {

private Connection connection;

private Channel channel;

private Map messageMap = new HashMap();

private int maxID = 0;

private static final String EXCHANGE_NAME = "MY_EXCHANGE";

public void createConnectionAndChannel() throws IOException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("127.0.0.1"); //服务器地址

factory.setUsername("guest"); //默认用户名

factory.setPassword("guest"); //默认密码

factory.setPort(5672); //默认端口,对就是这么屌全部默认的

this.connection = factory.newConnection(); //创建链接

this.channel = this.connection.createChannel();

}

public void initChannelAndCreateExchange() throws IOException {

this.channel.confirmSelect(); //启用消息确认已经投递成功的回调

/**

* 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数

*/

this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

this.channel.addConfirmListener(new ConfirmListener() {

/**

* 成功的时候回调【这个是当消息到达交换器的时候回调】

* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。

* @param multiple

* @throws IOException

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

String message = messageMap.get(deliveryTag);

System.out.println("message : " message " ! 发送成功");

messageMap.remove(message);

//最后一个消息都搞掂之后 关闭所有东西

if (deliveryTag >= maxID) {

closeAnything();

}

}

/**

* 失败的时候回调

* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。

* @param multiple

* @throws IOException

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

String message = messageMap.get(deliveryTag);

System.out.println("message : " message " ! 发送失败");

messageMap.remove(message); //发送失败就不重发了,发脾气

//最后一个消息都搞掂之后 关闭所有东西

if (deliveryTag >= maxID) {

closeAnything();

}

}

});

}

public void sendMessage(String message) throws IOException {

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()

.contentType("text/plain") //指定是一个文本

.build();

// 发送一个消息 到 EXCHANGE_NAME 的交换器中 路由键为 KEY_A 发送 message 之前序列化一下 具体用什么包上面import自己看

this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message));

}

public void closeAnything() throws IOException {

this.channel.close(); //跪安吧 小channel

this.connection.close(); //你也滚吧 connection

}

public static void main(String[] args) throws IOException {

Producer producer = new Producer();

producer.createConnectionAndChannel();

producer.initChannelAndCreateExchange();

List messageList = new ArrayList();

messageList.add("message_A");

messageList.add("message_B");

messageList.add("message_C");

messageList.add("message_D");

messageList.add("message_E");

messageList.add("message_F");

producer.maxID = messageList.size(); //记录最后一个ID 当最后一个消息发送成功后关闭连接

//注意:因为channel产生的ID 是从1开始的

for (int i = 1; i <= messageList.size(); i ) {

producer.messageMap.put(new Long(i), messageList.get(i - 1)); //这里看懂了吗?没看懂也没有办法了,这里我真不知道怎么解释

producer.sendMessage(messageList.get(i - 1));

}

}

}

消费者:

package com.maxfunner;

import com.maxfunner.mq.QueueConsumer;

import com.rabbitmq.client.*;

import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

/**

* Consume

*/

public class Consumer {

private Connection connection;

private Channel channel;

private Map messageMap = new HashMap();

private static final String EXCHANGE_NAME = "MY_EXCHANGE";

/**

* 对,你猜得一点都没有错,我是复制的

* @throws IOException

*/

public void createConnectionAndChannel() throws IOException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("127.0.0.1"); //服务器地址

factory.setUsername("guest"); //默认用户名

factory.setPassword("guest"); //默认密码

factory.setPort(5672); //默认端口,对就是这么屌全部默认的

this.connection = factory.newConnection(); //创建链接

this.channel = this.connection.createChannel();

}

public void createAndBindQueue() throws IOException {

/**

* 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数

*/

this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也创建一下交换器,反正已经创建也没有关系

/**

* 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的

*/

this.channel.queueDeclare("QUEUE_A",false,false,true,null);

this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");

}

public static void main(String args[]) throws IOException {

final Consumer consumer = new Consumer();

consumer.createConnectionAndChannel();

consumer.createAndBindQueue();

System.out.println("等待消息中。。。。");

new Thread(new Runnable() {

public void run() {

try {

/**

* 订阅消息,订阅队列QUEUE_A 获得消息后自动确认

*/

consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() {

public void handleConsumeOk(String consumerTag) {

}

public void handleCancelOk(String consumerTag) {

}

public void handleCancel(String consumerTag) throws IOException {

}

public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

}

public void handleRecoverOk(String consumerTag) {

}

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("message : " SerializationUtils.deserialize(body));

}

});

} catch (IOException e) {

e.printStackTrace();

}

}

}).start();

}

}

生产者运行结果:

message : message_A ! 发送成功

message : message_B ! 发送成功

message : message_C ! 发送成功

message : message_D ! 发送成功

message : message_E ! 发送成功

message : message_F ! 发送成功

消费者运行结果:

等待消息中。。。。

message : message_A

message : message_B

message : message_C

message : message_D

message : message_E

message : message_F

项目我是用maven创建的贴一下maven 的pom.xml文件

4.0.0

com.maxfunne

rabbitlearning

1.0-SNAPSHOT

ja

rabbitlearning

http://maven.apache.org

UTF-8

junit

junit

3.8.1

test

com.rabbitmq

amqp-client

3.0.4

commons-lang

commons-lang

2.6

org.apache.commons

commons-lang3

3.1

最后一个重点问题

消息“黑洞”,如果在没有绑定队列和交换器之前,所有发出的消息都无法匹配到相应的队列当中,那些消息将永远不会被消费。而且confirm的callback也是会返回成功的即使消息进入了消息“黑洞”。所以在发送消息之前 必须确定队列已经绑定,确保消息能分配到相应的队列当中。 测试很简单,上面的DEMO 先运行 Producer 显示所有消息发送成功,然后再运行 Consumer 发现没有消息可以接收。 再尝试先运行Consumer 再运行 Producer 就发现一切都正常了,这也是为什么我把autoDelete设置为true的原因,有了autoDelete当队列没有人用的时候就会自动删除。所以每次运行都可以测试出问题。要保证消息能够到达指定的队列最好也在Producer中建立队列 而且进行相关的绑定 然后再发送消息 修改一下 Producer 的其中一方法即可。 不写了 累了~

public void initChannelAndCreateExchange() throws IOException {

this.channel.confirmSelect(); //启用消息确认已经投递成功的回调

/**

* 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数

*/

this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

this.channel.addConfirmListener(new ConfirmListener() {

/**

* 成功的时候回调【这个是当消息到达交换器的时候回调】

* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。

* @param multiple

* @throws IOException

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

String message = messageMap.get(deliveryTag);

System.out.println("message : " message " ! 发送成功");

messageMap.remove(message);

//最后一个消息都搞掂之后 关闭所有东西

if (deliveryTag >= maxID) {

closeAnything();

}

}

/**

* 失败的时候回调

* @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。

* @param multiple

* @throws IOException

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

String message = messageMap.get(deliveryTag);

System.out.println("message : " message " ! 发送失败");

messageMap.remove(message); //发送失败就不重发了,发脾气

//最后一个消息都搞掂之后 关闭所有东西

if (deliveryTag >= maxID) {

closeAnything();

}

}

});

/**

* 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的

*/

this.channel.queueDeclare("QUEUE_A",false,false,true,null);

this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");

 

CentOS7下安装RabbitMQ  http://www.linuxidc.com/Linux/2016-11/136812.htm

CentOS 7.2 下 RabbitMQ 集群搭建  http://www.linuxidc.com/Linux/2016-12/137812.htm

CentOS7环境安装使用专业的消息队列产品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

配置与管理RabbitMQ  http://www.linuxidc.com/Linux/2016-11/136815.htm

RabbitMQ概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

RabbitMQ入门教程  http://www.linuxidc.com/Linux/2015-02/113983.htm

0 人点赞