Springboot使用RabbitMQ看这几篇就够了(模式案例篇)!

2021-01-19 11:20:10 浏览数 (1)

前言

上篇我们说到了消息队列RabbitMQ的模式概念,那么这里将会针对模式使用SpringBoot联合RabbitMQ做一个案例,实现消息的生产和消费。

这一篇也是这个主题的最后一篇了,建议配合着看。助于理解。

博主会将Demo工程放在Gitee上,有兴趣的可以拉下来自己试试。

Gitee地址:https://gitee.com/lemon_ant/os.git

正文

准备工作

新建SpringBoot项目

添加配置文件

代码语言:javascript复制
server.port=8080

spring.application.name=cl
#RabbitMq所在服务器IP
spring.rabbitmq.host=127.0.0.1
#连接端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=root
#用户密码
spring.rabbitmq.password=123456
# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=/

添加pom文件

代码语言:javascript复制
<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

启动类

代码语言:javascript复制
@SpringBootApplication
public class OsApplication {
	public static void main(String[] args) {
		SpringApplication.run(OsApplication.class, args);
	}

}

点对点模式

队列初始化

代码语言:javascript复制
//当没有这个队列的时候会自动创建
@Configuration
public class PointInitialization {
    @Bean
    Queue toPoint(){
        Queue queue = new Queue("point.to.point",true);
        return queue;
    }
}

生产者

代码语言:javascript复制
@Component
public class PointProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String name){
        String sendMsg = "点对点队列:"   name   "   "   new Date();
        //指定队列
      	this.rabbitTemplate.convertAndSend("point.to.point",sendMsg);
    }
}

消费者

代码语言:javascript复制
@Component
public class PointConsumer {
	//监听的队列名
    @RabbitListener(queues = "point.to.point")
    public void processOne(String name) {
        System.out.println("point.to.point:"   name);
    }

}

测试类(模仿控制层)

代码语言:javascript复制
@RestController
@RequestMapping("/Point")
public class PointController {
    @Autowired
    private PointProducer sayProducer;

    @RequestMapping("/point/{name}")
    public String send(@PathVariable String name){
        sayProducer.send(name);
        return "发送成功";
    }
}

使用postman模拟请求

控制台结果

work模式

队列初始化

代码语言:javascript复制
@Configuration
public class WorkInitialization {
    //当没有这个队列的时候会自动创建
    @Bean
    Queue work(){
        Queue queue = new Queue("WorkingMode",true);
        return queue;
    }
}

生产者

代码语言:javascript复制
@Component
public class WorkProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String name){
        String sendMsg = "工作模式:"   name   "   "   new Date();
        //指定队列
        this.rabbitTemplate.convertAndSend("WorkingMode",sendMsg);
    }
}

消费者

代码语言:javascript复制
//三个队列同时监听
@Component
public class WorkConsumer {
    @RabbitListener(queues = "WorkingMode")
    public void processOne(String name) {
        System.out.println("WorkingMode1:"   name);
    }

    @RabbitListener(queues = "WorkingMode")
    public void processTwo(String name) {
        System.out.println("WorkingMode2:"   name);
    }

    @RabbitListener(queues = "WorkingMode")
    public void processThree(String name) {
        System.out.println("WorkingMode3:"   name);
    }

}

测试类(模仿控制层)

代码语言:javascript复制
@RestController
@RequestMapping("/work")
public class WorkController {

    @Autowired
    private WorkProducer sayProducer;

    @RequestMapping("/work/{name}")
    public String send(@PathVariable String name){
        sayProducer.send(name);
        return "发送成功";
    }
}

使用postman模拟请求

控制台结果

注意看时间,说明消息是轮询分发的,一个消息只由一个消费者消费。

发布/订阅者模式(Publish/Subscribe)

队列初始化

代码语言:javascript复制
//类型为fanout
@Configuration
public class PublishInitialization {

