RabbitMQ入门Demo,基于springboot

2020-08-21 17:45:40 浏览数 (1)

缘起

前面几章我们基本了解了RabbitMQ的基本概念,以及RabbitMQ是如何保证消息的可靠性的,那么本章开始,将真正用java代码去连接使用一些RabbitMQ,通过阅读本章内容,你会明白如何在java springboot的项目中使用RabbitMQ。

阅读人群

项目采用springboot搭建,所以你对springboot需要有一个基本的了解,并且我们假设已经在你的服务器或本机安装了RabbitMQ,所以本章不会涉及关于如何安装RabbitMQ知识。

前置条件

在前面我们说到,对于Exchanges来说,他的路由规则有以下三种

  • Direct exchange:完全根据key进行投递的叫做Direct交换机。如果Routing key匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。例如,绑定时设置了Routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
  • Fanout exchange:不需要key的叫做Fanout交换机。它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
  • Topic exchange:对key进行模式匹配后进行投递的叫做Topic交换机。比如符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。

项目采用springboot搭建,基础配置文件信息为

代码语言:txt复制
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=192.168.23.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

对于这三种方式,我们的代码实现也会有所不同,我们一一讲解。

Direct exchange

Config

代码语言:txt复制
@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue(){
        return new Queue("hello");
    }
}

producer

代码语言:txt复制
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(){
        String context = "hello "   new Date();
        System.out.println("Sender : "  context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}

consumer

代码语言:txt复制
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello){
        System.out.println("Receiver : "   hello);
    }
}

测试类

代码语言:txt复制
@SpringBootTest
public class Test {
    @Autowired
    private HelloSender helloSender;

    @org.junit.jupiter.api.Test
    void contextLoads(){
        helloSender.send();
    }
}

运行结果

image.pngimage.png

Topic exchange

Config

代码语言:txt复制
@Configuration
public class TopicRabbitConfig {
    final static String message = "topic.message";
    final static String messages = "topic.messages";

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    /**
     * 将对列绑定到Topic交换器
     * @return
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    /**
     * 将queueMessage队列绑定到Topic交换器并监听为topic.message的routingKey
     * 也就是说,当你发送消息时的key为topic.message的话,他就会被投递到queueMessage队列中,这个队列名字需要与上面声明的方法名相同
     *
     *     public Queue queueMessage() {
     *         return new Queue(TopicRabbitConfig.message);
     *     }
     *
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 将队列绑定到Topic交换器 采用#的方式,与上述方法类似,只是本处采用通配符。也就是说,所有以topic.开头的消息都会被投递到queueMessages队列中
     * @param queueMessages
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

producer

代码语言:txt复制
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : "   context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : "   context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.ffsda", context);
    }
}

Consumer

代码语言:txt复制
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver1  : "   message);
    }

}

Consumer2

代码语言:txt复制
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : "   message);
    }

}

测试类

代码语言:txt复制
@SpringBootTest
public class Test {

    @Autowired
    private TopicSender topicSender;

    @org.junit.jupiter.api.Test
    public void send1() {
        topicSender.send1();
    }
    @org.junit.jupiter.api.Test
    public void send2() {
        topicSender.send2();
    }
}

运行结果

send1()

image.pngimage.png

send1()的key为topic.message,所以queueMessage队列会被放入,queueMessages队列监听的key为通配符topic.#,所以也会被放入,所以消费者两个都会消费这条消息。

send2()

image.pngimage.png

send2()的key为topic.ffsda,由于queueMessage队列监听的key为topic.message,所以不会被放入。queueMessages队列监听的key为通配符topic.#,所以会被放入,所以消费者两只有一个会消费这条消息。

Fanout exchange

0 人点赞