Spring Boot 整合 RabbitMQ
搭建环境
创建测试项目:test_rabbitmq_boot
添加依赖
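<parent>
    <artifactId>xc_test_parent</artifactId>
    <groupId>com.czxy.xuecheng</groupId>
    <version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>test_rabbitmq_boot</artifactId>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>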
添加yml文件
# HTTP port of the embedded web server.
server:
  port: 8090
spring:
  application:
    name: test_rabbitmq_producer
  # Connection settings for the RabbitMQ broker.
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
创建启动类:TestRabbitMQBootApplication
package com.czxy.xuecheng;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/>
* Created by liangtong.
*/
@SpringBootApplication
public class TestRabbitMQBootApplication {
public static void main(String[] args) {
SpringApplication.run(TestRabbitMQBootApplication.class, args);
}
}
配置类
package com.czxy.xuecheng.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/>
* Created by liangtong.
*/
@Configuration
public class RabbitConfig {
// 交换机名称
public static final String EXCHANGE_TOPIC_INFORM = "inform_exchange_topic";
//队列名称
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
/> 交换机配置
* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
* channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);
* @return
*/
@Bean(EXCHANGE_TOPIC_INFORM)
public Exchange exchange_topic() {
//durable(true)持久化,消息队列重启后交换机仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_INFORM).durable(true).build();
}
/>
* 声明队列
* channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
* @return
*/
@Bean(QUEUE_INFORM_SMS)
public Queue queue_inform_sms(){
return new Queue(QUEUE_INFORM_SMS);
}
@Bean(QUEUE_INFORM_EMAIL)
public Queue queue_inform_email(){
return new Queue(QUEUE_INFORM_EMAIL,true,false,false);
}
/>
* 绑定队列到交换机
* channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPIC_INFORM, "inform.#.email.#");
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding binding_queue_inform_sms(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding binding_queue_inform_email(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
}
生产者
package com.czxy.xuecheng;
import com.czxy.xuecheng.config.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/>
* Created by liangtong.
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestRabbitMQBootApplication.class)
public class Producer05Topic {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSendEmail() {
//channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.email", null, message.getBytes());
for(int i = 0 ; i < 5 ; i ) {
String message = "email inform to user" i;
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.email",message);
System.out.println("Send Message is:'" message "'");
}
}
@Test
public void testSendSms() {
//channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms", null, message.getBytes());
for(int i = 0 ; i < 5 ; i ) {
String message = "sms inform to user" i;
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms",message);
System.out.println("Send Message is:'" message "'");
}
}
@Test
public void testSendSmsAndEmail() {
//channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms.email", null, message.getBytes());
for(int i = 0 ; i < 5 ; i ) {
String message = "sms and email inform to user" i;
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms.email",message);
System.out.println("Send Message is:'" message "'");
}
}
}
消费者
package com.czxy.xuecheng.listener;
import com.czxy.xuecheng.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/>
* Created by liangtong.
*/
@Component
public class Consumer05Topic {
@RabbitListener(queues = RabbitConfig.QUEUE_INFORM_EMAIL)
public void receiveEmail(String msg , Message message){
System.out.println("receive message is:" msg);
}
/*
@RabbitListener(queues = RabbitConfig.QUEUE_INFORM_SMS)
public void receiveSmS(String msg , Message message){
System.out.println("receive message is:" msg);
}
*/
}