SpringBoot连接kafka——JavaDemo

2023-10-06 11:47:23 浏览数 (1)

​一、SpringBoot与Kafka简介

  1. 定义

Spring Boot是一个用于快速构建基于Spring框架的Java应用程序的框架。Kafka是一种分布式流处理平台,用于实时传输和处理大规模数据。通过Spring Boot与Kafka的连接,可以轻松地在Spring应用程序中使用Kafka进行数据流处理。

  1. 背景和意义

随着大数据和实时数据处理需求的不断增长,Kafka作为一种分布式流处理平台,越来越受到开发者的青睐。Spring Boot作为一款快速、简便的Java开发框架,能够帮助开发者快速搭建应用程序。将Spring Boot与Kafka连接,可以使开发者更加便捷地在Spring应用程序中使用Kafka进行数据流处理。

二、SpringBoot连接Kafka的应用场景与操作步骤

应用场景

Spring Boot与Kafka的连接适用于多种应用场景,如实时数据流处理、日志收集、事件驱动型微服务等。以下是一些具体应用场景:

  • 实时数据流处理:通过连接Kafka和Spring Boot,可以实时处理和传输来自不同数据源的数据,并对其进行整合和分析。
  • 日志收集:Kafka可以用于收集各种日志数据,而Spring Boot则可以用于构建一个简单的日志收集系统,以方便对日志进行分析和处理。
  • 事件驱动型微服务:通过连接Kafka和Spring Boot,可以构建事件驱动型微服务架构,实现不同服务之间的解耦和通信。

操作步骤

1.服务器配置kafka,进入kafka bin目录下,修改server.properties

vi config/server.properties

主要修改地方:

修改运行地址地址,改为虚拟机ip

代码语言:javascript复制
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

# host.name=192.168.217.142 192.168.217.142为虚拟机服务ip,不修改java连接会报:
# java.net.ConnectException: Connection refused: no further information
listeners=PLAINTEXT://192.168.217.142:9092

修改kafka日志

log.dirs=/opt/kafka/kafka_2.12-2.5.0/logs

2.启动kafka

bin/kafka-server-start.sh config/server.properties

查看是否启动成功

ps ef|grep kafka

查看进程

jps -l

3.查看防火墙是否开启,关闭防火墙(否则客户端无法访问)

firewall-cmd --state 如果是running的话

关闭防火墙(centos7关闭firewalld)

systemctl stop firewalld.service

Windows使用telnet测试端口能否访问

telnet 192.168.217.142 9092 如果可以进入界面就说明可以访问

4、Springboot引入spring-kafka依赖。

代码语言:javascript复制
<dependency>  
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>  
</dependency>

5.在application.yml中添加配置

代码语言:javascript复制
spring:  
  kafka:  
    bootstrap-servers: localhost:9092  
    producer:  
      retries: 0  
      batch-size: 16384

6.java编写消息生产者

代码语言:javascript复制
package com.qiming.kafka.chapter1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerFastStart {
    // 主题名称-之前已经创建
    private static final String topic = "heima-per";
    // Kafka集群地址
    private static final String brokerList = "192.168.217.142:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置key序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,10);

        // 设置值序列化器
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //设置集群地址
        properties.put("bootstrap.servers", brokerList);

        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(properties);
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic,"kafka-demo","hello.kafka 23!!");

        try {
            producer.send(producerRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

7.java编写消息消费者

代码语言:javascript复制
package com.qiming.kafka.chapter1;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka 消息消费者
 */
public class ConsumerFastStart {
    // Kafka集群地址
    private static final String brokerList = "192.168.217.142:9092";
    // 主题名称-之前已经创建
    private static final String topic = "heima-per";
    // 消费组
    private static
    final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

8.测试

启动消费者,不断监听,再启动消息生产者,可以看到消费者日志打出

hello.kafka 23!!

注意:一定要先启动kafka

我正在参与2023腾讯技术创作特训营第二期有奖征文,瓜分万元奖池和键盘手表

0 人点赞