RabbitMQ笔记(五)-RabbitTemplate

2019-08-31 12:27:49 浏览数 (1)

使用RabbitTemplate进行收发消息将十分的方便

首先配置bean
代码语言:javascript复制
     @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory){
        log.info("caching factory: {}", factory.getChannelCacheSize());
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);

        /**
         * 当mandatory标志位设置为true时
         * 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
         * 那么broker会调用basic.return方法将消息返还给生产者
         * 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
         */
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(rabbitReturnCallback);
        //使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞
        rabbitTemplate.setUsePublisherConnection(true);

        return rabbitTemplate;
    }

ConfirmCallback: ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调

ReturnCallback:ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调

简单使用接口

给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败的时候可以知道是哪个消息失败

代码语言:javascript复制
    public void send(String dataId, String exchangeName, String rountingKey, String message){
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(dataId);

        rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
    }

    public String receive(String queueName){
        return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
    }

从2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback

代码语言:javascript复制
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

0 人点赞