文章目录
- RabbitMQ 学习(二)---- HelloWorld 简单模型
- 开放 rabbitMQ 端口号 5672
- 发送者
- (1)创建连接工厂 connectionFactory
- (2)创建 与 rabbitmq 的连接 connection
- (3)通过连接创建信道 channel
- (4)在信道中传递数据
- (6)关闭资源
- 完整的过程代码
- 接收者
- (1)发送消息
- 接受方完整代码
RabbitMQ 学习(二)---- HelloWorld 简单模型
开放 rabbitMQ 端口号 5672
之前我们使用rabbitMq 网页客户端 开放了 15672 的端口,要想是的 java客户端访问服务器成功,需要开放 5672 的端口号。在服务器安全组设置
还要再pom.xml中加载 ampq客户端的依赖
代码语言:javascript复制 <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
如果想要在控制台不日志的报错信息,还要加载 slf4j 的依赖
代码语言:javascript复制 <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
发送者
(1)创建连接工厂 connectionFactory
在工厂中需要设置连接的主机名、端口号、客户端的用户名、密码、虚拟主机等,为之后的连接做好预先准备。
虚拟主机就是相当于 我们在数据库软件中 一个系统对应的数据库一样,对应这个一个单独的节点
代码语言:javascript复制 //1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("XXXX.XXX.XXX.XXX"); // 部署rabbitMQ的
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
// 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
connectionFactory.setVirtualHost("/test");
(2)创建 与 rabbitmq 的连接 connection
通过工厂实例拿到与rabbitMq的连接对象,只有拿到了连接,才能进行后续的所有操作
代码语言:javascript复制 //2、根据连接工厂创建连接对象
connection = connectionFactory.newConnection();
(3)通过连接创建信道 channel
这里有一个经典面试题,为什么我们在channel中完成消息的发送接收,而不是直接在connection中呢?
看看这一个回答,能够很清楚的说明白。
(4)在信道中传递数据
如何在信道中传递数据呢?
我们需要明确一点,不管是什么模型,其实都需要 交换机(exchange)、路由器(routingkey)、队列(queue).
虽然这里的简单模型是点对点的,只需要 队列来传递数据的
- Exchanger交换机,用来接收生产者发布的消息并将这些消息路由给服务器中的队列。
- 默认的交换机有一个特点,只要你的routerKey与这个交换机中有同名的队列,他就会自动路由上。
这里的发送接收的规则捋了一下,是这样的,因为该模型不需要指定交换机与路由规则,只需要队列就行了所以使用默认交换机,交换机是用来接收生产者消息,并根据路由规则将消息分发给服务器中的队列中的,在此之前生产着声明了队列,但是在传递的时候仍然需要交换机与路由规则 给队列分发 消息, 巧了默认交换机有一个规则,如果路由规则与队列同名的话,那么路由与队列会自动绑定上,所以需要将 routingKey 写成 与 队列同名,让他们绑定上,然后生产者才能在队列中取到信息
一个很重要的信息传递的规则
- 生产者声明队列
- 生产者在发送消息的时候 使用 交换机 接收消息,通过 路由规则 分发消息到 指定的 队列 中等待接收
- 消费者声明队列
- 消费者在接收消息的时候通过队列(与生产者队列一致)进行接收消息,接收成功并选择是否回应
声明队列相关信息
代码语言:javascript复制 //4、在信道中声明队列
/**
* @Params1 queue:队列的名字
* @Params2 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
* @Params3 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问
* @Params4 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
* @Params5 arguments: map类型,一些额外的参数,比如说过期时间设置等
*/
channel.queueDeclare("queue",false,false,false,null);
通过交换机、路由器、队列进行发送信息
代码语言:javascript复制 /**
* 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
* 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
* 参数3:props 这里写发送的消息是否支持持久化
* 参数4: bytes 类型,这是我们要传递的具体消息
*/
//5、使用该信道进行发送消息
for(int i=0;i<10;i ){
channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:" i).getBytes());
}
(6)关闭资源
在发送完毕之后,关闭信道,关闭连接
代码语言:javascript复制finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
完整的过程代码
代码语言:javascript复制package hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
public static void main(String[] args) {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.46.143.156");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
// 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
connectionFactory.setVirtualHost("/test");
Connection connection = null;
Channel channel =null;
try {
//2、根据连接工厂创建连接对象
connection = connectionFactory.newConnection();
//3、根据连接对象创建信道
channel = connection.createChannel();
// 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列
//4、在信道中声明队列
/**
* @Params1 queue:队列的名字
* @Params1 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
* @Params1 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问
* @Params1 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
* @Params1 arguments: map类型,一些额外的参数,比如说过期时间设置等
*/
channel.queueDeclare("queue",false,false,false,null);
//5、使用该信道使用exchange、routineKey 进行发送消息 Bytes,
// 默认的交换机有一个特点,routineKey 如果和 队列名一致的话 ,那么匹配成功
// 生产者不是将消息放在queue队列中,而是放在默认交换机中等待符合routingkey的队列匹配,routineKey 名字和队列名一致则匹配成功
// queue在消费者这里生成,匹配成功之后消费者在queue中取消息
/**
* 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
* 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
* 参数3:props 这里写发送的消息是否支持持久化
* 参数4: bytes 类型,这是我们要传递的具体消息
*/
for(int i=0;i<10;i ){
channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:" i).getBytes());
}
} catch (Exception e){
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
接收者
前面的几个步骤 都一样,
需要注意的一点是,接收方在接收queue中的数据的时候,声明queue必须和 发送方的保持一致,所有条件都得保持一致,否则接收不到。
(1)发送消息
从使用信道发送消息开始,使用 basicConsume()
代码语言:javascript复制 /**
* 参数1 : 队列的名字
* 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
* 参数3 : 接收到消息之后的业务操作
*/
//5、使用该信道进行发送消息
channel.basicConsume("queue", true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受了发送方的消息:" new String(body));
}
});
这里需要说一下,在最后的业务操作的参数是一个 Consumer,设置一个接口,我们需要写一个实现类,重写其中的 方法,对接收的message进行后续的业务操作。
接受方完整代码
代码语言:javascript复制package hello;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Customer {
public static void main(String[] args) {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.46.143.156");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
// 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
connectionFactory.setVirtualHost("/test");
Connection connection = null;
Channel channel =null;
try {
//2、根据连接工厂创建连接对象
connection = connectionFactory.newConnection();
//3、根据连接对象创建信道
channel = connection.createChannel();
// 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列
//4、在信道中声明队列
channel.queueDeclare("queue",false,false,false,null);
//5、使用该信道进行发送消息
/**
* 参数1 : 队列的名字
* 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
* 参数3 : 接收到消息之后的业务操作
*/
channel.basicConsume("queue", true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受了发送方的消息:" new String(body));
}
});
} catch (Exception e){
e.printStackTrace();
}
}
}