Spring Boot Kafka 生产者/消费者示例

2023-10-26 14:09:14 浏览数 (1)

Spring Boot Kafka 生产者示例

Spring Boot 是最流行和最常用的 Java 编程语言框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作一个可用于生产的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。下面列出了 Spring boot 的一些主要特性。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“入门”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第 3 方库。
  • 提供生产就绪的功能,例如运行状况检查、指标和外部化配置。
  • 几乎不需要生成代码,也不需要 XML 配置。

Apache Kafka 是一个发布-订阅消息系统。消息传递系统允许您在进程、应用程序和服务器之间发送消息。从广义上讲,Apache Kafka 是一个可以定义并进一步处理主题(主题可能是一个类别)的软件。应用程序可以连接到该系统并将消息传输到该主题。消息可以包含来自您个人博客上的任何事件的任何类型的信息,也可以是会触发任何其他事件的非常简单的文本消息。

例子:

先决条件

确保您已在本地计算机上安装 Apache Kafka。

步骤 1:

转到此链接https://start.spring.io/并创建一个 Spring Boot 项目。将以下依赖项添加到您的 Spring Boot 项目中。 

  • Apache Kafka 的 Spring

步骤 2:

现在让我们创建一个名为DemoController的控制器类。

DemoController.java
代码语言:javascript复制
// Java Program to Illustrate Controller Class

package com.amiya.kafka.apachekafkaproducer;

// Importing required classes
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

// Annotation
@RestController

// Class
public class DemoController {

	// Autowiring Kafka Template
	@Autowired KafkaTemplate<String, String> kafkaTemplate;

	private static final String TOPIC = "NewTopic";

	// Publish messages using the GetMapping
	@GetMapping("/publish/{message}")
	public String publishMessage(@PathVariable("message")
								final String message)
	{

		// Sending the message
		kafkaTemplate.send(TOPIC, message);

		return "Published Successfully";
	}
}

第3步:

现在我们必须执行以下操作才能使用 Spring Boot 将消息发布到 Kafka 主题

  1. 运行 Apache Zookeeper 服务器
  2. 运行 Apache Kafka 服务器
  3. 监听来自新主题的消息

C:kafka>.binwindowszookeeper-server-start.bat .configzookeeper.properties

同样,使用此命令运行 Apache Kafka 服务器

代码语言:javascript复制
C:kafka>.binwindowskafka-server-start.bat .configserver.properties

运行以下命令来监听来自新主题的消息 

代码语言:javascript复制
C:kafka>.binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic NewTopic --from-beginning

第4步:

现在运行您的 Spring Boot 应用程序。确保您已更改application.properties文件中的端口号

代码语言:javascript复制
server.port=8081

让我们在 ApacheKafkaProducerApplication 文件中运行 Spring boot 应用程序

第 5 步:

浏览此 URL 并在 /publish/ 后传递您的消息。

代码语言:javascript复制
http://localhost:8081/publish/demo00

您可以看到我们得到了“Published Successed”作为回应。并且实时您可以看到该消息也已发布到服务器上。消息流是实时的。 

同样,如果我们在此处传递了Hello World,您可以看到我们得到了“发布成功”作为回报。并且实时您可以看到该消息也已发布到服务器上。

Spring Boot Kafka 消费者示例

第 1 步:

创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

第 2 步:

创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

KafkaConfig.java
代码语言:javascript复制
// Java Program to Illustrate Kafka Configuration 

package com.amiya.kafka.apachekafkaconsumer.config; 

// Importing required classes 
import java.util.HashMap; 
import java.util.Map; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.kafka.annotation.EnableKafka; 
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 
import org.springframework.kafka.core.ConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 

// Annotations 
@EnableKafka
@Configuration

// Class 
public class KafkaConfig { 

	@Bean
	public ConsumerFactory<String, String> consumerFactory() 
	{ 

		// Creating a Map of string-object pairs 
		Map<String, Object> config = new HashMap<>(); 

		// Adding the Configuration 
		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
				"127.0.0.1:9092"); 
		config.put(ConsumerConfig.GROUP_ID_CONFIG, 
				"group_id"); 
		config.put( 
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
			StringDeserializer.class); 
		config.put( 
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
			StringDeserializer.class); 

		return new DefaultKafkaConsumerFactory<>(config); 
	} 

	// Creating a Listener 
	public ConcurrentKafkaListenerContainerFactory 
	concurrentKafkaListenerContainerFactory() 
	{ 
		ConcurrentKafkaListenerContainerFactory< 
			String, String> factory 
			= new ConcurrentKafkaListenerContainerFactory<>(); 
		factory.setConsumerFactory(consumerFactory()); 
		return factory; 
	} 
}
第 3 步:

创建一个名为KafkaConsumer的Consumer文件

KafkaConsumer.java
代码语言:javascript复制
// Java Program to Illustrate Kafka Consumer 

package com.amiya.kafka.apachekafkaconsumer.consumer; 

// Importing required classes 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.stereotype.Component; 

@Component

// Class 
public class KafkaConsumer { 

	@KafkaListener(topics = "NewTopic", 
				groupId = "group_id") 

	// Method 
	public void
	consume(String message) 
	{ 
		// Print statement 
		System.out.println("message = "   message); 
	} 
}

第 4 步:

现在我们必须执行以下操作才能使用 Spring Boot 消费来自 Kafka 主题的消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

代码语言:javascript复制
C:kafka>.binwindowszookeeper-server-start.bat .configzookeeper.properties

同样,使用此命令运行 Apache Kafka 服务器

代码语言:javascript复制
C:kafka>.binwindowskafka-server-start.bat .configserver.properties

运行以下命令从 Kafka Topics 发送消息

代码语言:javascript复制
C:kafka>.binwindowskafka-console- Producer.bat --broker-list localhost:9092 --topic NewTopic

第 5 步:

现在运行您的 Spring Boot 应用程序。确保您已更改application.properties文件中的端口号

代码语言:javascript复制
server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring boot 应用程序

输出:在输出中,您可以看到当您从 Kafka Topics 发送消息时,它会实时显示在控制台上。 

0 人点赞