Spring Boot Kafka 生产者示例
Spring Boot 是最流行和最常用的 Java 编程语言框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作一个可用于生产的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。下面列出了 Spring boot 的一些主要特性。
- 创建独立的 Spring 应用程序
- 直接嵌入 Tomcat、Jetty 或 Undertow。
- 提供“入门”依赖项以简化构建配置。
- 尽可能自动配置 Spring 和第 3 方库。
- 提供生产就绪的功能,例如运行状况检查、指标和外部化配置。
- 几乎不需要生成代码,也不需要 XML 配置。
Apache Kafka 是一个发布-订阅消息系统。消息传递系统允许您在进程、应用程序和服务器之间发送消息。从广义上讲,Apache Kafka
是一个可以定义并进一步处理主题(主题可能是一个类别)的软件。应用程序可以连接到该系统并将消息传输到该主题。消息可以包含来自您个人博客上的任何事件的任何类型的信息,也可以是会触发任何其他事件的非常简单的文本消息。
例子:
先决条件
确保您已在本地计算机上安装 Apache Kafka。
步骤 1:
转到此链接https://start.spring.io/并创建一个 Spring Boot 项目。将以下依赖项添加到您的 Spring Boot 项目中。
- Apache Kafka 的 Spring
步骤 2:
现在让我们创建一个名为DemoController的控制器类。
DemoController.java
代码语言:javascript复制// Java Program to Illustrate Controller Class
package com.amiya.kafka.apachekafkaproducer;
// Importing required classes
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
// Annotation
@RestController
// Class
public class DemoController {
// Autowiring Kafka Template
@Autowired KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "NewTopic";
// Publish messages using the GetMapping
@GetMapping("/publish/{message}")
public String publishMessage(@PathVariable("message")
final String message)
{
// Sending the message
kafkaTemplate.send(TOPIC, message);
return "Published Successfully";
}
}
第3步:
现在我们必须执行以下操作才能使用 Spring Boot 将消息发布到 Kafka 主题
- 运行 Apache Zookeeper 服务器
- 运行 Apache Kafka 服务器
- 监听来自新主题的消息
C:kafka>.binwindowszookeeper-server-start.bat .configzookeeper.properties
同样,使用此命令运行 Apache Kafka 服务器
代码语言:javascript复制C:kafka>.binwindowskafka-server-start.bat .configserver.properties
运行以下命令来监听来自新主题的消息
代码语言:javascript复制C:kafka>.binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic NewTopic --from-beginning
第4步:
现在运行您的 Spring Boot 应用程序。确保您已更改application.properties文件中的端口号
代码语言:javascript复制server.port=8081
让我们在 ApacheKafkaProducerApplication 文件中运行 Spring boot 应用程序
第 5 步:
浏览此 URL 并在 /publish/ 后传递您的消息。
代码语言:javascript复制http://localhost:8081/publish/demo00
您可以看到我们得到了“Published Successed”作为回应。并且实时您可以看到该消息也已发布到服务器上。消息流是实时的。
同样,如果我们在此处传递了Hello World
,您可以看到我们得到了“发布成功”作为回报。并且实时您可以看到该消息也已发布到服务器上。
Spring Boot Kafka 消费者示例
第 1 步:
创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。
第 2 步:
创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。
KafkaConfig.java
代码语言:javascript复制// Java Program to Illustrate Kafka Configuration
package com.amiya.kafka.apachekafkaconsumer.config;
// Importing required classes
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// Annotations
@EnableKafka
@Configuration
// Class
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory()
{
// Creating a Map of string-object pairs
Map<String, Object> config = new HashMap<>();
// Adding the Configuration
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,
"group_id");
config.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
// Creating a Listener
public ConcurrentKafkaListenerContainerFactory
concurrentKafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<
String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
第 3 步:
创建一个名为KafkaConsumer的Consumer文件
KafkaConsumer.java
代码语言:javascript复制// Java Program to Illustrate Kafka Consumer
package com.amiya.kafka.apachekafkaconsumer.consumer;
// Importing required classes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
// Class
public class KafkaConsumer {
@KafkaListener(topics = "NewTopic",
groupId = "group_id")
// Method
public void
consume(String message)
{
// Print statement
System.out.println("message = " message);
}
}
第 4 步:
现在我们必须执行以下操作才能使用 Spring Boot 消费来自 Kafka 主题的消息
- 运行 Apache Zookeeper 服务器
- 运行 Apache Kafka 服务器
- 从 Kafka 主题发送消息
使用此命令运行 Apache Zookeeper 服务器
代码语言:javascript复制C:kafka>.binwindowszookeeper-server-start.bat .configzookeeper.properties
同样,使用此命令运行 Apache Kafka 服务器
代码语言:javascript复制C:kafka>.binwindowskafka-server-start.bat .configserver.properties
运行以下命令从 Kafka Topics 发送消息
代码语言:javascript复制C:kafka>.binwindowskafka-console- Producer.bat --broker-list localhost:9092 --topic NewTopic
第 5 步:
现在运行您的 Spring Boot 应用程序。确保您已更改application.properties文件中的端口号
代码语言:javascript复制server.port=8081
让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring boot 应用程序
输出:在输出中,您可以看到当您从 Kafka Topics 发送消息时,它会实时显示在控制台上。