RabbitMq 笔记,一篇文章入门

2022-05-09 10:40:14 浏览数 (1)

目录

  • 传统的http请求存在那些缺点
  • 为什么需要使用mq
  • java代码使用多线程的缺点
  • rabbitmq安装
  • Virtual Hosts
  • 入门案例(一个消费者)
  • 多个消费者的案例
    • 问题
    • 实现
  • 消息应答
    • 为什么要有这个
    • 自动应答
    • 手动应答
    • 消息自动重新入队
  • RabbitMQ 持久化
    • 为什么持久化
    • 队列如何实现持久化
  • 不要轮训分发(不公平分发)
  • 预取值
  • 发布确认
    • 发布确认的策略
      • 单个确认发布(在生产端)
      • 批量确认发布(在生产端)
      • 异步确认发布(在生产端)
  • 什么是交换机
    • 直接(direct)
    • 主题(topic) 路由
    • 标题(headers)
    • 扇出(fanout)发布订阅
  • 临时队列
  • 绑定(bindings)
  • 死信队列
  • 延迟队列
  • 高级发布确认

默认的端口15672:rabbitmq管理平台端口号 默认的端口5672: rabbitmq消息中间内部通讯的端口 默认的端口号25672 rabbitmq集群的端口号

传统的http请求存在那些缺点

我们学习mq之前,我们可以看看传统的http请求有什么缺点?

1 浏览器发送http请求,在高并发情况下,会对服务器造成压力; 2 有的服务器会设置最大的请求线程数,如果高并发,剩余的会放到队列里面,队列里面的线程多了,也会造成服务器崩溃; 3 如果这个请求的逻辑里面,处理的业务是比较的大,比较的耗时,这样客户端就会一直的等待,或者超时之后,客户端会一直尝试的重新请求,这样都是问题;

注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。

为什么需要使用mq

可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。

java代码使用多线程的缺点

多线程会造成上下文的切换,抢cpu,会对主业务造成问题,有可能延迟主业务的执行时间;

优点:适合于小项目 实现异步 缺点:有可能会消耗服务器cpu资源资源

rabbitmq安装

我们是在docker里面安装的rabbitmq,所以很快

Virtual Hosts

在RabbitMQ中可以虚拟消息服务器VirtualHost,每

个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互

隔离的。exchange、queue、message不能互通。

代码语言:javascript复制
比如我们一个团队,做支付,做会员,都需要用到这个rabbitmq,我们就可以
使用这个VirtualHost  进行区分

入门案例(一个消费者)

1 创建一个maven项目 2 pom文件

代码语言:javascript复制
  <!--指定 jdk 编译版本--> <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

    <dependencies>
            <!--rabbitmq 依赖客户端-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.8.0</version>
            </dependency>
            <!--操作文件流的一个依赖-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
    </dependencies>

3 创建发送者的消息controller

