RabbitMQ是什么
RabbitMQ是一个开源的AMQP(Advanced Message Queuing Protocol)实现,服务端用Erlang语言编写,支持多种客户端。用于在分布式系统中存储转发消息,在易用性,扩展性,高可用性等方面表现不俗。
AMQP协议
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP结构:
AMQP生产者流转过程:
AMQP消费者流转过程:
RabbitMQ核心概念
RabbitMQ整体结构:
Producer 生产者,就是投递消息的一方。生产者创建消息,然后发布到RabbitMQ中 消息一般可以包含两个部分:消息体和附加信息 消息体:在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然也可以进一步对这个消息体进行序列化操作。 附加信息:用来表述这条信息,比如目标交换器的名称,路由键和一些自定义属性等。
Broker 消息中间件的服务节点 对于RabbitMQ来说,一个RabbitMQ Broker可以简单的看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。
Virtual Host 虚拟主机,表示一批交换器,消息队列和相关对象。 虚拟主机是共享相同的身份认证和加密环境的独立服务器域。 每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列,交换器,绑定和权限机制。 vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
Channel 频道或信道,是建立在Connection连接之上的一种轻量级的连接。 大部分的操作是在Channel这个接口中完成的,包括定义队列的声明queueDeclare,交换机的声明exchangeDeclare,队列的绑定queueBind,发布消息basicPublish,消费消息basicConsume等。 如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤。一个Connection上可以创建任意数量的Channel。
RoutingKey 路由键。生产者将消息发给交换机的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。 RoutingKey需要和交换器类型和绑定键联合使用。在交换器类型和绑定键固定的情况下,生产在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。
Exchange 交换器,生产者将消息发送到Exchange(交换器,通常也可以用大写的”X”来表示),由交换器将消息路由到一个或多个队列中。如果路由不到,或返回给生产者,或直接丢弃。
Queue 队列,是RabbitMQ的内部对象,用于存储消息。
Binding 绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确的将消息路由到队列了。
Exchange类型
RabbitMQ常用的交换器类型有fanout,direct,topic,headers四种
fanout:扇形交换机 它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
direct:直连交换机 它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中
topic:主题交换机 与direct类似,但它可以通过通配符进行模糊匹配
headers:头交换机 不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配 headers类型的交换器性能很差,而且也不实用
Consumer 消费者,就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。 当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
运转流程:
RabbitMQ运转流程
生产者发送消息的过程:
- 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
- 生产者声明一个交换器,并设置相关属性,如交换机类型,是否持久化等
- 生产者声明一个队列并设置相关属性,如是否排他,是否持久化,是否自动删除等
- 生产者通过路由键将路由器和队列绑定起来
- 生产者发送消息到RabbitMQ Broker,其中包含路由键,交换器等信息
- 相应的交换器根据接受到的路由键查找匹配的队列
- 如果找到,则将从生产者发送过来的消息存入相应的队列中
- 如果没有找到,则根据生产者配置的属性选择丢弃还是退回给生产者
- 关闭信道,关闭连接
消费者接受消息的过程:
- 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
- 消费者像RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
- 等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接受消息
- 消费者确认(ack)接受到的消息
- RabbitMQ从队列中删除相应已经被确认的消息
- 关闭信道,关闭连接
RabbitMQ的安装
环境准备:CentOS7,Erlang
安装依赖环境:
- 在 http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本
- 在 https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本
- 复制下载地址后,使用wget命令下载
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3.2/erlang-22.3.2-1.el7.x86_64.rpm
- 安装 Erlang
rpm -Uvh /home/download/erlang-22.3.2-1.el7.x86_64.rpm
- 安装 socat
yum install -y socat
安装RabbitMQ:
- 在官方下载页面找到CentOS7版本的下载链接,下载rpm安装包
wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3-1.el7.noarch.rpm
- 安装RabbitMQ
rpm -Uvh /home/download/rabbitmq-server-3.8.3-1.el7.noarch.rpm
- 服务的启动与关闭
启动服务:
代码语言:javascript复制systemctl start rabbitmq-server
查看服务状态:
代码语言:javascript复制systemctl status rabbitmq-server
停止服务:
代码语言:javascript复制systemctl stop rabbitmq-server
设置开机启动:
代码语言:javascript复制systemctl enable rabbitmq-server
- 开启Web管理插件
开启插件:
代码语言:javascript复制rabbitmq-plugins enable rabbitmq_management
添加用户:
代码语言:javascript复制rabbitmqctl add_user admin admin
为用户分配操作权限:
代码语言:javascript复制rabbitmqctl set_user_tags admin administrator
为用户分配资源权限:
代码语言:javascript复制rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
- 防火墙添加端口
添加端口:
代码语言:javascript复制firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
重启防火墙:
代码语言:javascript复制firewall-cmd --reload
本地测试环境可以直接关闭防火墙:
代码语言:javascript复制systemctl stop firewalld.service
- 浏览器输入http://ip:15672 即可访问RabbitMQ的Web管理页面,用前面配置的用户admin/admin即可登录。
RabbitMQ的配置
RabbitMQ有一套默认的配置,能够满足日常开发需求,如需修改,需要自己创建一个配置文件。
代码语言:javascript复制touch /etc/rabbitmq/rabbitmq.conf
配置文件示例:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example
配置项说明:https://www.rabbitmq.com/configure.html#config-items
RabbitMQ的端口
RabbitMQ会绑定一些端口,安装完后,需要将这些端口添加到防火墙。
4369 是Erlang的端口/节点名称映射程序,用来跟踪节点名称监听地址,在集群中起到一个类似DNS的作用
5672,5671 AMQP 0-9-1和1.0客户端端口,没有使用SSL和使用SSL的端口
25672 用于RabbitMQ节点间和CLI工具通信,配合4369使用
15672 HTTP API Web端口,用于管理RabbitMQ,需要启动management插件
61613,61614 当STOMP插件启用的时候,作为STOMP客户端端口
1883,8883 当MQTT插件启动的时候,作为MQTT客户端端口
15674 基于WebSocket的STOMP客户端端口
15675 基于WebSocket的MQTT客户端端口
RabbitMQ的管理界面
RabbitMQ安装包中带有管理插件,但需要手动激活
代码语言:javascript复制rabbitmq-plugins enable rabbitmq_management
RabbitMQ有一个默认的用户”guest”,但这个用户默认只能通过本机访问,要让其他机器访问,需要创建一个新用户,为其分配权限
代码语言:javascript复制添加用户:
rabbitmqctl add_user admin admin
为用户分配操作权限:
rabbitmqctl set_user_tags admin administrator
为用户分配资源权限:
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
RabbitMQ角色
RabbitMQ的用户角色分类:none,management,policymaker,monitoring,administrator
none 不能访问management plugin
management 用过户可以通过AMQP的任何事,外加:
- 列出自己可以通过AMQP登入的virtual hosts
- 查看自己的virtual hosts中queues,exchanges和bindings
- 查看和关闭自己的channels和connections
- 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些hosts中的活动
policymaker management可以做的任何事,外加:
- 查看,创建和删除自己的virtual hosts所属的policies和parameters
monitoring management可以做的任何事,外加:
- 列出所有virtual hosts,包括他们不登录的virtual hosts
- 查看其他用户的connections和channels
- 查看节点级别的数据,如clustering和memory使用情况
- 查看真正的关于所有virtual hosts的全局统计信息
administrator policymaker和monitoring可以做的任何事,外加:
- 查看,创建和删除virtual hosts
- 查看,创建和删除users
- 查案,创建和删除permissions
- 关闭其他用户的connections
代码示例:
Maven依赖:
代码语言:javascript复制<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
Producer:
代码语言:javascript复制import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 从连接工厂获取连接
connection = connectionFactory.newConnection("Producer");
// 从链接中创建通道
channel = connection.createChannel();
// 声明队列,如果队列不存在,会创建
// RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
channel.queueDeclare("queue1", false, false, false, null);
String message = "Hello Rabbit";
// 发送消息
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息已发送!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Consumer:
代码语言:javascript复制import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 从连接工厂获取连接
connection = connectionFactory.newConnection("Consumer");
// 从链接中创建通道
channel = connection.createChannel();
// 声明队列,如果队列不存在,会创建
// RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
channel.queueDeclare("queue1", false, false, false, null);
// 定义收到消息后的回调
DeliverCallback deliverCallback =
new DeliverCallback() {
public void handle(String s, Delivery message) throws IOException {
System.out.println("收到消息:" new String(message.getBody(), "UTF-8"));
}
};
// 监听队列
channel.basicConsume(
"queue1",
true,
deliverCallback,
new CancelCallback() {
public void handle(String s) throws IOException {}
});
System.out.println("开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}