RabbitMQ学习
传统http请求的缺点
代码语言:javascript复制Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到
服务器端有可能会导致我们服务器端处理请求堆积。
代码语言:javascript复制Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。
所以一般都会在nginx入口实现限流,整合服务保护框架。
代码语言:javascript复制http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待
过程中会导致客户端超时发生重试策略,有可能会引发幂等性问题。
接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。
应用场景
- 异步发送消息
- 处理一些比较耗时的操作
作用
代码语言:javascript复制支撑高并发、异步解耦、流量削峰、降低耦合度
多线程也能实现,但是消耗cpu资源,没有实现解耦。
中间件名词
- Producer 生产者:投递消息到MQ服务器端;
- Consumer 消费者:从MQ服务器端获取消息处理业务逻辑;
- Broker MQ服务器端
- Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题
- Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表
- Message 生产者投递消息报文:json
主流MQ区别对比
RabbitMQ如何保证消息不丢失
针对生产者
代码语言:javascript复制确保生产者投递消息到MQ服务器端成功。
Ack 消息确认机制
同步或者异步的形式
方式1:Confirms
方式2:事务消息
针对消费者
代码语言:javascript复制在rabbitmq情况下:
必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。
在kafka中的情况下:
不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。
针对MQ服务器端
代码语言:javascript复制在默认的情况下 都会对队列中的消息实现持久化
持久化硬盘。
相关核心代码
生产者
代码语言:javascript复制public class Producer {
private static final String QUEUE_NAME = "mayikt-queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.创建一个新连接
Connection connection = RabbitMQConnection.getConnection();
//2.设置channel
Channel channel = connection.createChannel();
//3.发送消息
String msg = "每特教育6666";
channel.confirmSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
boolean result = channel.waitForConfirms();
if (result) {
System.out.println("消息投递成功");
} else {
System.out.println("消息投递失败");
}
channel.close();
connection.close();
}
}
消费者
代码语言:javascript复制public class Consumer {
private static final String QUEUE_NAME = "mayikt-queue";
public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
// 1.创建连接
Connection connection = RabbitMQConnection.getConnection();
// 2.设置通道
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取消息:" msg);
// 消费者完成 消费该消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
交换机类型
- Direct exchange(直连交换机):根据队列绑定的路由键转发到具体的队列中存放消息
- Fanout exchange(扇型交换机):发布订阅
- Topic exchange(主题交换机):根据队列绑定的路由建模糊转发到具体的队列中存放
- Headers exchange(头交换机)
----队列 存放消息
----交换机 路由消息存放在那个队列中 类似于nginx
---路由key 分发规则
RabbitMQ死信队列
产生背景
代码语言:javascript复制RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
产生的原因
- 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
- 队列达到最大的长度 (队列容器已经满了)
- 消费者消费多次消息失败,就会转移存放到死信队列中
以上内容来自蚂蚁课堂
原文档地址:http://file.chenmx.net/s/YmUV