RabbitMq消息发送

2022-08-11 15:52:51 浏览数 (1)

如果要发送一个消息,可以使用channel类的basicPublish方法参考如下,其中在rabbitmqTemplate中封装的方式是:

代码语言:javascript复制
rabbitMqTemplate.send(RabbitConstant.MESSAGE_EXCHANGE, "", Message对象);

其对应的底层方法:convertedMessageProperties为message对象中设置的对象属性信息,也就是下边的messageProperties 。

代码语言:javascript复制
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());

我们发送的消息(Message对象)包括消息体和消息的一些描述信息。

代码语言:javascript复制
//生成消息体
byte[] body = JSON.toJSONBytes(message, SerializeConfig.globalInstance);
 //设置消息相关属性
MessageProperties messageProperties = new MessageProperties();
//设置消息的序列号,对消息进行标记
messageProperties.setMessageId(UUID.randomUUID().toString());
//设置内容的格式
messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
//设置userId
messageProperties.setUserId("hidden");
//设置头信息,可以在消息被消费的时候进识别,
// 然后监听者通过message.getMessageProperties().getHeader()可以拿到设置的值
messageProperties.setHeader("h",1);
//设置投递模式,2表示会持久化,1表示不持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//设置优先级,优先级越高的相比优先级低的会提前消费掉
messageProperties.setPriority(1);
//消息的过期时间,
messageProperties.setExpiration("60000");
//封装一个消息(消息体内容,消息配置信息),消息的配置信息可以做一些鉴别作用
 return new Message(body, messageProperties);

发送消息的方法所对应的关键配置解析:

代码语言:javascript复制
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());

exchange:交换机的名称,需要指名消息需要发送到那个交换机中,如果设置为空字符串,消息会被发送到默认交换机中。

routingKey:路由键,交换器根据路由键将消息存储到相应的队列中!

convertedMessageproperties:消息的描述信息,有headers、deliveryModel、priority、correlationid、replyto、expiration、messageId、timestemp、type、userId、appId、clusterId等。

byte[]:消息体,真实要发送的消息。

mandatory和immediate:channel.basicPublish方法中的两个参数,他们都有当消息传递过程中不可达目的地的时候将消息返回给生产者的功能,rabbitmq提供的备份交换机可以将未能被交换器路由的消息存储起来,而不用返回给客户端。mandatory为true,rabbitmq根据路由无法将消息投递出去的时候就会将消息返回给客户端,为false的时候就会直接抛弃该消息。

immediate:为true,如果交换机在将消息路由到队里时候发现队列并不存在任何消费者的时候,那这条消息将不会存入队列,当与路由键匹配的所有队列都没有消费者的时候该消息会通过basic.reture返回给生产者。

概括来说mandatory参数告诉服务器至少将该消息路由到一个队列,否则会返回该生产者。immediate参数则告诉服务器,如果该消息关联的队列上有消费者则立即投递,如果所有匹配的队列都没有消费者,则返回给生产者。不用将消息存入队列而等待消费者。rabbitmq 3.0中已经去掉immediate参数,immediate会影响队列性能,增加代码复杂度,建议采用ttl和dlx方法替代。 《RabbitMq实战指南》

在方法的底层我们发现this.returnCallBack不为null的时候才会有可能mandatory为true,然后才有可能在投送的时候没有相应队列然后返回给生产者的可能。因为我们需要在rabbitmqTemplate中设置returnCallback。

具体代码如下:

代码语言:javascript复制
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置没有路由队列的时候返回给客户端
rabbitTemplate.setMandatory(true);
//设置消费确认消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
//设置没有路由队列返回客户端消息
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));

0 人点赞