Work模型
简介(重要)
Work Queues,工作队列(又名:任务队列)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。
长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用Work Queues:让多个消费者绑定到一个队列,共同消费队列中的消息。
队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
背后的主要思想是避免立即执行资源密集型任务,并等待它完成。
相反,我们计划稍后完成的任务。我们将任务封装为消息,并将其发送到队列。
在后台运行的工作进程将弹出任务并最终执行作业。运行许多工作人员时,任务将在它们之间共享。
代码实现
下面我将基于第一篇博客的工具类来进行模型的测试使用。
如简介中的图所示,是由一个生产者和两个消费者来完成这个工作,我们去看看结果是怎么样的。
代码语言:javascript复制package com.xn2001.workquene;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/2 14:05
**/
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
//生产消息
for (int i = 0; i < 10; i ) {
channel.basicPublish("", "work", null, (i "发消息来了").getBytes());
}
RabbitMQUtil.closeChannelAndConnection(channel, connection);
}
}
代码语言:javascript复制package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/2 18:49
**/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
代码语言:javascript复制package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/2 18:56
**/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
先运行两个消费者,然后开启生产者来生产消息。
结果如下图:
可以看到02468在第一个消费者中,13579在第二个消费者中消费。
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
默认情况下,RabbitMQ将每个消息依次发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。
这种分发消息的方式称为循环。可以尝试与三个或更多的消费者来进行工作。
循环调度
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
使用任务队列的优点之一是能够轻松地并行化工作。如果我们正在积压工作,我们只需要增加更多的工人,这样就可以很容易地扩大规模。
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
完成一项任务需要几秒钟的时间。你可能想知道,如果一个消费者开始了一个长任务,并且只完成了一部分,那么会发生什么。对于我们当前的代码,一旦RabbitMQ向消费者传递了一条消息,它会立即将其标记为删除。在这种情况下,如果你失去一个工人,我们将失去它正在处理的消息。我们还会丢失所有发送给这个特定工作人员但尚未处理的消息。
消息确认
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
但我们不想失去任何任务。如果一个工人不干了,我们希望把这个任务交给另一个工人。
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
为了确保消息不会丢失,RabbitMQ支持消息确认。消费者发回一个确认消息,告诉RabbitMQ已经接收并处理了一条特定消息,并且RabbitMQ可以自由删除该消息。
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失) ,RabbitMQ 将理解消息没有被完全处理,并将重新对其排队。 如果有其他消费者在线同时,它将迅速重新交付给另一个消费者。 这样你就可以确保没有信息丢失,即使工人偶尔死亡。
下面我们将去模拟一个"能者多劳"的场景,来完美规避上面这种问题的出现。
首先,我们需要去模拟一个消费者,它的速度效率低下,所以另一个消费者就消费得比他多。
实现这种场景我们只需让消费者1线程等待1秒,消费2线程正常运行,同时要关闭掉消息自动确定。
代码语言:javascript复制package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 乐心湖
* @date 2020/6/2 18:49
**/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//设置每次只消费一个消息
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
//参数2:关闭自动消息确认
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try{
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
System.out.println(e);
}
System.out.println(new String(body));
//手动消息确认
// 参数1:确定队列中的具体消息
// 参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
代码语言:javascript复制package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/2 18:56
**/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
//参数2:false 表示不自动确定消息
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
//手动消息确认
// 参数1:确定队列中的具体消息
// 参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
代码语言:javascript复制package com.xn2001.workquene;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/2 14:05
**/
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
//生产消息
for (int i = 0; i < 20; i ) {
channel.basicPublish("", "work", null, (i "发消息来了").getBytes());
}
RabbitMQUtil.closeChannelAndConnection(channel, connection);
}
}
结果正如所料,能者多劳。
Fanout模型
简介(重要)
Let's quickly go over what we covered in the previous tutorials:
- A producer 生产者 is a user application that sends messages. 是一个发送消息的用户应用程序
- A queue 排队 is a buffer that stores messages. 是一个存储消息的缓冲区
- A consumer 消费者 is a user application that receives messages. 是一个接收消息的用户应用程序
Rabbitmq 中消息传递模型的核心思想是,生产者从不直接将任何消息发送到队列。 实际上,很多时候生产者甚至不知道消息是否会被传递到任何队列。
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
相反,生产者只能向交换机发送消息。一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切地知道如何处理它接收到的消息。
- 是否应该将它添加到特定队列中?
- 是否应该将它附加到许多队列中?
- 或者它应该被丢弃。
- 其规则由交换器类型定义。
There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it logs:
可用的交换类型有: 直接交换、主题交换、头文件交换和扇出交换。
我们将关注最后一个->扇形交换。
让我们创建一个这种类型的交换,并称之为 logs:
代码语言:javascript复制channel.exchangeDeclare("logs", "fanout");
The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.
扇出交换非常简单。它只是将接收到的所有消息广播到它所知道的所有队列。
fanout模型:扇出,我们也称为广播
在广播模式下,消息发送流程是这样的:
- 可以有多个消费者
- 每个消费者有自己的queue (队列)
- 每个队列都要绑定到Exchange (交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
代码实现
之前我们是如何发布消息的呢,
代码语言:javascript复制channel.basicPublish("", "hello", null, message.getBytes());
第一个参数是交换器的名称,空字符串表示缺省或无名交换器: 如果存在,则使用 routingKey
指定的名称将消息路由到消息队列。
我们创建一个交换机
代码语言:javascript复制channel.exchangeDeclare("logs", "fanout");
所以现在,我们可以发布到我们刚创建的交换:
代码语言:javascript复制channel.basicPublish( "logs", "", null, message.getBytes());
开发消费者
我们至少需要用到3个消费者,同时启动观察是否都能接受来自广播的消息。
代码语言:javascript复制package com.xn2001.fanout;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 0:39
**/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs", "fanout");
//获取临时队列
String queue = channel.queueDeclare().getQueue();
//通道绑定交换机和队列
channel.queueBind(queue, "logs", "");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" new String(body));
}
});
}
}
代码语言:javascript复制package com.xn2001.fanout;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 0:39
**/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs", "fanout");
//获取临时队列
String queue = channel.queueDeclare().getQueue();
//通道绑定交换机和队列
channel.queueBind(queue, "logs", "");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:" new String(body));
}
});
}
}
代码语言:javascript复制package com.xn2001.fanout;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 0:39
**/
public class ConsumerThree {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs", "fanout");
//获取临时队列
String queue = channel.queueDeclare().getQueue();
//通道绑定交换机和队列
channel.queueBind(queue, "logs", "");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者3:" new String(body));
}
});
}
}
启动这三个消费者,然后试着用生产者来发送消息。可以看到三个消费者都能消费消息。这就是我们的扇出(广播)模型。
Routing模型
这种模型的交换机类型为Direct
简介
Our logging system from the previous tutorial broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their severity. For example we may want a program which writes log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.
我们的日志系统从上一个模型广播所有消息给所有的消费者。我们希望扩展它,以允许基于严重性的过滤消息。例如,我们可能希望一个程序写日志消息到磁盘只接收严重错误,而不浪费磁盘空间的警告或信息日志消息。
We were using a fanout exchange, which doesn't give us much flexibility - it's only capable of mindless broadcasting.
重点:我们使用的是扇出交换,这并没有给我们太多的灵活性,因为它只能无意识地广播。
我们将改用直接交换。
直接交换背后的路由算法很简单。
消息发送到其绑定键与消息的路由键完全匹配的队列。
绑定可以采用额外的 routingKey 参数。我们之前使用的扇出交换则完全忽略了它的价值。
代码实现
下面我们去实现这样的一个过程。
首先我需要说清楚这个模拟的场景具体是怎么样的,我们拥有两个消费者,第一个消费者绑定(routingKey)error的消息,另一个消费者绑定info,error,warning三者消息。我们将发送一个info的消息,猜想应该只有后者可以消费到。
代码语言:javascript复制package com.xn2001.direct;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 16:58
**/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机和交换机的类型
channel.exchangeDeclare("logs_direct","direct");
//创建一个临时队列
String queue = channel.queueDeclare().getQueue();
//临时队列和交换机绑定
channel.queueBind(queue,"logs_direct","error");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" new String(body));
}
});
}
}
代码语言:javascript复制package com.xn2001.direct;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 16:52
**/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机和交换机的类型
channel.exchangeDeclare("logs_direct", "direct");
//创建一个临时队列
String queue = channel.queueDeclare().getQueue();
//临时队列和交换机绑定
channel.queueBind(queue, "logs_direct", "info");
channel.queueBind(queue, "logs_direct", "error");
channel.queueBind(queue, "logs_direct", "warning");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:" new String(body));
}
});
}
}
代码语言:javascript复制package com.xn2001.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 17:03
**/
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("logs_direct","direct");
//发送消息
String routingKey = "info";
channel.basicPublish("logs_direct",routingKey,null,
(("这是direct模型发布的基于routing Key:" routingKey " 发送的消息").getBytes()));
RabbitMQUtil.closeChannelAndConnection(channel,connection);
}
}
结果如我们猜想的一样,只有消费者2接收到了消息。
当然了,如果你发送消息不是error,info,warning,意味着没有消费者可以消费消息。
Topics模型
简介
在上一个模型中,我们改进了我们的日志系统。 我们不再使用只能进行虚拟广播的扇出交换,而是使用直接广播,从而获得了有选择地接收日志的可能性。虽然使用直接交换改进了我们的系统,但是它仍然有局限性,它不能基于多个标准进行路由。在我们的日志系统中,我们可能不仅希望订阅基于严重性的日志,还希望订阅基于发出日志的源的日志。
That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'. To implement that in our logging system we need to learn about a more complex topic exchange.
这将给我们很大的灵活性——我们可能希望监听来自“cron”的关键错误,也可能需要监听来自”kern“的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的topic
。
其实这种模型就是让你的消费者中设置routingKey
可以更加细节的匹配到需要的消息。
*
代表一个单词
#
代表后面的所有
下面我们通过代码去演示会更加清晰。
代码实现
代码语言:javascript复制package com.xn2001.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 19:28
**/
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String routingKey = "user.hello.hh";
channel.basicPublish("topics",routingKey,null,("routeKey:" routingKey).getBytes());
RabbitMQUtil.closeChannelAndConnection(channel,connection);
}
}
代码语言:javascript复制package com.xn2001.topic;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 19:31
**/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机和交换机的类型
channel.exchangeDeclare("topics","topic");
//创建一个临时队列
String queue = channel.queueDeclare().getQueue();
//临时队列和交换机绑定
channel.queueBind(queue,"topics","user.*");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" new String(body));
}
});
}
}
代码语言:javascript复制package com.xn2001.topic;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/**
* @author 乐心湖
* @date 2020/6/4 19:32
**/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机和交换机的类型
channel.exchangeDeclare("topics","topic");
//创建一个临时队列
String queue = channel.queueDeclare().getQueue();
//临时队列和交换机绑定
channel.queueBind(queue,"topics","user.#");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:" new String(body));
}
});
}
}
结果是consumerTwo
拿到了消息,因为它的routingKey
中写的是#。
另外你也可以测试如果我们生产者发送的消息是两个单词,例如user.hello
那么就意味着两个消费者都能拿到了。