springboot 优雅学习RabbitMQ、RocketMQ、Kafka消息队列实战

2024-06-26 13:45:53 浏览数 (1)

Spring Boot 是一种简化创建基于 Spring 框架的 Java 应用程序的工具。它提供了一种快速入门的方式,并减少了繁琐的配置工作。消息队列是一种用于在分布式系统中解耦和异步通信的常用技术。RabbitMQ、RocketMQ 和 Kafka 是三种常见的消息队列实现。以下是它们的基本概念和在 Spring Boot 中的实战示例。

概念

RabbitMQ

RabbitMQ 是一个由 Pivotal 开发的开源消息代理,基于 AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模型,如发布/订阅、点对点和 RPC。

RocketMQ

RocketMQ 是阿里巴巴开源的分布式消息中间件,支持高吞吐量和低延迟。它通常用于处理大规模的消息流,如日志处理和交易系统。

Kafka

Kafka 是 Apache 基金会的一个开源流处理平台,最初由 LinkedIn 开发。Kafka 提供高吞吐量、低延迟的消息传输,并且更适合于处理实时数据流和事件流。

实战示例

1. Spring Boot 集成 RabbitMQ
依赖

pom.xml 文件中添加 RabbitMQ 的依赖:

代码语言:javascript复制
xml复制代码<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置

application.properties 文件中配置 RabbitMQ 连接信息:

代码语言:javascript复制
properties复制代码spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
代码示例
代码语言:javascript复制
java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@SpringBootApplication
public class RabbitMqExampleApplication implements CommandLineRunner {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqExampleApplication.class, args);
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", false);
    }

    @Override
    public void run(String... args) throws Exception {
        rabbitTemplate.convertAndSend("myQueue", "Hello, RabbitMQ!");
    }

    @RabbitListener(queues = "myQueue")
    public void listen(String message) {
        System.out.println("Received: "   message);
    }
}
2. Spring Boot 集成 RocketMQ
依赖

pom.xml 文件中添加 RocketMQ 的依赖:

代码语言:javascript复制
xml复制代码<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
配置

application.properties 文件中配置 RocketMQ 连接信息:

代码语言:javascript复制
properties复制代码rocketmq.name-server=localhost:9876
rocketmq.producer.group=springboot-producer-group
代码示例
代码语言:javascript复制
java复制代码import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class RocketMqExampleApplication implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(RocketMqExampleApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        Message<String> message = MessageBuilder.withPayload("Hello, RocketMQ!").build();
        rocketMQTemplate.send("myTopic", message);
    }
}

@Service
@RocketMQMessageListener(topic = "myTopic", consumerGroup = "springboot-consumer-group")
public class RocketMqConsumer {

    @RocketMQMessageListener
    public void listen(String message) {
        System.out.println("Received: "   message);
    }
}
3. Spring Boot 集成 Kafka
依赖

pom.xml 文件中添加 Kafka 的依赖:

代码语言:javascript复制
xml复制代码<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-kafka</artifactId>
</dependency>
配置

application.properties 文件中配置 Kafka 连接信息:

代码语言:javascript复制
properties复制代码spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
代码示例
代码语言:javascript复制
java复制代码import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class KafkaExampleApplication implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        kafkaTemplate.send("myTopic", "Hello, Kafka!");
    }
}

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void listen(String message) {
        System.out.println("Received: "   message);
    }
}

小结

以上是通过 Spring Boot 集成 RabbitMQ、RocketMQ 和 Kafka 的基本概念和简单示例。根据实际需求选择合适的消息队列,并进行配置和开发。各自的优劣和适用场景可以根据具体项目的需求进行详细评估。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

0 人点赞