Spring Cloud Stream与Kafka集成示例

2023-04-12 10:58:21 浏览数 (1)

下面是一个完整的示例,它使用Spring Cloud Stream和Kafka来创建一个简单的消息处理器和发布器:

1. 添加依赖

在pom.xml文件中添加以下依赖:

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

2. 配置Kafka

在application.properties文件中添加以下配置:

代码语言:javascript复制
propertiesCopy codespring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.configuration.acks=all
spring.cloud.stream.kafka.binder.configuration.retries=3
spring.cloud.stream.kafka.binder.configuration.batch.size=16384
spring.cloud.stream.kafka.binder.configuration.linger.ms=1
spring.cloud.stream.kafka.binder.configuration.buffer.memory=33554432
spring.cloud.stream.kafka.binder.configuration.compression.type=gzip

3. 创建消息处理器

代码语言:javascript复制
@EnableBinding(MyProcessor.class)
@SpringBootApplication
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

    @Autowired
    private MyProcessor processor;

    @StreamListener(MyProcessor.INPUT)
    public void handle(Message<String> message) {
        System.out.println("Received message: "   message.getPayload());
    }

    public interface MyProcessor {

        String INPUT = "myInput";
        String OUTPUT = "myOutput";

        @Input(INPUT)
        SubscribableChannel input();

        @Output(OUTPUT)
        MessageChannel output();
    }
}

在这个示例中,我们定义了一个名为MyProcessor的声明式接口,其中包含了一个名为myInput的输入通道和一个名为myOutput的输出通道。我们使用@EnableBinding注解告诉Spring Boot应用程序使用MyProcessor接口中定义的输入和输出通道。

然后,我们定义了一个@StreamListener注解的方法handle(),该方法处理从输入通道接收到的消息,并将其打印到控制台。

4. 创建消息发布器

代码语言:javascript复制
@Component
public class MyPublisher {

    @Autowired
    private MyProcessor processor;

    public void publish(String message) {
        processor.output().send(MessageBuilder.withPayload(message).build());
    }
}

在这个示例中,我们创建了一个名为MyPublisher的组件,并在其中注入了MyProcessor接口。我们还定义了一个名为publish()的方法,该方法使用processor.output().send()方法将一个带有有效载荷的消息发送到名为myOutput的输出通道中。

5. 测试应用程序

代码语言:javascript复制
@RestController
public class MyController {

    @Autowired
    private MyPublisher publisher;

    @PostMapping("/publish")
    public void publishMessage(@RequestBody String message) {
        publisher.publish(message);
    }
}

在这个示例中,我们创建了一个名为MyController的REST控制器,并在其中注入了MyPublisher组件。我们还定义了一个名为publishMessage()的POST请求处理程序,该处理程序将消息正文作为输入,并使用MyPublisher组件将其发送到名为myOutput的输出通道中。

6. 运行应用程序

现在我们可以启动应用程序并测试它了。我们可以使用任何HTTP客户端向/publish端点发送POST请求,并将消息正文作为输入。

例如,我们可以使用curl命令向端口8080发送一条消息:

代码语言:javascript复制
curl -X POST -H "Content-Type: text/plain" -d "Hello, Kafka!" http://localhost:8080/publish

应用程序应该在控制台上输出以下内容:

代码语言:javascript复制
Received message: Hello, Kafka!

这证明消息已成功从myOutput输出通道发送到myInput输入通道,并由handle()方法处理。

0 人点赞