Spring Boot 整合 RabbitMQ

2022-03-30 08:38:49 浏览数 (1)

Spring Boot 整合 RabbitMQ

搭建环境

创建测试项目:test_rabbitmq_boot

添加依赖

代码语言:javascript复制
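<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xc_test_parent</artifactId>
        <groupId>com.czxy.xuecheng</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>test_rabbitmq_boot</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>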

添加yml文件

代码语言:javascript复制
# Embedded web server port for the demo application.
server:
  port: 8090
spring:
  application:
    name: test_rabbitmq_producer
  # Connection settings for the RabbitMQ broker.
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    # The key must be spelled "password"; a misspelled key is not bound to
    # spring.rabbitmq.password and the configured value would be ignored.
    password: guest
    virtualHost: /

创建启动类:TestRabbitMQBootApplication

代码语言:javascript复制
package com.czxy.xuecheng;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/> 
 * Created by liangtong.
 */
@SpringBootApplication
public class TestRabbitMQBootApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestRabbitMQBootApplication.class, args);
    }
}

配置类

代码语言:javascript复制
package com.czxy.xuecheng.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/> 
 * Created by liangtong.
 */
@Configuration
public class RabbitConfig {
    // 交换机名称
    public static final String EXCHANGE_TOPIC_INFORM = "inform_exchange_topic";

    //队列名称
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    />  交换机配置
     *  ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
     *  channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);
     * @return
     */
    @Bean(EXCHANGE_TOPIC_INFORM)
    public Exchange exchange_topic() {
        //durable(true)持久化,消息队列重启后交换机仍然存在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_INFORM).durable(true).build();
    }

    /> 
     * 声明队列
     * channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
     * @return
     */
    @Bean(QUEUE_INFORM_SMS)
    public Queue queue_inform_sms(){
        return new Queue(QUEUE_INFORM_SMS);
    }
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue queue_inform_email(){
        return new Queue(QUEUE_INFORM_EMAIL,true,false,false);
    }

    /> 
     * 绑定队列到交换机
     * channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPIC_INFORM, "inform.#.email.#");
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding binding_queue_inform_sms(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
    }
    @Bean
    public Binding binding_queue_inform_email(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
    }

}

生产者

代码语言:javascript复制
package com.czxy.xuecheng;

import com.czxy.xuecheng.config.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/> 
 * Created by liangtong.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestRabbitMQBootApplication.class)
public class Producer05Topic {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendEmail() {
        //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.email", null, message.getBytes());
        for(int i = 0 ; i < 5 ; i   ) {
            String message = "email inform to user"   i;
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.email",message);
            System.out.println("Send Message is:'"   message   "'");
        }
    }

    @Test
    public void testSendSms() {
        //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms", null, message.getBytes());
        for(int i = 0 ; i < 5 ; i   ) {
            String message = "sms inform to user"   i;
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms",message);
            System.out.println("Send Message is:'"   message   "'");
        }
    }

    @Test
    public void testSendSmsAndEmail() {
        //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms.email", null, message.getBytes());
        for(int i = 0 ; i < 5 ; i   ) {
            String message = "sms and email inform to user"   i;
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms.email",message);
            System.out.println("Send Message is:'"   message   "'");
        }
    }

}

消费者

代码语言:javascript复制
package com.czxy.xuecheng.listener;

import com.czxy.xuecheng.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/> 
 * Created by liangtong.
 */
@Component
public class Consumer05Topic {

    @RabbitListener(queues = RabbitConfig.QUEUE_INFORM_EMAIL)
    public void receiveEmail(String msg , Message message){
        System.out.println("receive message is:"   msg);
    }
/*
    @RabbitListener(queues = RabbitConfig.QUEUE_INFORM_SMS)
    public void receiveSmS(String msg , Message message){
        System.out.println("receive message is:"   msg);
    }
    */
}

0 人点赞