代码语言:javascript复制
public class Producer {
//    创建一个队列
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.129");
        factory.setUsername("root");
        factory.setPassword("123456");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        try(
             Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            /**
             * 生成一个队列(queueDeclare  里面的参数)
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="hello world";
            /**
             * 发送一个消息(basicPublish 参数)
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕");
        }
    }
}

执行完以上的

4 消费者的代码

代码语言:javascript复制
public class Consume {

    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.129");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息....");
        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };

        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

执行这个代码,会一直运行,就等的队列,只要队列里面有消息,那么就立马输出;

多个消费者的案例

问题

实现

1 创建一个链接rabbitmq的工具类

代码语言:javascript复制
public class RabbitMqUtils {

    //得到一个连接的 channel
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.129");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

2 创建消费者

代码语言:javascript复制
public class Consume02 {

    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag "消费者取消消费接口回调逻辑");

        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

3 使用idea工具,将同一个文件启动多次

勾选了那个就是多线程了;

4 写生产者的代码

代码语言:javascript复制
public class Producer01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel= RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:" message);
            }
        }
    }
}

消息应答

为什么要有这个

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

代码语言:javascript复制
就是为了防止消息丢失

自动应答

就是消费者一拿到队列里面的消息,就告诉队列,成功消费了,这种是不可取的,因为万一拿到之后,处理过程中报错了咋办;

代码语言:javascript复制
想要性能,就用自动应答,因为速度快,但是安全性较低,有可能丢失消息

手动应答

里面有3个方法

代码语言:javascript复制
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)

C.Channel.basicReject(用于否定确认) 
与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了

手动应答的好处是可以批量应答并且减少网络拥堵

代码里面使用false,建议; 只应答当前处理完成的;

消息自动重新入队

代码语言:javascript复制
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),
导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,
并将对其重新排队。如果此时其他消费者可以处理,
它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,
也可以确保不会丢失任何消息。

就是消费者没有返回ack,那么就将消息重新入队;

RabbitMQ 持久化

为什么持久化

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。

队列如何实现持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果 要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

不要轮训分发(不公平分发)

能者多劳,建议使用不公平分发;

我们可以设置参数 channel.basicQos(1);设置在消费方

预取值

。该值定义通道上允许的未确认消息的最大数量。

发布确认

我们之前为了消息不丢失,要求了队列持久化,消息持久化,但是在消息持久化到磁盘之前,rabbitmq宕机了,咋办,消息还是会丢失的,所以我们需要第三个,就是在消息确保到硬盘的时候,返回给发送者一个确认机制,就是发布确认。

发布确认的策略

1 开启发布确认的方法 发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法(发送端)

单个确认发布(在生产端)

一个确认了,后面的才发

代码语言:javascript复制
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个
消息之后只有它被确认发布,后续的消息才能继续发布,
waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。


这种确认方式有一个最大的缺点就是:发布速度特别的慢,

因为如果没有确认
发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条
发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。

批量确认发布(在生产端)

最主要是这两个代码

代码语言:javascript复制
//开启发布确认
 channel.confirmSelect();
 
 //为了确保还有剩余没有确认消息 再次确认
 if (outstandingMessageCount > 0) {
 channel.waitForConfirms();
 }
代码语言:javascript复制
public static void publishMessageBatch() throws Exception {
 try (Channel channel = RabbitMqUtils.getChannel()) {
 String queueName = UUID.randomUUID().toString();
 channel.queueDeclare(queueName, false, false, false, null);
 
//开启发布确认
 channel.confirmSelect();


 //批量确认消息大小
 int batchSize = 100;
 //未确认消息个数
 int outstandingMessageCount = 0;
 long begin = System.currentTimeMillis();
 for (int i = 0; i < MESSAGE_COUNT; i  ) {
 String message = i   "";
 channel.basicPublish("", queueName, null, message.getBytes());
 outstandingMessageCount  ;
 if (outstandingMessageCount == batchSize) {
 channel.waitForConfirms();
 outstandingMessageCount = 0;
 }
 }
 //为了确保还有剩余没有确认消息 再次确认
 if (outstandingMessageCount > 0) {
 channel.waitForConfirms();
 }
 long end = System.currentTimeMillis();
 System.out.println("发布"   MESSAGE_COUNT   "个批量确认消息,耗时"   (end - begin)   
"ms");
 } }

异步确认发布(在生产端)

代码语言:javascript复制
就是发送端到rabbitmq之间有一个信道,在这个信道里面是一个队列,可以理解
为数组,每一个数组里面存到是map,键值对形式保存;

这个信道里面的消息到了rabbitmq里面的队列里面,不管是成功到达,还是失败
,都会异步返回给发送者,发送者不用管,因为是异步的,所以发送者只需要
一直发消息就可以了;

发送者根据key可以知道,那个成功,哪个失败;

什么是交换机

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

直接(direct)

消息只去到它绑定的routingKey 队列中去

主题(topic) 路由

标题(headers)

扇出(fanout)发布订阅

它是将接收到的所有消息广播到它知道的所有队列中。 和路由键是没有关系的,不管队列里面的路由键一样还是不一样,都会发到所有的队列;

临时队列

一旦我们断开了消费者的连接,队列将被自动删除。也就是不是持久化的队列,就是临时队列;

绑定(bindings)

什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系

死信队列

无法被消费的消息,放到死信队列,之后再处理

应用场景为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。

高级发布确认

之前是消息到达队列了,在准备持久化之前,宕机了,要进行确认,现在是准备发消息呀,发现rabbitmq宕机了,宕机的时间是不一样的

0 人点赞