RabbitMQ消息发送、消费和确认

2020-06-23 16:20:46 浏览数 (1)

前提

前一篇文章介绍到RabbitMQ相关组件的声明,组件声明完成之后,就可以发送消息和消费消息,消费消息的时候需要考虑消息的确认。

消息的发送

消息的发送只依赖于交互器(名称)、可选路由键和可选的Header参数,可选路由键和Header可以认为是路由参数。因为RabbitMQ有四种内建的交换器,加上特殊的默认交换器可以认为有五种,这里列举一下通过这五种交换器发送消息需要的参数:

交换器类型

路由参数

默认交换器(AMQP default)

交换器名称(空字符串)和队列名称

Direct交换器

交换器名称和路由键

Fanout交换器

交换器名称(API中必须提供路由键,可以随意输入)

Topic交换器

交换器名称和路由键

Headers交换器

交换器名称和Header参数(API中必须提供路由键,可以随意输入)

消息的发布依赖于Channel的basicPublish方法,按照惯例查看其重载方法中参数列表长度最大的方法:

代码语言:javascript复制
void basicPublish(String exchange, 
                  String routingKey, 
                  boolean mandatory, 
                  boolean immediate, 
                  BasicProperties props, 
                  byte[] body) throws IOException;
  • exchange:交换器名称。
  • routingKey:路由键。
  • mandatory:是否强制的,如果此属性设置为true,消息发布的时候如果根据exchange和routingKey无法找到可达的目标队列,会调用AMQP方法basic.return将该消息返回给消息发布者;如果此属性设置为false,出现上面的情况,消息会被消息中间件代理直接丢弃。
  • immediate:是否立即的,如果此属性设置为true,消息通过exchange和routingKey找到目标队列(一个或者多个),如果所有的目标队列都没有消费者,那么会调用AMQP方法basic.return将该消息返回给消息发布者。
  • props:BasicProperties类型,消息属性或者叫消息元数据,com.rabbitmq.client.MessageProperties已经提供了一些列的实现,如果不满足可以使用BasicProperties.Builder自行构建。
  • body:字节数组类型,消息的有效负载,一般我们说的消息或者消息体就是指这个。

值得注意的是:immediate属性在RabbitMQ-3.0版本已经被移除,具体原因是:

翻译一下就是:immediate属性原有的功能对于基础代码的复杂性太高,特别是在镜像队列的条件下。它还影响到镜像队列的性能优化,推荐使用TTL(Time To Live,队列消息过期特性)或者DLX(Dead Letter Exchange,死信交换器)替代。在RabbitMQ-3.0后的版本如果immediate设置为true,会抛异常。

举个消息发送的例子(下面的例子中,每次发送之前都声明交换器、队列和绑定,实际上我们不需要这样操作,如果依赖于Servlet容器,可以在容器启动之后做一次声明即可):

代码语言:javascript复制
public class MessageSendMain extends BaseChannelFactory {

