RabbitMQ 学习(七)----发布确认

2022-10-07 13:54:15 浏览数 (1)

文章目录

  • RabbitMQ 学习(七)----发布确认
    • (1)开启发布确认的方法
    • (2)单个发布确认
    • (3)批量确认发布
    • (4)异步发布确认

RabbitMQ 学习(七)----发布确认

发布确认是一个保证RabbitMQ 可靠性的一个机制

  保证生产者将信息成功的发送到 RabbitMQ的 server端了,那么broker就会回一个确认,如果没有收到或者收到拒绝信息,那么说明可能网络不好没有发送成功,server端宕机了,broker拒绝接收等情况,如果不进行后续处理,那么信息就会丢失,生产者收到失败的消息使用回调函数在进行处理。

  生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会指定一个唯一的ID,一旦消息投递到队列中,就是发送成功了,broker会立刻发送一个确认ack 给生产者,这个时候,生产者就知道消息已经发送成功了。

  如果队列和信息是持久化的,那么确认消息会在将消息写入磁盘之后再发出,broker返回的确认包含 确认消息的序列号,还可以设置 multiple,表示此序号前的所有消息都得到了处理。

  一旦发布消息,生产者等待确认的同时继续发送下一条消息,如果rabbitMq自身内部错误导致消息为发送成功,生产者就可以再回调方法中继续处理。

为了保证RabbitMQ的可靠性,生产者怎么做?

1、设置要求队列持久化

2、设置队列中的消息持久化

3、发布确认,保证写入磁盘,broker成功收到

(1)开启发布确认的方法

  channel 的cofirm模式默认是没有开启的,如果需要开启需要调用 confirmSelect(),当我们使用发布确认的时候。需要使用channel调用该方法。

代码语言:javascript复制
Channel channel =connection.createChannel();
channel.confirmSelect();

(2)单个发布确认

  这是一种同步确认发布的方式,就是发布一个消息之后等待确认后,后续的消息才能继续发布。waitForConfiemOrDie(long) 这个方法只有当消息被确认才会返回,如果在指定的时间内未返回就会抛出异常。

  这种确认方式最大的缺点:速度特别慢。如果消息没有确认,就会阻塞后续消息的发送,造成发送消息的速度很慢。

代码语言:javascript复制
public class SingleConfirm {
    /**
     * 发布确认模式
     * 1、单个确认
     * @param args
     */
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 开启确认模式
            channel.confirmSelect();

            // 声明队列
            channel.queueDeclare("confirm", true, false, false, null);

            long begin = System.currentTimeMillis();

            // 批量发送消息,每次发送进行确认
            for (int i = 0; i <1000 ; i  ) {

                String message = i "";

                // 发布单条消息
                channel.basicPublish("", "confirm", null, message.getBytes());

                // 单个消息发送之后,马上发布确认,使用 waitForConfirms
                if(channel.waitForConfirms()){
                    //System.out.println("消息发送成功:" i);
                }

            }

            long end = System.currentTimeMillis();

            System.out.println("发送1000条数据,使用单个发布确认的时间为:" (end-begin));

        } catch (Exception e){
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel, connection);
        }

    }
}

非常浪费时间

(3)批量确认发布

  每发送一部分消息,批量同步确认一次,若有消息无法发出,该模式无法确认是哪个消息无法发送;

发布1000条消息,每发送100条确认一次

代码语言:javascript复制
public class MultipleConfirm {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 开启确认模式
            channel.confirmSelect();

            // 声明队列
            channel.queueDeclare("confirm", true, false, false, null);

            long begin = System.currentTimeMillis();

            // 批量确认消息的数量,没发送100个返回一个确认ack
            int notAck = 100;

            // 批量发送消息,每次发送进行确认
            for (int i = 0; i <1000 ; i  ) {

                String message = i "";

                // 发布单条消息
                channel.basicPublish("", "confirm", null, message.getBytes());

                if((i 1)%notAck==0){
                    // 每发送100条确认一次,查看是否这一批是否有发送失败的情况
                    channel.waitForConfirmsOrDie();
                }


            }

            long end = System.currentTimeMillis();

            System.out.println("发送1000条数据,使用批量发布确认的时间为:" (end-begin));

        } catch (Exception e){
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel, connection);
        }

    }
}

时间为 396毫秒

(4)异步发布确认

  生产者发送消息与 接收确认这两个步骤不是同步的,是异步的,生产者只管发送,同时使用监听(addConfirmListener)返回的确认,对成功确认、失败确认两种情况分别进行处理。非常高效且安全

  • 开启确认模式
  • 声明确认成功的callback
  • 声明确认失败的callback
  • 开启确认监听 addConfirmListener() ,设置callback
  • 信道发送消息,不需要额外设置接收waitForConfirm什么的
代码语言:javascript复制
 public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 开启确认模式
            channel.confirmSelect();

            // 声明队列
            channel.queueDeclare("confirm", true, false, false, null);

            // 作为接收成功的函数式接口 参数
            ConfirmCallback ackCallback =(deliveryTag, multiple)-> System.out.println("确认的消息: " deliveryTag);
                // 表示接收成功的回调函数


            // 作为接收失败的函数式接口 参数
            ConfirmCallback nackCallback = (deliveryTag,multiple)-> System.out.println("接收失败!");
                // 表示接收失败的回调函数


            // 这是一个异步的监听 消息返回确认信息的 反应
            channel.addConfirmListener(ackCallback,nackCallback);

            long begin = System.currentTimeMillis();


            // 批量发送消息,每次发送进行确认
            for (int i = 0; i <1000 ; i  ) {
                String message = i "";
                // 发布单条消息
                channel.basicPublish("", "confirm", null, message.getBytes());

            }

            long end = System.currentTimeMillis();

            System.out.println("发送1000条数据,使用异步发布确认的时间为:" (end-begin) "ms");

        } catch (Exception e){
            e.printStackTrace();
        }
        
        // 执行完不能关闭连接,还要继续监听确认的信息
        /**
         finally {
            RabbitMQUtils.close(channel, connection);
        }
         */

    }

时间非常高效

0 人点赞