RabbitMQ 学习笔记3 - Java 使用 RabbitMQ 示例

2021-07-23 16:00:11 浏览数 (1)

1. 背景

本节讲述 Java 使用 RabbitMQ 的示例,和 发送者确认回调,消费者回执的内容。

2.知识

高级消息队列协议 (AMQP) 是面向消息的中间件的平台中立的协议。Spring AMQP 项目将 Spring 的概念应用于 AMQP,形成解决方案的开发。

AMQP 的一些基本概念:

开始之前, 要使用 RabbitMQ 首先要了解 AMQP 协议的基本概念,更多可阅读我的另一篇文章。

  • 生产者:一个发送消息的程序,它产生消息并发送到队列。这里是用Go写的发送端示程序例。
  • 消息队列:即 RabbitMQ 内部的队列,它安装在一个服务器中。做为消息中间件,它与具体开发语言无关,支持 Go,Java等接入连接。
  • 消费者:消费者是一个等待消息,接收消息的接收端程序示例
  • 交换机(Exchange)可以理解成邮局,交换机将收到的消息根据路由规则分发给绑定的队列(Queue)

image.png

安装 RabbitMQ

参考我的另一篇文章:https://cloud.tencent.com/developer/article/1611599

我们使用 Spring AMQP 框架来 操作 RabbitMQ 收发消息。

Spring AMQP 框架

Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。它提供了一个“模板”作为发送和接收消息的高级抽象。

该项目由两部分组成;spring-amqp 是基础抽象,spring-rabbit 是 RabbitMQ 实现。

Spring AMQP 的特征

  • 用于异步处理入站消息的侦听器容器
  • RabbitTemplate 用于发送和接收消息
  • RabbitAdmin 用于自动声明队列、交换和绑定

3. 示例

下面通过一个示例看下基本的收发消息的操作。

3.1 编写程序“生产者”

第一步:配置Rabbit的数据连接

编辑 application.yml, 指定 rabbitmq 的服务器地址,端口号,账户名密码等。

代码语言:javascript复制
spring:
  application:
    name: producer
  rabbitmq:
    host: localhost
    virtual-host: /
    port: 5672
    username: admin
    password: admin

第二步:配置好 队列,交换机,和绑定(queue,exchange,binding)

队列里存储了消息,交换机类似邮局,而“绑定”是个“ 队列 交换机”关联关系。通过“绑定” binding 将 交换机和 队列连线在一起。

代码语言:javascript复制
@Configuration
public class RabbitConfig {
    // 路由的 key
    public static final String ROUTING_KEY = "hello_routing_key";
    public static final String EXCHANGE_NAME = "zyf_direct_exchange";
    public static final String QUEUE_NAME = "first_queue";

    //  一个队列
    @Bean
    public Queue getFirstQueue() {
        return new Queue(QUEUE_NAME);
    }

    // 一个 直接交换机
    @Bean
    public DirectExchange getDirectExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    // 进行绑定
    @Bean
    public Binding getBinding() {
        return BindingBuilder.bind(getFirstQueue()).to(getDirectExchange()).with(ROUTING_KEY);
    }
}

第三步:发消息

RabbitTemplate 是操作发送消息的 “模板方法”,springboot 已帮忙配置好注入关系,直接拿来用就可以了。调用: rabbitTemplate.convertAndSend(...) 来发消息,需要指明 交换机的名称,和路由key。

代码语言:javascript复制
@Service
public class BusinessService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendMessage(String msg) {
        System.out.println("# sendMessage,msg="   msg);


        String routingKey = RabbitConfig.ROUTING_KEY;
        String exchangeName = RabbitConfig.EXCHANGE_NAME;
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }

}

总结:

  • 先配置好从 “交换机”到“队列”的连线。
  • 发送者 就可以通过 交换机名称和路由 key 来发送消息。

3.2 编写程序“消费者”

然后就是准备接收消息了。

第一步:配置好 rabbitmq 的数据连接。

和上面的 发送者一样,编辑 application.yml, 指定 rabbitmq 的服务器地址,端口号,账户名密码等。

第二步:配置 异步消息的监听器

接收消息配置一个回调即可。使用 @RabbitMessageListener 注解标注。

代码语言:javascript复制
@Component
public class RabbitMessageListener {
    public static final String QUEUE_NAME = "first_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receive(String msg) {
        System.out.println(msg);
    }
}

至此就完成了收发消息。我的代码示例见:https://github.com/vir56k/java_demo/tree/master/rabbitmq_demo1