	private static final String DEFAULT_EXCHANGE = "";

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			//使用Direct类型的交换器
			channel.queueDeclare("throwable.queue.direct", true, false, false, null);
			channel.exchangeDeclare("throwable.exchange.direct", BuiltinExchangeType.DIRECT, true, false, null);
			channel.queueBind("throwable.queue.direct", "throwable.exchange.direct", "direct.routingKey", null);
			//发送"Direct Message"到队列throwable.queue.direct
			channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
					MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));

			//其实也可以通过默认的交换器直接发送消息到队列throwable.queue.direct
			channel.basicPublish(DEFAULT_EXCHANGE, "throwable.queue.direct",
					MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));

			//使用Fanout类型的交换器
			channel.queueDeclare("throwable.queue.fanout", true, false, false, null);
			channel.exchangeDeclare("throwable.exchange.fanout", BuiltinExchangeType.FANOUT, true, false, null);
			//这里路由键随便写,或者用空字符串也可以
			channel.queueBind("throwable.queue.fanout", "throwable.exchange.fanout", "random", null);
			channel.basicPublish("throwable.exchange.fanout", "random",
					MessageProperties.TEXT_PLAIN, "Fanout Message".getBytes(StandardCharsets.UTF_8));

			//使用Topic类型的交换器
			channel.queueDeclare("throwable.queue.topic", true, false, false, null);
			channel.exchangeDeclare("throwable.exchange.topic", BuiltinExchangeType.TOPIC, true, false, null);
			channel.queueBind("throwable.queue.topic", "throwable.exchange.topic", "topic.routingKey.#", null);
			channel.basicPublish("throwable.exchange.topic", "topic.routingKey.throwable",
					MessageProperties.TEXT_PLAIN, "Topic Message".getBytes(StandardCharsets.UTF_8));

			//使用Headers类型的交换器
			channel.queueDeclare("throwable.queue.headers", true, false, false, null);
			channel.exchangeDeclare("throwable.exchange.headers", BuiltinExchangeType.HEADERS, true, false, null);
			Map<String, Object> headerBindingArgs = new HashMap<>(8);
			headerBindingArgs.put("headers.name", "throwable");
			headerBindingArgs.put("headers.age", 25);
			headerBindingArgs.put("x-match", "all");
			//这里路由键随便写,或者用空字符串也可以,要添加Headers参数到绑定参数中
			channel.queueBind("throwable.queue.headers", "throwable.exchange.headers", "random", headerBindingArgs);
			AMQP.BasicProperties basicProperties = MessageProperties.TEXT_PLAIN;
			//这里发送消息的时候要添加Headers参数到消息属性中
			AMQP.BasicProperties realBasicProperties = basicProperties.builder().headers(headerBindingArgs).build();
			channel.basicPublish("throwable.exchange.headers", "random",
					realBasicProperties, "Headers Message".getBytes(StandardCharsets.UTF_8));
		});
	}
}

消息的元数据

消息元数据接口是com.rabbitmq.client.BasicProperties,实现类是com.rabbitmq.client.AMQP$BasicProperties,可以看一下具体的属性:

代码语言:javascript复制
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
    private String contentType;
    private String contentEncoding;
    private Map<String,Object> headers;
    private Integer deliveryMode;
    private Integer priority;
    private String correlationId;
    private String replyTo;
    private String expiration;
    private String messageId;
    private Date timestamp;
    private String type;
    private String userId;
    private String appId;
    private String clusterId;

    //省略Setter、Getter和Builder方法
}

属性分析:

  • contentType:消息内容类型,类似于HTTP协议中的Content-Type,例如:application/json。
  • contentEncoding:消息内容编码,类似于MIME的内容编码,例如:gzip。
  • headers:头部属性,K-V结构,一般使用在Headers的交换器和绑定中,很多时候被开发者滥用用来传输一些自定义属性,其实也无可厚非。
  • deliveryMode:消息的持久化模式,deliveryMode=1代表消息不持久化(nonpersistent),deliveryMode=2代表消息持久化(persistent)。
  • priority:消息优先级,可选值为0-255,值越大优先级越大,注意要和队列的优先级区分。
  • correlationId:客户端定义的用于客户端区分和标识消息的唯一标记。
  • replyTo:需要应答的目标队列名,只有一个持有值,不会有任何额外的操作,spring-amqp模块对这个值做了额外的操作,不要混淆原生Java驱动和二次封装的框架。
  • expiration:消息过期时间,单位为毫秒。
  • messageId:消息的唯一标识,消息中间件代理对消息接收去重的一个重要标识。
  • timestamp:消息发送时的时间戳。
  • type:可选的消息类型。
  • userId:可选的发布消息的用户的唯一标识。
  • appId:可选的发布消息的应用的唯一标识。
  • clusterId:集群唯一标识,AMQP-0-9-1已经弃用,供RabbitMQ集群应用程序使用的集群内路由标识符。

