首先 简单模式 看下 官网介绍
官网:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
RabbitMQ是消息代理:它接受并转发消息。您可以将其视为邮局:将您要发布的邮件放在邮箱中时,可以确保Mailperson先生或女士最终将邮件传递给收件人。以此类推,此模式为:RabbitMQ是一个邮箱,一个邮局和一个邮递员。
官方说:如果接收不到消息 可能是:默认情况下,它至少需要200 MB的可用空间。
其他没什么了。
好了 ,我们开始我们缕一缕 我们的逻辑了
首先 简单模式 分为3个角色 一个生产者、一个消费者、一个消息中间件
不就是 生产者生产消息,通过AMQP协议 发送到 MQ ,然后消费者 从MQ 也通过AMQP协议 获取消息。
发送消息
- 创建连接工厂对象
- 对工厂对象设置一些参数
- 用工厂创建连接
- 通过连接获取队列 (指定一些队列的属性)
- 通过队列进行发送消息
- 释放资源
接收消息
- 创建连接工厂对象
- 对工厂对象设置一些参数
- 用工厂对象创建连接
- 通过连接获取队列(指定一些队列属性)
- 创建消费者,可以写回调函数 选择是否进行需要回调函数
- 获取消息
- 不需要释放资源
中间件所做的内容
RabbitMQ:提供交换机(可理解成数据库/govbuy),并提供好可以连接的账号、密码,并可远程登录,这里是通过可视化界面 做的 。
开始看代码吧
首先 还是Maven的代码
代码语言:javascript复制 <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
生产者代码
代码语言:javascript复制import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class provider {
public static void main(String[] args) throws IOException, TimeoutException {
//记得刷新Maven 简单模式 没有交换机,但会用到默认的交换机
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
factory.setHost("118.31.127.248"); //不设置 就为 127.0.0.0.1
factory.setPort(5672); //不设置 就为 5672
factory.setVirtualHost("/govbuy"); //不设置 就为默认虚拟机 /
factory.setUsername("zanglikun"); //不设置 就是默认 guest
factory.setPassword("zanglikun"); //不设置 就是默认 guest
//3 创建连接 Connection
Connection connection = factory.newConnection();
//4 获取channl
Channel channel = connection.createChannel();
//5 创建队列 Queue 如果没有叫hello_world的队列,会自动创建
/*
参数:
1: queue 队列名称
2: durable 是否持久化 持久化到erlang自带的数据库中 重启 数据依旧存在
3: exclusive 是否独占 只允许一个消费者监听这个队列 2 当connection时,是否删除队列 一般为flase
4: autodelete 是否自动删除 当没有消费者,会自动删除
5: arguement 参数:如何删除队列的参数
*/
channel.queueDeclare("hello_world",true,false,false,null);
//6 发送消息到
/*
参数:
1:exchange 交换机名称。简单模式,会使用默认的
2:routingKey 路由名称
3:props 配置信息
4:body 真实发送的数据
*/
String Body = "Hello Rabbit MQ";
//简单模式 没有交换机,所以 路由 与 队列名称一样
channel.basicPublish("","hello_world",null,Body.getBytes());
System.out.println("发送时间是:" System.currentTimeMillis());
//7 释放资源
channel.close();
connection.close();
}
}
消费者代码
代码语言:javascript复制import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//记得刷新Maven 简单模式 没有交换机,但会用到默认的交换机
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
factory.setHost("118.31.127.248"); //不设置 就为 127.0.0.0.1
factory.setPort(5672); //不设置 就为 5672
factory.setVirtualHost("/govbuy"); //不设置 就为默认虚拟机 /
factory.setUsername("zanglikun"); //不设置 就是默认 guest
factory.setPassword("zanglikun"); //不设置 就是默认 guest
//3 创建连接 Connection
Connection connection = factory.newConnection();
//4 获取channl
Channel channel = connection.createChannel();
//5 此方法不需要参数 是添加方法块{} 然后Alt Inster --> Override Methords 生成的
Consumer consumer = new DefaultConsumer(channel){
// 这是一个回调方法 ,当收到消息后,会自动执行该方法。
/**
*
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 真实的数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收时间是:" System.currentTimeMillis());
System.out.println("consumerTag:" consumerTag);
System.out.println("Exchange:" envelope.getExchange());
System.out.println("RoutingKey:" envelope.getRoutingKey());
System.out.println("properties:" properties);
System.out.println("body:" new String(body));
System.out.println();
}
};
//6 获取消息
/*
参数
1:queue 队列名称
2:Auto ack 是否自动确认
3:callback 回调对象
*/
channel.basicConsume("hello_world",true,consumer);
// 消费者 不需要关闭连接,因为需要监听MQ。
}
}
先启动生产者,因为生产者 创建了队列了 hello_world 的队列
不然 消费者,会爆出 在虚拟机中 没有发现 队列 的异常
强调一点:消费者 不需要关闭 连接、释放资源。
特殊说明: 解决问题的光鲜,藏着磕Bug的痛苦。 万物皆入轮回,谁也躲不掉! 以上文章,均是我实际操作,写出来的笔记资料,不会出现全文盗用别人文章!烦请各位,请勿直接盗用!