    //当没有这个队列的时候会自动创建
    @Bean
    Queue publishOne(){
        Queue queue = new Queue("queue.publish.one",true);
        return queue;
    }
    @Bean
    Queue publishTwo(){
        Queue queue = new Queue("queue.publish.two",true);
        return queue;
    }
    @Bean
    Queue publishThree(){
        Queue queue = new Queue("queue.publish.three",true);
        return queue;
    }

    //创建交换器
    @Bean
    FanoutExchange pulishExchange(){
        FanoutExchange directExchange = new FanoutExchange("publishExchange");
        return directExchange;
    }

    //绑定队列(不用指定routing key),参数名字要和bean名字一致
    @Bean
    Binding bindingPublishOne(Queue publishOne,FanoutExchange pulishExchange){
        Binding binding = BindingBuilder.bind(publishOne).to(pulishExchange);
        return binding;
    }
    @Bean
    Binding bindingPublishTwo(Queue publishTwo,FanoutExchange pulishExchange){
        Binding binding = BindingBuilder.bind(publishTwo).to(pulishExchange);
        return binding;
    }
    @Bean
    Binding bindingPublishThree(Queue publishThree,FanoutExchange pulishExchange){
        Binding binding = BindingBuilder.bind(publishThree).to(pulishExchange);
        return binding;
    }
}

生产者

代码语言:javascript复制
@Component
public class PublishProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String name){
        String sendMsg = "发布订阅模式:"   name   "   "   new Date();
        //指定队列
        this.rabbitTemplate.convertAndSend("publishExchange","",sendMsg);
    }
}

消费者

代码语言:javascript复制
@Component
public class PublishConsumer {
    @RabbitListener(queues = "queue.publish.one")
    public void processOne(String name) {
        System.out.println("queue.publish.one:"   name);
    }

    @RabbitListener(queues = "queue.publish.two")
    public void processTwo(String name) {
        System.out.println("queue.publish.two:"   name);
    }

    @RabbitListener(queues = "queue.publish.three")
    public void processThree(String name) {
        System.out.println("queue.publish.three:"   name);
    }
}

测试类(模仿控制层)

代码语言:javascript复制
@RestController
@RequestMapping("/Publish")
public class PublishController {

    @Autowired
    private PublishProducer sayProducer;

    @RequestMapping("/publish/{name}")
    public String send(@PathVariable String name){
        sayProducer.send(name);
        return "发送成功";
    }
}

使用postman模拟请求

控制台结果

注意看时间,交换机会将消息推送到所有绑定到它的队列。

路由模式

队列初始化

代码语言:javascript复制
//类型为direct
@Configuration
public class RoutingInitialization {

    //当没有这个队列的时候会自动创建
    @Bean
    Queue routingOne(){
        Queue queue = new Queue("queue.routing.one",true);
        return queue;
    }
    @Bean
    Queue routingTwo(){
        Queue queue = new Queue("queue.routing.two",true);
        return queue;
    }
    @Bean
    Queue routingThree(){
        Queue queue = new Queue("queue.routing.three",true);
        return queue;
    }

    //创建交换器
    @Bean
    DirectExchange routingExchange(){
        DirectExchange directExchange = new DirectExchange("routingExchange");
        return directExchange;
    }

    //绑定队列
    @Bean
    Binding bindingRoutingOne(Queue routingOne,DirectExchange routingExchange){
        Binding binding = BindingBuilder.bind(routingOne).to(routingExchange).with("1");
        return binding;
    }
    @Bean
    Binding bindingRoutingTwo(Queue routingTwo,DirectExchange routingExchange){
        Binding binding = BindingBuilder.bind(routingTwo).to(routingExchange).with("2");
        return binding;
    }
    @Bean
    Binding bindingRoutingThree(Queue routingThree,DirectExchange routingExchange){
        Binding binding = BindingBuilder.bind(routingThree).to(routingExchange).with("3");
        return binding;
    }
}

生产者

