RabbitMQ WorkQueues 工作队列模式
工作队列模式就是一个生产者,两个消费者。在初步入门的Hello World简单模式里面我们了解到。简单模式就是一个生产者一个消费者,中间通过中间件进行消息通信。
相比较这下这个工作队列的模式就是一个生产者通过中间件给两个消费者进行通信传递。
这都是官网的图,看图说话就好了。
用代码去实现这样的一个过程。
首先呢我们定义一个生产者
代码语言:javascript复制package com.jgdabc.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//完成发送消息
public class Producer_WorkQueue {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
// 最简单的helloword这个案例不需要交换机
// 5:所以直接创建队列Queue
channel.queueDeclare();
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
channel.queueDeclare("work_queues",true,false,false,null);
// 6// 发送消息
for(int i =1;i<=10;i )
{
String body = i "hello rabbicPublish";
channel.basicPublish("","work_queues",null,body.getBytes());
}
// basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws
// 参数说明:1,exchange:交换机名称,简单模式下,交换机默认的,设置参数为空字符串
// 2:routingket:路由名称,要和路由到的队列名称一样,才可以匹配到
// 3:props :配置信息
// 4:body:发送消息数据
// 释放资源
channel.close();
connection.close();
}
}
下面来看消费者代码,这样的消费者代码你需要写两份。
代码语言:javascript复制package com.jgdabc.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
// 最简单的helloword这个案例不需要交换价
// 所以直接创建队列Queue
channel.queueDeclare();
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
// 如果没有一个helloword的队列,则会创建该队列
channel.queueDeclare("work_queues",true,false,false,null);
// 接收消息
// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println("consumerTag:" consumerTag);
// System.out.println("Exchange:" envelope.getExchange());
// System.out.println("RoutingKey:" envelope.getRoutingKey());
// System.out.println("properties:" properties);
System.out.println("body:" new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//
}
}
其中用到了这样的一段很简单,但是很方便去观察这个特点。
代码语言:javascript复制 for(int i =1;i<=10;i )
{
String body = i "hello rabbicPublish";
channel.basicPublish("","work_queues",null,body.getBytes());
}
我这里运行一次生产者,它会发十条数据,为了区分是那一次的接收我加入了i作为索引记录。
然后你看
你会以为这样会存在竞争的关系吗?说消费者竞争也没有错误,但是基本上你看他们是精准的分担了数据。
这个案例和我们之前的简单模式几乎没有什么区别。要非要说点区别的话,就是你需要创建两个消费者,这两个消费者的代码是一样的。包括自己设置的配置的相关信息都是一样的,时刻牢记这下面这张图。
你可以测试下,如果生产者和消费者的虚拟主机设置不一样的话会怎么样,那必然是获取不到消息。因为消息需要从队列中取出来,队列是存在于虚拟主机的,每个用户都有自己的虚拟主机的,如果虚拟主机不一样的话,那必然是隔离的。
这些代码都是在简单模型的基础代码上进行了一些改进。代码的注释比较乱,但是说的很清楚。另外·存在可以删除的代码,就是生产者这边常见的队列在消费者这里其实不用再声明,当然声明了也不影响。
这些东西都不需要去专门记,但是为了自己后面更好回顾就记录下来了。