4. 更多扩展

4.1 生产者发送时的结果回调(确认模式)

发布是异步的——如何检测成功和失败?

发布消息是一种异步机制,默认情况下,"无法路由的消息" 会被 RabbitMQ 丢弃。为了成功发布,您可以收到异步确认,如相关发布者确认和返回 中所述。

考虑两种失败情况:

  • 发消息到不存在的交换机。
  • 发消息到交换机,但没有匹配的队列。

第一种情况的场景是 指定了 错误的交换机名称。

第二种情况的场景是 “发送者的退货” 。

(1)发送者发送消息后的 “消息确认” 回调事件

对于发布者确认 ,RabbitTemplate 需要 设置:

代码语言:javascript复制
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
和
rabbitTemplate.setConfirmCallback(confirmCallback());

然后注册回调的实习,示例:

代码语言:javascript复制
// 用来确认生产者 producer 将消息发送到 broker 的回调
 @Bean
 public RabbitTemplate.ConfirmCallback confirmCallback() {
     return new RabbitTemplate.ConfirmCallback() {
         @Override
         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
             String id = correlationData == null ? "" : correlationData.getId();
             if (ack) {
                 // log.info(String.format("# [%s] 投递到 broker 成功!, cause=%s", id, cause));
             } else {
                 log.error(String.format("# [%s] 投递到 broker 失败!, cause=%s", id, cause));
             }
         }
     };
 }

上面的 ack 参数 指示了是否投递(到交换机)成功。

注意:一个 ConfirmCallback 仅支持 一个RabbitTemplate。

**(2)发送者的 “退货” 回调事件

对于返回的消息,模板的 mandatory 属性必须设置为true 。也需要将 CachingConnectionFactory 其 publisherReturns属性设置为true

即:

代码语言:javascript复制
connectionFactory.setPublisherReturns(true);
...
rabbitTemplate.setReturnsCallback(returnsCallback());
// 强制标志,当 setReturnsCallback 被设置时,这里要设置为 true
rabbitTemplate.setMandatory(true);

示例:

代码语言:javascript复制
// 发布到交换机,但没有匹配的目标队列 时,退货
@Bean
public RabbitTemplate.ReturnsCallback returnsCallback() {
    return new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            int replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            System.out.println(String.format("# 退货消息:原因=%s, replyCode=%s, exchange=%s", replyText, replyCode, exchange));
        }
    };
}

上面方法里的 ReturnedMessage 具有以下属性:

  • message - 返回的消息本身
  • replyCode - 指示退货原因的代码
  • replyText - 退货的文字原因 - 例如 NO_ROUTE
  • exchange - 消息发送到的交换
  • routingKey - 使用的路由密钥

每个 ReturnsCallback 仅支持一个RabbitTemplate。

4.2消费者回执(确认模式)

消息接收回执是指 消息接收者 收到消息后 向 “broker” 消息代理 回复的“ 确认消息 ”

注意:这里的回执和 发送者 “没有任何关系” 。它通知到 rabbitmq ,rabbitmq 根据回执决定是 重复,或者放弃。

有三种回执模式:

  • NONE:不发送确认。RabbitMQ 将此称为“自动确认”,因为代理假定所有消息都已确认,而消费者没有采取任何行动。
  • MANUAL:侦听器必须通过调用来确认所有消息Channel.basicAck()。
  • AUTO:容器自动确认消息,除非MessageListener抛出异常。

实现手动回执时,注入 ackMode 就可以了。

代码语言:javascript复制
    @RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")

示例:

代码语言:javascript复制
    int i = 0;

    // 异步 接收消息
    @RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")
    public void receiveForManual(String msg, Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println(msg);

        String returned_message_correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        log.info(String.format("# 触发 receiveForManual msg =%s ,correlationId = %s", msg, returned_message_correlation));

        i  ;
        if (i % 3 == 0) {
            log.info("# 接收消息...");
            channel.basicAck(tag, false);
        } else if (i % 3 == 1) {
            log.error("# 消息已重复处理失败,拒绝再次接收...");
            channel.basicReject(tag, false);
        } else if (i % 3 == 2) {
            log.error("# 消息即将再次返回队列处理...");
            channel.basicNack(tag, false, true);
        }

    }

我的代码示例见:https://github.com/vir56k/java_demo/tree/master/rabbitmq_demo2

5.参考:

Spring AMQP 文档

https://spring.io/projects/spring-amqp

https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.amqp

https://github.com/spring-projects/spring-amqp-samples

https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling

0 人点赞