RabbitMQ学习

2022-09-08 14:03:05 浏览数 (1)

RabbitMQ学习

传统http请求的缺点
代码语言:javascript复制
Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到
服务器端有可能会导致我们服务器端处理请求堆积。
代码语言:javascript复制
Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。
所以一般都会在nginx入口实现限流,整合服务保护框架。
代码语言:javascript复制
http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待
过程中会导致客户端超时发生重试策略,有可能会引发幂等性问题。

接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。

应用场景
  1. 异步发送消息
  2. 处理一些比较耗时的操作
作用
代码语言:javascript复制
支撑高并发、异步解耦、流量削峰、降低耦合度

多线程也能实现,但是消耗cpu资源,没有实现解耦。

中间件名词
  • Producer 生产者:投递消息到MQ服务器端;
  • Consumer 消费者:从MQ服务器端获取消息处理业务逻辑;
  • Broker MQ服务器端
  • Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题
  • Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表
  • Message 生产者投递消息报文:json
主流MQ区别对比
RabbitMQ如何保证消息不丢失

针对生产者

代码语言:javascript复制
确保生产者投递消息到MQ服务器端成功。
Ack 消息确认机制
同步或者异步的形式
方式1:Confirms
方式2:事务消息

针对消费者

代码语言:javascript复制
在rabbitmq情况下:
   必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。
在kafka中的情况下:
   不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。

针对MQ服务器端

代码语言:javascript复制
在默认的情况下 都会对队列中的消息实现持久化
持久化硬盘。
相关核心代码

生产者

代码语言:javascript复制
public class Producer {
    private static final String QUEUE_NAME = "mayikt-queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.创建一个新连接
        Connection connection = RabbitMQConnection.getConnection();
        //2.设置channel
        Channel channel = connection.createChannel();
        //3.发送消息
        String msg = "每特教育6666";
channel.confirmSelect();

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        boolean result = channel.waitForConfirms();
        if (result) {
            System.out.println("消息投递成功");
        } else {
            System.out.println("消息投递失败");
        }
        channel.close();
        connection.close();
    }
}

消费者

代码语言:javascript复制
public class Consumer {
    private static final String QUEUE_NAME = "mayikt-queue";

    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:"   msg);
                // 消费者完成 消费该消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }
}
交换机类型
  • Direct exchange(直连交换机):根据队列绑定的路由键转发到具体的队列中存放消息
  • Fanout exchange(扇型交换机):发布订阅
  • Topic exchange(主题交换机):根据队列绑定的路由建模糊转发到具体的队列中存放
  • Headers exchange(头交换机)
代码语言:javascript复制
----队列 存放消息
----交换机 路由消息存放在那个队列中 类似于nginx
---路由key 分发规则
RabbitMQ死信队列
产生背景
代码语言:javascript复制
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
产生的原因
  1. 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
  2. 队列达到最大的长度 (队列容器已经满了)
  3. 消费者消费多次消息失败,就会转移存放到死信队列中

以上内容来自蚂蚁课堂

原文档地址:http://file.chenmx.net/s/YmUV

0 人点赞