代码语言:javascript复制
@Component
public class RoutingProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String type){
        String sendMsg = "路由模式:"   type   "   "   new Date();
        //指定队列
        if (type.equals("1")){
            this.rabbitTemplate.convertAndSend("routingExchange","1",sendMsg);
        }
        if (type.equals("2")){
            this.rabbitTemplate.convertAndSend("routingExchange","2",sendMsg);
        }
        if (type.equals("3")){
            this.rabbitTemplate.convertAndSend("routingExchange","3",sendMsg);
        }
    }
}

消费者

代码语言:javascript复制
@Component
public class RoutingConsumer {


    @RabbitListener(queues = "queue.routing.one")
    public void processOne(String name) {
        System.out.println("queue.routing.one:"   name);
    }

    @RabbitListener(queues = "queue.routing.two")
    public void processTwo(String name) {
        System.out.println("queue.routing.two:"   name);
    }

    @RabbitListener(queues = "queue.routing.three")
    public void processThree(String name) {
        System.out.println("queue.routing.three:"   name);
    }

}

测试类(模仿控制层)

代码语言:javascript复制
@RestController
@RequestMapping("/Routing")
public class RoutingController {

    @Autowired
    private RoutingProducer sayProducer;

    @RequestMapping("/routing/{name}")
    public String send(@PathVariable String name){
        sayProducer.send(name);
        return "发送成功";
    }
}

使用postman模拟请求

我这里测试传的就是routing key,方便看。

控制台结果

这里用时间来区别。

主题模式(Topic)

队列初始化

代码语言:javascript复制
//类型为topic
@Configuration
public class TopicInitialization {

    //当没有这个队列的时候会自动创建
    @Bean
    Queue topicOne(){
        Queue queue = new Queue("queue.topic.one",true);
        return queue;
    }
    @Bean
    Queue topicTwo(){
        Queue queue = new Queue("queue.topic.two",true);
        return queue;
    }
    @Bean
    Queue topicThree(){
        Queue queue = new Queue("queue.topic.three",true);
        return queue;
    }

    //创建交换器
    @Bean
    TopicExchange topicExchange(){
        TopicExchange directExchange = new TopicExchange("topicExchange");
        return directExchange;
    }

    //绑定队列
    @Bean
    Binding bindingTopicOne(Queue topicOne,TopicExchange topicExchange){
        Binding binding = BindingBuilder.bind(topicOne).to(topicExchange).with("#.error");
        return binding;
    }
    @Bean
    Binding bindingTopicTwo(Queue topicTwo,TopicExchange topicExchange){
        Binding binding = BindingBuilder.bind(topicTwo).to(topicExchange).with("#.log");
        return binding;
    }
    @Bean
    Binding bindingTopicThree(Queue topicThree,TopicExchange topicExchange){
        Binding binding = BindingBuilder.bind(topicThree).to(topicExchange).with("good.#.timer");
        return binding;
    }
}

生产者

代码语言:javascript复制
@Component
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String routing){
        String sendMsg = "主题模式:"   routing   "   "   new Date();
        //指定队列
        this.rabbitTemplate.convertAndSend("topicExchange",routing,sendMsg);

    }
}

消费者

代码语言:javascript复制
@Component
public class TopicConsumer {
    @RabbitListener(queues = "queue.topic.one")
    public void processOne(String name) {
        System.out.println("queue.topic.one:"   name);
    }

    @RabbitListener(queues = "queue.topic.two")
    public void processTwo(String name) {
        System.out.println("queue.topic.two:"   name);
    }

    @RabbitListener(queues = "queue.topic.three")
    public void processThree(String name) {
        System.out.println("queue.topic.three:"   name);
    }

}

测试类(模仿控制层)

代码语言:javascript复制
@RestController
@RequestMapping("/Topic")
public class TopicController {

    @Autowired
    private TopicProducer sayProducer;

    @RequestMapping("/topic/{type}")
    public String send(@PathVariable String type){
        sayProducer.send(type);
        return "发送成功";
    }
}

请求以及对应结果

注意看请求的key和打印日志的对应关系。

尾言

消息队列在这里基本就结束了,结合前面两篇基本就能够了解队列的基本概念和用法了。

0 人点赞