消息元数据的每个属性基本对应着AMQP规范中的属性值,以上的描述来源于AMQP协议,RabbitMQ中的实现要自行实践相关的属性com.rabbitmq.client.MessageProperties中已经有几个现成的BasicProperties实例,如果合适的话可以直接拿过来使用:

代码语言:javascript复制
public class MessageProperties {

    /** Empty basic properties, with no fields set */
    public static final BasicProperties MINIMAL_BASIC =
        new BasicProperties(null, null, null, null,
                            null, null, null, null,
                            null, null, null, null,
                            null, null);
    /** Empty basic properties, with only deliveryMode set to 2 (persistent) */
    public static final BasicProperties MINIMAL_PERSISTENT_BASIC =
        new BasicProperties(null, null, null, 2,
                            null, null, null, null,
                            null, null, null, null,
                            null, null);

    /** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
    public static final BasicProperties BASIC =
        new BasicProperties("application/octet-stream",
                            null,
                            null,
                            1,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);

    /** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
    public static final BasicProperties PERSISTENT_BASIC =
        new BasicProperties("application/octet-stream",
                            null,
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);

    /** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
    public static final BasicProperties TEXT_PLAIN =
        new BasicProperties("text/plain",
                            null,
                            null,
                            1,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);

    /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
    public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties("text/plain",
                            null,
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);
}

mandatory属性的作用

mandatory属性主要是用于消息发送路由失败后配置消息返回(return)功能使用,可以故意造一下消息发布路由失败的场景,验证一下mandatory属性的作用。之前的例子中曾经建立过一个Direct类型的交换throwable.exchange.direct和队列throwable.queue.direct基于路由键direct.routingKey进行了绑定,我们开启mandatory特性,故意把路由键弄错,看效果:

代码语言:javascript复制
public class MandatoryMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			channel.basicPublish("throwable.exchange.direct", "doge",
					true, false, null, "Mandatory Message".getBytes(StandardCharsets.UTF_8));
			//使用addReturnListener(ReturnCallBack)也可以
			channel.addReturnListener(new ReturnListener() {
				@Override
				public void handleReturn(int replyCode, String replyText,
										 String exchange,
										 String routingKey,
										 AMQP.BasicProperties properties,
										 byte[] body) throws IOException {
					StringBuilder builder = new StringBuilder();
					builder.append("返回码:").append(replyCode).append("n");
					builder.append("返回信息:").append(replyText).append("n");
					builder.append("交换器:").append(exchange).append("n");
					builder.append("路由键:").append(routingKey).append("n");
					builder.append("消息体:").append(new String(body, StandardCharsets.UTF_8));
					System.out.println(builder.toString());
				}
			});
			//因为是异步回调,这里要sleep一下
			Thread.sleep(2000);
		});
	}
}

执行之后,控制台打印:

代码语言:javascript复制
返回码:312
返回信息:NO_ROUTE
交换器:throwable.exchange.direct
路由键:doge
消息体:Mandatory Message

可见路由失败的消息直接原样返回,这样就能确保路由错误的情况下消息也不会丢失。

消息发送的确认机制

前面提到的mandatory属性和消息返回机制能保证路由失败的消息也不丢失,实际上消息发送的时候允许使用消息发送确认(Confirm)机制,这样可以确认客户端发送的消息是否已经到达了消息中间件代理。消息发送的确认机制主要包括轻量级的确认和消息事务,这一小节介绍一下轻量级的确认。消息发送的轻量级确认需要把信道(Channel)更变为Confirm模式,通过等待消息中间件代理消息是否到达的确认回调,依赖到的方法或者类如下:

代码语言:javascript复制
//信道更变为Confirm模式
Confirm.SelectOk confirmSelect() throws IOException;

//等待消息中间件确认消息到达 - 同步阻塞法法
boolean waitForConfirms() throws InterruptedException;

//等待消息中间件确认消息到达,可以设置超时,单位毫秒 - 同步阻塞方法
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

//等待消息中间件确认消息到达,存在任一消息未到达,则抛出IOException - 同步阻塞方法
void waitForConfirmsOrDie() throws IOException, InterruptedException;

//等待消息中间件确认消息到达,可以设置超时,单位毫秒,存在任一消息未到达,则抛出IOException - 同步阻塞方法
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

//消息发布确认回调接口
public interface ConfirmListener {
    void handleAck(long deliveryTag, boolean multiple)
        throws IOException;

    void handleNack(long deliveryTag, boolean multiple)
        throws IOException;
}

消息发送确认模式开启之后,每条消息都会基于同一个信道下新增一个投递标签(deliveryTag)属性,deliveryTag属性是从1开始递增的整数,只要新建一个信道实例就会重置为1,一定要十分注意,这个消息投递标签和消息消费中的信封(Envelope)中的deliveryTag不是同一个属性,后者虽然也是从1开始递增,但是它是基于队列而不是信道。举个简单的使用例子:

代码语言:javascript复制
public class ConfirmMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			channel.addConfirmListener(new ConfirmListener() {
				@Override
				public void handleAck(long deliveryTag, boolean multiple) throws IOException {
					System.out.println(String.format("消息发送确认回调成功,序号:%s,是否批量:%s", deliveryTag, multiple));
				}

				@Override
				public void handleNack(long deliveryTag, boolean multiple) throws IOException {
					System.out.println(String.format("消息发送确认回调失败,序号:%s,是否批量:%s", deliveryTag, multiple));
				}
			});
			channel.confirmSelect();
			channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
					MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));
			if (!channel.waitForConfirms()) {
				System.out.println("消息发送确认失败!");
			} else {
				System.out.println("消息发送确认成功!");
			}
			Thread.sleep(2000);
		});
	}
}

消息发布确认基本上是遵循下面的代码模板:

代码语言:javascript复制
channel.addConfirmListener(new ConfirmListener() {
    //...
};
channel.confirmSelect();
channel.basicPublish();
if (!channel.waitForConfirms()) {
    //...
}    

当然,这里演示的仅仅是单条的消息发布确认,这种做法性能会相对低下,但是可靠性会提高,编码难度也相对比较低。

消息发布事务

消息发布事务能够保证消息发布到RabbitMQ的Broker这个动作是一个原子操作,也就是开启了消息发布事务模式,消息能明确知道发布成功或者失败。使用消息发布事务需要把信道转换为事务模式,然后进行消息发布和事务提交(或者回滚),依赖于下面的方法:

代码语言:javascript复制
//信道转换为事务模式
Tx.SelectOk txSelect() throws IOException;

//提交事务
Tx.CommitOk txCommit() throws IOException;

//回滚事务
Tx.RollbackOk txRollback() throws IOException;

消息发布事务的基本代码模板如下:

代码语言:javascript复制
try { 
      channel.txSelect();
	  channel.basicPublish();
	  channel.txCommit();
	}catch (Exception e){
	  channel.txRollback();
    }

举个简单的例子:

代码语言:javascript复制
public class MessagePublishTxMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			try {
				channel.txSelect();
				channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
						MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));
				channel.txCommit();
			} catch (Exception e) {
				channel.txRollback();
			}
		});
	}
}

一般来说,事物基本是遵循"等价交换的原则",消息发布的可靠性是需要性能换取的,消息发布事务的可靠性是最高的,但是代价是它的性能是比较低的。

消息的消费

消息消费主要包括推模式和拉模式,区别如下:

  • 推模式:客户端注册消费者到消息中间件代理的队列,也就是对队列进行订阅,消息中间件代理通过队列投递(deliver)消息到消费者中,典型方法是basic-consume
  • 拉模式:客户端主动向消息中间件代理拉取队列中的消息,典型方法是basic-get

消息消费之推模式

推模式下,消息的消费依赖于Channel的basicConsume方法(用的是最新的RabbitMQ的Java驱动,关于消息消费的方法新增了不少,在3.X版本只有几个方法):

代码语言:javascript复制
String basicConsume(String queue,
                    boolean autoAck, 
                    String consumerTag, 
                    boolean noLocal, 
                    boolean exclusive, 
                    Map<String, Object> arguments, 
                    Consumer callback) throws IOException;

String basicConsume(String queue, 
                    boolean autoAck, 
                    String consumerTag, 
                    boolean noLocal, 
                    boolean exclusive, 
                    Map<String, Object> arguments, 
                    DeliverCallback deliverCallback, 
                    CancelCallback cancelCallback, 
                    ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;                    

别看第二个方法的参数列表很庞大很吓人,它只是把com.rabbitmq.client.Consumer接口的部分方法拆解出来做成单独的接口,方便使用Lambda表达式,参数分析如下:

  • queue:消费者订阅的队列名称。
  • autoAck:是否自动确认(主动ack)。
  • consumerTag:消费者标签,队列中消费者的唯一标识,如果不指定则由消息中间件代理自动生成,停止消费者和取消消费者都是基于此标识属性。
  • noLocal:是否非本地的,如果此属性为true,则消息中间件代理不会投递消息到此消费者如果发布消息使用的连接和当前消费者建立的通道所在的连接是同一个连接,但是RabbitMQ不支持此属性。
  • exclusive:是否独占(排他)的,如果此属性为true,队列中只能有一个消费者(也就是当前设置了exclusive=true的消费者),消费者关闭(shutdown)
  • arguments:消费者参数,K-V结构。

下面看一下ConsumerDeliverCallbackCancelCallbackConsumerShutdownSignalCallback的定义:

代码语言:javascript复制
public interface Consumer {
    
    //创建消费者成功的回调
    void handleConsumeOk(String consumerTag);

    //消费者取消成功的回调 - 主动调用basicCancel或者队列删除等因素
    void handleCancelOk(String consumerTag);
    
    //消费者取消的回调 - 主动调用basicCancel或者队列删除等因素
    void handleCancel(String consumerTag) throws IOException;
    
    //消费者关闭的信号回调
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
    
    //AMQP方法basic.recover-ok的回调
    void handleRecoverOk(String consumerTag);

    //消息推模式下接受消息中间件代理投递的消息 - 核心方法
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}

@FunctionalInterface
public interface DeliverCallback {
    
    //Delivery中持有了Envelope、AMQP.BasicProperties和消息体body
    void handle(String consumerTag, Delivery message) throws IOException;
}

@FunctionalInterface
public interface CancelCallback {

    void handle(String consumerTag) throws IOException;
}

@FunctionalInterface
public interface ConsumerShutdownSignalCallback {

   void handleShutdownSignal(String consumerTag, ShutdownSignalException sig); 
}

DeliverCallbackCancelCallbackConsumerShutdownSignalCallback实际上就是把Consumer接口中的部分方法抽离出来编写为函数式接口,没有特别的东西,所以我们还是关注Consumer接口的使用就可以了。Consumer接口有一个默认的实现类DefaultConsumer,可以直接使用。在旧版本的Java驱动中还存在一个废弃的QueueingConsumer,在5.X版本的驱动已经删除该类。其实,我们也可以自行实现Consumer接口,因为DefaultConsumer也仅仅是对Consumer接口进行了空实现,具体的方法还是需要我们覆盖实现自定义的逻辑。这里举个简单的使用例子:

代码语言:javascript复制
public class ConsumeMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			channel.basicConsume("throwable.queue.direct", true, new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag,
										   Envelope envelope,
										   AMQP.BasicProperties properties,
										   byte[] body) throws IOException {
					System.out.println("消息序号:"   envelope.getDeliveryTag());
					System.out.println("交换器:"   envelope.getExchange());
					System.out.println("路由键:"   envelope.getRoutingKey());
					System.out.println("消息内容:"   new String(body, StandardCharsets.UTF_8));
				}
			});
			//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
			Thread.sleep(Integer.MAX_VALUE);
		});
	}
}

可以从Web管理界面看到消费者已经启动,消费者标签是由RabbitMQ代理随机生成的,我们开启了消息自动确认,所以Ack required一栏是空心的圆形,也就是不需要进行消息消费确认。还有一点需要注意的是:Consumer**接口的回调也就是**DefaultConsumer**中的方法回调是委托到RabbitMQ的Java驱动中的线程池执行,过程是异步的,也就是我们在写Demo的时候需要想办法挂起**DefaultConsumer**实例所在的线程和主线程,否则会导致线程退出无法消费消息**。

消息消费之拉模式

拉模式下,消息消费主要依赖于Channel的basicGet方法:

代码语言:javascript复制
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

此方法很简单,只依赖于队列名称和是否自动确认两个参数,如果autoAck为false,需要手动确认。返回值如下:

代码语言:javascript复制
public class GetResponse {
    //信封对象,主要用于获取消息的投递标签DeliveryTag、交换器、路由键等
    private final Envelope envelope;
    //消息元数据
    private final BasicProperties props;
    //消息体
    private final byte[] body;
    //当前队列中剩余消息数量,只是个参考值
    private final int messageCount;

    //省略Getter方法
}    

信封对象中的投递标签DeliveryTag很重要,用于手动确认的时候指定对应的值。举个简单的使用例子:

代码语言:javascript复制
public class BasicGetMain extends BaseChannelFactory{
	
	public static void main(String[] args) throws Exception{
	       provideChannel(channel -> {
			   GetResponse getResponse = channel.basicGet("throwable.queue.direct", true);
			   System.out.println(String.format("消息内容:%s,消息序号:%s", new String(getResponse.getBody(), StandardCharsets.UTF_8),
					   getResponse.getEnvelope().getDeliveryTag()));
		   });
	}
}

消息消费的确认机制

消息消费的确认机制保障消息中间件代理的消息成功投递到消费者中,主要包括三种类确认:

  • 主动积极确认:主动积极确认成功后,消息会从队列中移除,支持批量确认操作,典型方法是basic-ack,下面直接叫ack。
  • 主动消极确认:消极积极确认成功后,基于basic-get或者basic-consume等到的消息标签,可以选择消息重新入队列或者直接丢弃,支持批量操作,典型方法是basic-nack,下面直接叫nack。
  • 拒绝:基于basic-get或者basic-consume等到的消息标签进行消息拒绝,可以选择丢弃或者重新入队,下面叫做reject。

nack和reject的基本功能是相同的,nack同时支持批量操作和单条操作,而reject只支持单条操作。消息消费确认其实是手动确认,如果针对的是basicConsume方法,则其autoAck属性需要设置为false,否则有可能会出现重复确认导致异常。

ack

ack依赖于Channel的basicAck方法:

代码语言:javascript复制
void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • deliveryTag:Envelope(信封)对象中的消息标签。
  • multiple:是否使用批量积极确认,如果此属性为true,则消息标签小于当前deliveryTag的所有消息都会被主动积极确认,不了解此属性最好使用false。
代码语言:javascript复制
public class AckMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			//这里autoAck设置为false
			channel.basicConsume("throwable.queue.direct", false, new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag,
										   Envelope envelope,
										   AMQP.BasicProperties properties,
										   byte[] body) throws IOException {
					System.out.println("消息序号:"   envelope.getDeliveryTag());
					System.out.println("交换器:"   envelope.getExchange());
					System.out.println("路由键:"   envelope.getRoutingKey());
					System.out.println("消息内容:"   new String(body, StandardCharsets.UTF_8));
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			});
			//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
			Thread.sleep(Integer.MAX_VALUE);
		});
	}
}

nack

nack依赖于Channel的basicNack方法:

代码语言:javascript复制
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag:Envelope(信封)对象中的消息标签。
  • multiple:是否使用批量消极确认,如果此属性为true,则消息标签小于当前deliveryTag的所有消息都会被主动消极确认,不了解此属性最好使用false。
  • requeue:是否重新入队,如果此属性为true,消息会被重新放置回去对应队列(如果可能的话,会放回到原来的位置),如果此属性为false,消息直接被丢弃。
代码语言:javascript复制
public class NackMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			//这里autoAck设置为false
			channel.basicConsume("throwable.queue.direct", false, new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag,
										   Envelope envelope,
										   AMQP.BasicProperties properties,
										   byte[] body) throws IOException {
					System.out.println("消息序号:"   envelope.getDeliveryTag());
					System.out.println("交换器:"   envelope.getExchange());
					System.out.println("路由键:"   envelope.getRoutingKey());
					System.out.println("消息内容:"   new String(body, StandardCharsets.UTF_8));
					channel.basicNack(envelope.getDeliveryTag(), false, false);
				}
			});
			//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
			Thread.sleep(Integer.MAX_VALUE);
		});
	}
}

属性requeue如果设置为true,需要谨慎设计程序的逻辑,否则很有可能导致消息一直重复消费失败并且重复重新入队,表现为消费者线程出现死循环逻辑,耗尽服务器CPU资源。

reject

reject的用法和nack基本一致,不过reject没有批量处理功能,依赖于Channel的basicReject方法:

代码语言:javascript复制
void basicReject(long deliveryTag, boolean requeue) throws IOException;

简单举个使用例子:

代码语言:javascript复制
public class RejectMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			GetResponse getResponse = channel.basicGet("throwable.queue.direct", true);
			channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), false);
		});
	}
}

消费消息不进行消息确认会怎么样

消息消费方法basiConsume中的autoAck属性设置为false,但是消费者接收到消息后不进行消息确认会怎么样?举个例子:

代码语言:javascript复制
public class NoneAckMain extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
					MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));
			//autoAck = false
			channel.basicConsume("throwable.queue.direct", false, new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag,
										   Envelope envelope,
										   AMQP.BasicProperties properties,
										   byte[] body) throws IOException {
					System.out.println("消息序号:"   envelope.getDeliveryTag());
					System.out.println("交换器:"   envelope.getExchange());
					System.out.println("路由键:"   envelope.getRoutingKey());
					System.out.println("消息内容:"   new String(body, StandardCharsets.UTF_8));
				}
			});
			//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
			Thread.sleep(Integer.MAX_VALUE);
		});
	}
}

执行之后,控制台输出:

代码语言:javascript复制
消息序号:1
交换器:throwable.exchange.direct
路由键:direct.routingKey
消息内容:Direct Message

查看RabbitMQ的Web管理界面,发现消息由Ready转变为Unacked状态:

尝试终止消费者所在线程,再次观察RabbitMQ的Web管理界面对应队列:

发现消息由Unacked状态恢复为Ready。这里需要注意,只有Ready状态的消息才能被消息中间件代理投递到消费者。总的来说就是:

  • 被路由到队列的新消息的状态为Ready,这种消息可以被消息中间件代理投递到客户端的消费者中。
  • 客户端消费者在消费消息的时候,如果采用手动确认(autoAck=false)并且没有主动确认(也就是没有调用basicAckbasicNack或者basicReject),那么消息就会变为Unacked状态,Unacked状态的消息只有当所有的消费者线程终止的时候,才会重新转变为Ready状态。

小结

这篇文章仅仅从基本使用来分析RabbitMQ中的消息发送、消费和确认的例子。关于消息发布确认机制和消息发布事务机制后面有专门的文章分析其性能和具体使用场景。

RabbitMQ中的消息发布确认(publish confirm)和消息消费(投递)确认(deliver confirm)能够确保消息发布和消息消费阶段消息不会丢失,至于策略应该根据具体场景选择,autoAck并不适合所有的场景。

(本文完 c-2-d e-a-20181125)

本文是Throwable的原创文章,转载请提前告知作者并且标明出处。 博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 本文永久链接是:https://cloud.tencent.com/developer/article/1650063

0 人点赞