使用RabbitTemplate进行收发消息将十分的方便
首先配置bean
代码语言:javascript复制 @Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory){
log.info("caching factory: {}", factory.getChannelCacheSize());
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
/**
* 当mandatory标志位设置为true时
* 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
* 那么broker会调用basic.return方法将消息返还给生产者
* 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(rabbitReturnCallback);
//使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞
rabbitTemplate.setUsePublisherConnection(true);
return rabbitTemplate;
}
ConfirmCallback: ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调
ReturnCallback:ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
简单使用接口
给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败的时候可以知道是哪个消息失败
代码语言:javascript复制 public void send(String dataId, String exchangeName, String rountingKey, String message){
CorrelationData correlationData = new CorrelationData();
correlationData.setId(dataId);
rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
}
public String receive(String queueName){
return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
}
从2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback
代码语言:javascript复制CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());