0 前言
官网描述六类工作队列模式:
- 简单队列模式:最简单的工作队列,一个消息生产者,一个消息消费者,一个队列。另称点对点模式
- 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。也称点对点模式
- 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者
- 路由模式:基于发布/订阅模式,有选择的接收消息,即通过 routing 路由进行匹配条件是否满足接收消息
- 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比第四类更灵活
- RPC模式:拥有请求/回复的。也就是有响应的,这是其它都没的
1 简单队列模式
1 实现功能
一个生产者 P 发送消息到队列 Q,一个消费者 C 接收:
Pro
Pro负责创建消息队列,并发送消息入列:
- 获取连接
- 创建通道
- 创建队列声明
- 发送消息
- 关闭队列
public class Producer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection newConnection = MQConnectionUtils.newConnection();
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "我是生产者生成的消息";
System.out.println("生产者发送消息:" msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.close();
newConnection.close();
}
}
Con
- 获取连接
- 获取通道
- 监听队列
public class Customer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("002");
Connection newConnection = MQConnectionUtils.newConnection();
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" msgString);
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
创建vhost
2 工作队列模式
将耗时的任务分发给多个消费者(工作者)。
主要解决:处理资源密集型任务,且还要等他完成。有了工作队列,就可将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可取出消息并完成工作。若启动了多个工作进程,则工作就可在多个进程间共享。
工作队列也称公平性队列模式,循环分发,若有两个消费者,默认RabbitMQ按序将每条消息发给下一个 Con,每个消费者获得相同数量的消息,即轮询。
Pro
创建50个消息
代码语言:csharp复制public class Producer2 {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection newConnection = MQConnectionUtils.newConnection();
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);
for (int i = 1; i <= 50; i ) {
String msg = "生产者消息_" i;
System.out.println("生产者发送消息:" msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
channel.close();
newConnection.close();
}
}
Con
代码语言:csharp复制public class Customer2_1 {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("001");
Connection newConnection = MQConnectionUtils.newConnection();
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" msgString);
try {
Thread.sleep(1000);
} catch (Exception e) {
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
循环分发
启动生产者
启动两个消费者
Pro发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。
公平分发
由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。如一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,咋办?
发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似UDP,面向无连接。可用 basicQos,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。
代码语言:php复制final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);
消息持久化
背景
上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。
当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。
这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。
但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。咋办?
参数配置
参数配置一:生产者创建队列声明时,修改第二个参数为 true
代码语言:csharp复制/**3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN
代码语言:csharp复制for (int i = 1; i <= 50; i ) {
String msg = "生产者消息_" i;
System.out.println("生产者发送消息:" msg);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}
小结
- 循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息
- 消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会丢失
- 公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题
3 发布订阅模式
工作队列模式是直接在生产者与消费者里声明好一个队列,消息就只会对应同类型的消费者。这种只处理同种类型的消息有弊端。
3.1 案例
门户网站,用户注册完后一般都会发送消息通知用户注册结果。如在一个系统中,用户注册信息有邮箱、手机号,在注册完后会向邮箱和手机号都发送注册完成信息。
利用 MQ 实现业务异步处理,若用工作队列,就声明一个注册信息队列。注册完成后生产者向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应放在一块处理。
这时就可利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者:
只需简单的将队列绑定到交换机。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列。就像子网广播,每台子网内的主机都获得一份复制的消息。
3.2 啥是发布订阅模式
可将消息发送给不同类型的消费者。即发布一次,消费多个:
X表示交换机、红色表示队列。
展示邮件、短信的例子,通过绑定到一个交换机,但是
3.3 实战
代码语言:csharp复制public class ProducerFanout {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.创建新的连接 */
Connection connection = MQConnectionUtils.newConnection();
/** 2.创建通道 */
Channel channel = connection.createChannel();
/** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/** 4.发送消息 */
for (int i = 0; i < 10; i )
{
String message = "用户注册消息:" i;
System.out.println("[send]:" message);
// 第二个参数为空类似于表示全局广播,只要绑定到该队列上的消费者理论上是都可收到
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
try {
Thread.sleep(5 * i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** 5.关闭通道、连接 */
channel.close();
connection.close();
/** 注意:如果消费没有绑定交换机和队列,则消息会丢失 */
}
}
邮件消费者
代码语言:java复制public class ConsumerEmailFanout {
private static final String QUEUE_NAME = "consumerFanout_email";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
/* 1.创建新的连接 */
Connection connection = MQConnectionUtils.newConnection();
/* 2.创建通道 */
Channel channel = connection.createChannel();
/* 3.消费者关联队列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
// 第三个参数置为空时,可以接收到生产者所有的消息(生产者 routingKey 参数为空时)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" msg);
}
};
/* 5.消费者监听队列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
短信消费者
代码语言:java复制public class ConsumerSMSFanout {
private static final String QUEUE_NAME = "ConsumerFanout_sms";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
/* 1.创建新的连接 */
Connection connection = MQConnectionUtils.newConnection();
/* 2.创建通道 */
Channel channel = connection.createChannel();
/* 3.消费者关联队列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
// 第三个参数置为空时,可接收到生产者所有的消息(生产者 routingKey 参数为空时)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" msg);
}
};
/* 5.消费者监听队列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
运行
先运行两个con,再运行pro。如没有提前将队列绑定到交换机,直接运行pro,消息是不会发到任何队列里的。
生产者
短信消费者
邮件消费者
小结
相比工作模式,发布订阅模式引入了交换机,类型上更灵活。
pro不是直接操作队列,而是将数据发给交换机,由交换机将数据发给与之绑定的队列。从不加特定参数的运行结果中可以看到,两种类型的消费者(email,sms)都收到相同数量消息。
必须声明交换机,并设置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),fanout 指分发模式(将每一条消息都发送到与交换机绑定的队列)
队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
生产者发送消息到交换机,多个消费者声明多个队列,与交换机进行绑定,队列中的消息可以被所有消费者消费,类似QQ群消息
4 路由模式
就是发布订阅模式(Publish/Subscribe Pattern)中的直连交换机(Direct Exchange)。一种基于路由键(Routing Key)来路由消息的模式。在这种模式下,生产者发送消息时会指定一个路由键,交换机会根据这个路由键将消息路由到与之匹配的队列。
Pro
使用 channel.basicPublish
方法发送消息,并指定交换机名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。
Con
在消费者代码中,我们声明了一个直接交换机(direct
类型),并绑定了一个队列。在绑定队列时,我们使用 channel.queueBind
方法,并指定交换机名称、队列名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。
特点
- 路由键匹配:消息的路由键必须与队列绑定的路由键完全匹配,才能将消息路由到该队列。
- 直接交换机:直接交换机根据路由键进行精确匹配,适用于需要精确控制消息路由的场景。
通过这种方式,路由模式可以实现基于路由键的精确消息路由,适用于需要将消息发送到特定队列的场景。
5 主题模式
属于发布订阅模式的TopicExchange(主题交换机)。Queue 通过 routing key 绑定到 TopicExchange,当消息到达TopicExchange后,TopicEkchange 根据消息的 routing key 将消息路由到一个或者多个Queue。