目录
- 传统的http请求存在那些缺点
- 为什么需要使用mq
- java代码使用多线程的缺点
- rabbitmq安装
- Virtual Hosts
- 入门案例(一个消费者)
- 多个消费者的案例
- 问题
- 实现
- 消息应答
- 为什么要有这个
- 自动应答
- 手动应答
- 消息自动重新入队
- RabbitMQ 持久化
- 为什么持久化
- 队列如何实现持久化
- 不要轮训分发(不公平分发)
- 预取值
- 发布确认
- 发布确认的策略
- 单个确认发布(在生产端)
- 批量确认发布(在生产端)
- 异步确认发布(在生产端)
- 什么是交换机
- 直接(direct)
- 主题(topic) 路由
- 标题(headers)
- 扇出(fanout)发布订阅
- 临时队列
- 绑定(bindings)
- 死信队列
- 延迟队列
- 高级发布确认
默认的端口15672:rabbitmq管理平台端口号 默认的端口5672: rabbitmq消息中间内部通讯的端口 默认的端口号25672 rabbitmq集群的端口号
传统的http请求存在那些缺点
我们学习mq之前,我们可以看看传统的http请求有什么缺点?
1 浏览器发送http请求,在高并发情况下,会对服务器造成压力; 2 有的服务器会设置最大的请求线程数,如果高并发,剩余的会放到队列里面,队列里面的线程多了,也会造成服务器崩溃; 3 如果这个请求的逻辑里面,处理的业务是比较的大,比较的耗时,这样客户端就会一直的等待,或者超时之后,客户端会一直尝试的重新请求,这样都是问题;
注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。
为什么需要使用mq
可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
java代码使用多线程的缺点
多线程会造成上下文的切换,抢cpu,会对主业务造成问题,有可能延迟主业务的执行时间;
优点:适合于小项目 实现异步 缺点:有可能会消耗服务器cpu资源资源
rabbitmq安装
我们是在docker里面安装的rabbitmq,所以很快
Virtual Hosts
在RabbitMQ中可以虚拟消息服务器VirtualHost,每
个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互
隔离的。exchange、queue、message不能互通。
代码语言:javascript复制比如我们一个团队,做支付,做会员,都需要用到这个rabbitmq,我们就可以
使用这个VirtualHost 进行区分
入门案例(一个消费者)
1 创建一个maven项目 2 pom文件
代码语言:javascript复制 <!--指定 jdk 编译版本--> <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
3 创建发送者的消息controller
代码语言:javascript复制public class Producer {
// 创建一个队列
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.129");
factory.setUsername("root");
factory.setPassword("123456");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
try(
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
/**
* 生成一个队列(queueDeclare 里面的参数)
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 发送一个消息(basicPublish 参数)
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
}
执行完以上的
4 消费者的代码
代码语言:javascript复制public class Consume {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.129");
factory.setUsername("root");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
执行这个代码,会一直运行,就等的队列,只要队列里面有消息,那么就立马输出;
多个消费者的案例
问题
实现
1 创建一个链接rabbitmq的工具类
代码语言:javascript复制public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.129");
factory.setUsername("root");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2 创建消费者
代码语言:javascript复制public class Consume02 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag "消费者取消消费接口回调逻辑");
};
System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
3 使用idea工具,将同一个文件启动多次
勾选了那个就是多线程了;
4 写生产者的代码
代码语言:javascript复制public class Producer01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try(Channel channel= RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:" message);
}
}
}
}
消息应答
为什么要有这个
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
代码语言:javascript复制就是为了防止消息丢失
自动应答
就是消费者一拿到队列里面的消息,就告诉队列,成功消费了,这种是不可取的,因为万一拿到之后,处理过程中报错了咋办;
代码语言:javascript复制想要性能,就用自动应答,因为速度快,但是安全性较低,有可能丢失消息
手动应答
里面有3个方法
代码语言:javascript复制A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了
手动应答的好处是可以批量应答并且减少网络拥堵
代码里面使用false,建议; 只应答当前处理完成的;
消息自动重新入队
代码语言:javascript复制如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),
导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,
并将对其重新排队。如果此时其他消费者可以处理,
它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,
也可以确保不会丢失任何消息。
就是消费者没有返回ack,那么就将消息重新入队;
RabbitMQ 持久化
为什么持久化
刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。
队列如何实现持久化
之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果 要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
不要轮训分发(不公平分发)
能者多劳,建议使用不公平分发;
我们可以设置参数 channel.basicQos(1);设置在消费方
预取值
。该值定义通道上允许的未确认消息的最大数量。
发布确认
我们之前为了消息不丢失,要求了队列持久化,消息持久化,但是在消息持久化到磁盘之前,rabbitmq宕机了,咋办,消息还是会丢失的,所以我们需要第三个,就是在消息确保到硬盘的时候,返回给发送者一个确认机制,就是发布确认。
发布确认的策略
1 开启发布确认的方法 发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法(发送端)
单个确认发布(在生产端)
一个确认了,后面的才发
代码语言:javascript复制这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个
消息之后只有它被确认发布,后续的消息才能继续发布,
waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,
因为如果没有确认
发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条
发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。
批量确认发布(在生产端)
最主要是这两个代码
代码语言:javascript复制//开启发布确认
channel.confirmSelect();
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
代码语言:javascript复制public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i ) {
String message = i "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount ;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" MESSAGE_COUNT "个批量确认消息,耗时" (end - begin)
"ms");
} }
异步确认发布(在生产端)
代码语言:javascript复制就是发送端到rabbitmq之间有一个信道,在这个信道里面是一个队列,可以理解
为数组,每一个数组里面存到是map,键值对形式保存;
这个信道里面的消息到了rabbitmq里面的队列里面,不管是成功到达,还是失败
,都会异步返回给发送者,发送者不用管,因为是异步的,所以发送者只需要
一直发消息就可以了;
发送者根据key可以知道,那个成功,哪个失败;
什么是交换机
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
直接(direct)
消息只去到它绑定的routingKey 队列中去
主题(topic) 路由
标题(headers)
扇出(fanout)发布订阅
它是将接收到的所有消息广播到它知道的所有队列中。 和路由键是没有关系的,不管队列里面的路由键一样还是不一样,都会发到所有的队列;
临时队列
一旦我们断开了消费者的连接,队列将被自动删除。也就是不是持久化的队列,就是临时队列;
绑定(bindings)
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系
死信队列
无法被消费的消息,放到死信队列,之后再处理
应用场景为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效
延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
高级发布确认
之前是消息到达队列了,在准备持久化之前,宕机了,要进行确认,现在是准备发消息呀,发现rabbitmq宕机了,宕机的时间是不一样的;