下面是一个完整的示例,它使用Spring Cloud Stream和Kafka来创建一个简单的消息处理器和发布器:
1. 添加依赖
在pom.xml文件中添加以下依赖:
代码语言:javascript复制<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2. 配置Kafka
在application.properties文件中添加以下配置:
代码语言:javascript复制propertiesCopy codespring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.configuration.acks=all
spring.cloud.stream.kafka.binder.configuration.retries=3
spring.cloud.stream.kafka.binder.configuration.batch.size=16384
spring.cloud.stream.kafka.binder.configuration.linger.ms=1
spring.cloud.stream.kafka.binder.configuration.buffer.memory=33554432
spring.cloud.stream.kafka.binder.configuration.compression.type=gzip
3. 创建消息处理器
代码语言:javascript复制@EnableBinding(MyProcessor.class)
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@Autowired
private MyProcessor processor;
@StreamListener(MyProcessor.INPUT)
public void handle(Message<String> message) {
System.out.println("Received message: " message.getPayload());
}
public interface MyProcessor {
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
}
在这个示例中,我们定义了一个名为MyProcessor的声明式接口,其中包含了一个名为myInput的输入通道和一个名为myOutput的输出通道。我们使用@EnableBinding注解告诉Spring Boot应用程序使用MyProcessor接口中定义的输入和输出通道。
然后,我们定义了一个@StreamListener注解的方法handle(),该方法处理从输入通道接收到的消息,并将其打印到控制台。
4. 创建消息发布器
代码语言:javascript复制@Component
public class MyPublisher {
@Autowired
private MyProcessor processor;
public void publish(String message) {
processor.output().send(MessageBuilder.withPayload(message).build());
}
}
在这个示例中,我们创建了一个名为MyPublisher的组件,并在其中注入了MyProcessor接口。我们还定义了一个名为publish()的方法,该方法使用processor.output().send()方法将一个带有有效载荷的消息发送到名为myOutput的输出通道中。
5. 测试应用程序
代码语言:javascript复制@RestController
public class MyController {
@Autowired
private MyPublisher publisher;
@PostMapping("/publish")
public void publishMessage(@RequestBody String message) {
publisher.publish(message);
}
}
在这个示例中,我们创建了一个名为MyController的REST控制器,并在其中注入了MyPublisher组件。我们还定义了一个名为publishMessage()的POST请求处理程序,该处理程序将消息正文作为输入,并使用MyPublisher组件将其发送到名为myOutput的输出通道中。
6. 运行应用程序
现在我们可以启动应用程序并测试它了。我们可以使用任何HTTP客户端向/publish端点发送POST请求,并将消息正文作为输入。
例如,我们可以使用curl命令向端口8080发送一条消息:
代码语言:javascript复制curl -X POST -H "Content-Type: text/plain" -d "Hello, Kafka!" http://localhost:8080/publish
应用程序应该在控制台上输出以下内容:
代码语言:javascript复制Received message: Hello, Kafka!
这证明消息已成功从myOutput输出通道发送到myInput输入通道,并由handle()方法处理。