RabbitMq备胎交换器

2022-08-11 15:54:11 浏览数 (1)

备胎交换器简称AE,生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加reutrnListener的编程逻辑。生产者的代码将变得复杂,如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以备份交换器,这样可以将未被路由的消息存储在rabbitmq中,再需要的时候再去处理这些消息。

我们可以在定义交换器的时候添加alternate-exchange参数来实现。

代码语言:javascript复制
Mapargs=new HashMap();
args.put(”alternate-exchange”,”b”);
channel.exchangeDeclare(”a”,”direct”,true,false,args);
channel.exchangeDeclare(”b”,”fanout”,true,false,null);
channel.queueDeclare(”aa”,true,false,false,null);
channel.queueDeclare(”b”,true,false,false,null);
channel.queueBind(”a”,”a”,”aaa”);
channel.queueBind(”b”,”bb”,””);

如上图所示,当消息发送到交换机A的时候,如果路由键为aaa那么就发送到AA的消息队列中,如果路由键不是aaa,那么就发送到备胎交换器b,然后发送到队里BBB中!

代码语言:javascript复制
 /**
     * 任務接受隊列
     */
    @Bean
    public Queue AAQueue() {
        return new Queue("AA");
    }
    /**
     * 任務接受隊列
     */
    @Bean
    public Queue BBQueue() {
        return new Queue("BB");
    }




    @Bean
    public DirectExchange exchangeA() {
        Map map=new HashMap<>();
        map.put("alternate-exchange","B");
        return new DirectExchange("A",true,false,map);
    }




    /**
     * 直接交换
     */
    @Bean
    public FanoutExchange exchangeB() {
        return new FanoutExchange("B");
    }


    /**
     *
     * @param BBQueue    队列
     * @param exchangeB 延迟交换器
     */
    @Bean
    public Binding delayBindingB(Queue BBQueue, FanoutExchange exchangeB) {
        return BindingBuilder.bind(BBQueue).to(exchangeB);
    }


    /**
     *
     * @param AAQueue    队列
     * @param exchangeA 延迟交换器
     */
    @Bean
    public Binding delayBindingA(Queue AAQueue, DirectExchange exchangeA) {
        return BindingBuilder.bind(AAQueue).to(exchangeA).with("aaa");
    }


    /**
     *  @param exchangeB    队列
     * @param exchangeA 延迟交换器
     * @return
     */
    @Bean
    public Binding exchangeBinding(FanoutExchange exchangeB, DirectExchange exchangeA) {
        return BindingBuilder.bind(exchangeB).to(exchangeA).with("");
    }
代码语言:javascript复制
    @Test
    public void testAE(){
        //设置消息相关属性
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(UUID.randomUUID().toString());
        messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
        rabbitMqTemplate.sendAndReceive("A","aaa",new Message("123".getBytes(),messageProperties));
        rabbitMqTemplate.sendAndReceive("A","aaab",new Message("1234".getBytes(),messageProperties));
    }

代码运行效果:消息队列

备胎交换器

0 人点赞