当Spring邂逅Kafka,有趣的知识增加了

2022-03-08 14:44:04 浏览数 (1)

theme: cyanosis

0.阅读完本文你将会学到

  • 一些linux的常用命令
  • 如何在linux上安装JDK、ZooKeeper、Kafka
  • 轻量级的Spring与Kafka的整合

Kafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。

目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持续化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

关于Kafka名字的由来,另有一段佳话。如果你的记忆力还不错的话,应该会记得高中有一篇课文叫做《变形记》,它的作者正是奥地利小说家Franz Kafka。笔者本人也非常喜欢他的《城堡》一作。而Apache Kafka的作者大学的时候也非常喜欢Franz Kafka,所以就将这个系统命名为Kafka。

现在让我们打开电脑,一起实践吧!

如果你的电脑上已经安装了Kafka,可以跳过第一部分,直接进入第二部分哦。

1. Kafka的安装与设置

安装Kafka之前,我们需要安装Java以及ZooKeeper。

1.1 安装JDK

1. 确认系统是否已安装过Java

安装JDK之前我们先确认下系统是否已安装过JDK,如下操作:

代码语言:javascript复制
rem -qa | grep java
rem -qa | grep jdk
rem -qa | grep gcj

如果没有任何信息,则表示系统没有安装过Java。

如果想要卸载已经安装过的JDK,则可以执行下方的命令。

代码语言:javascript复制
rpm -qa | grep java | xargs rpm -e --nodeps

2. 安装Java

下面开始安装Java,这里以1.8为例。

代码语言:javascript复制
yum list java-1.8*

通过这个命令我们可以看见Java 1.8版本的所有文件。

代码语言:javascript复制
java-1.8.0-openjdk.x86_64                                                                                                                                  
java-1.8.0-openjdk-accessibility.x86_64                                                                                                                    
java-1.8.0-openjdk-demo.x86_64                                                                                                                             
java-1.8.0-openjdk-devel.x86_64                                                                                                                            
java-1.8.0-openjdk-headless.x86_64                                                                                                                         
java-1.8.0-openjdk-headless-slowdebug.x86_64                                                                                                               
java-1.8.0-openjdk-javadoc.noarch                                                                                                                          
java-1.8.0-openjdk-javadoc-zip.noarch                                                                                                                      
java-1.8.0-openjdk-slowdebug.x86_64                                                                                                                        
java-1.8.0-openjdk-src.x86_64        

然后我们可以通过这个命令安装Java 1.8版本的所有文件。

代码语言:javascript复制
yum install java-1.8.0-openjdk* -y

当控制台返回Complete之后,显示Java已经安装成功。

3. 确认Java安装成功

使用下面这个命令进行确认

代码语言:javascript复制
java -version

结果显示如下,表示已安装成功。

使用yum安装的时候,环境变量就自动配好了。

代码语言:javascript复制
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)

1.2 安装ZooKeeper

1. 创建目录data并且下载3.7.0版本的ZooKeeper

代码语言:javascript复制
mkdir /data
cd /data
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz

2. 解压

代码语言:javascript复制
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz

3. 修改配置文件

代码语言:javascript复制
// 进入配置文件目录
cd apache-zookeeper-3.7.0/conf

// 将zoo_sample.cfg这个文件复制为zoo.cfg 
cp zoo_sample.cfg  zoo.cfg

// 修改配置文件
vi zoo.cfg

输入vi zoo.cfg之后,需要按i进入insert模式才能做修改。修改完毕请先按ESC退出insert模式,进入命令行模式,再按连续两个大写ZZ进行保存并退出。

dataDir=/tmp/zookeeper修改成dataDir=/data/apache-zookeeper-3.7.0-bin/data

3. 创建对应的data目录

代码语言:javascript复制
mkdir /data/apache-zookeeper-3.7.0-bin/data

4. 启动ZooKeeper

进入ZooKeeper的bin目录并且启动服务

代码语言:javascript复制
cd /data/apache-zookeeper-3.7.0-bin/bin
./zkServer.sh start

Zookeeper成功后将会出现下面信息:

代码语言:javascript复制
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

下面是其他几个常用命令

代码语言:javascript复制
// 停止
./zkServer.sh stop

// 重启
./zkServer.sh restart

// 查看状态
./zkServer.sh status

1.3 安装kafka

1. 下载版本为3.0.0的kakfa

代码语言:javascript复制
cd /data
wget https://mirrors.bfsu.edu.cn/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz

2. 解压

代码语言:javascript复制
tar -zxvf kafka_2.13-3.0.0.tgz kafka_2.13-3.0.0

3. 启动

config/server.properties中的zookeeper.connect的默认地址是localhost:2181,如果你的Zookeeper安装在本机,保持默认即可。

代码语言:javascript复制
cd kafka_2.13-3.0.0.tgz kafka_2.13-3.0.0

// 前台启动:bin/kafka-server-start.sh config/server.properties
// 下面的命令行是后台启动,不会像前台启动一直打印日记。
bin/kafka-server-start.sh -daemon config/server.properties

现在你已经成功启动了Kafka,恭喜你终于迈出了第一步!

2. Spring与Kafka的整合

2.1 配置pom

我们需要在pom.xml里面添加Kafka的依赖:

代码语言:javascript复制
<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId> 
    <version>2.7.2</version> 
</dependency>

文中的demo应用将是一个Spring Boot的应用,你可以在这里方便快捷地创建一个Spring Boot的应用。

2.2 配置Topic

我们先来回顾下什么是topic:

在 Kafka 中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为 topic 。如果把 Kafka 看做为一个数据库, topic 可以理解为数据库中的一张表, topic 的名字即为表名。

之前我们可以通过命令行创建Topic

代码语言:javascript复制
bin/kafka-topics.sh --create  --zookeeper localhost:2181  --replication-factor 1 --partitions 1  --topic mytopic

现在由于有了Kafka中AdminClient的引入,我们可以在程序中创建topic。 我们需要添加KafkaAdmin这个bean,它可以自动地带入NewTopic的所有bean的topic。

代码语言:javascript复制
@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("jayxu", 1, (short) 1);
    }
}

2.3 生产消息

为了创建消息,我们首先需要配置一个ProducerFactory。ProducerFactory设置了创建Kafka Producer实例的策略。

然后我们需要一个KafkaTemplate,它包装了一个Producer实例,并提供了向Kafka Topic发送消息的方法。

Producer实例是线程安全的。在整个应用环境中使用单例会有更高的性能。KakfaTemplate实例也是线程安全的,建议使用一个实例。

2.3.1 Producer配置
代码语言:javascript复制
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
2.3.2 发布消息

我们可以使用KafkaTemplate来发送消息。

代码语言:javascript复制
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

sendAPI返回一个ListenableFuture对象。如果我们想阻止发送线程,并获得关于已发送消息的结果,我们可以调用ListenableFuture对象的get API。该线程将等待结果,但它会减慢producer的速度。

Kafka是一个快速的流处理平台。因此,最好是异步处理结果,这样后续的消息就不会等待前一个消息的结果了。

我们可以通过回调来做到这一点:

代码语言:javascript复制
public void sendMessage(String message) {

    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send(topicName, message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=["   message  
                    "] with offset=["   result.getRecordMetadata().offset()   "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=["
                      message   "] due to : "   ex.getMessage());
        }
    });
}

2.4 消费消息

2.4.1 Consumer配置

为了消费消息,我们需要配置一个ConsumerFactory和一个KafkaListenerContainerFactory。一旦这些bean在Spring bean工厂中可用,就可以使用@KafkaListener注解来配置基于POJO的consumer。

配置类中需要有@EnableKafka注解,以便在Spring管理的bean上检测@KafkaListener注解。

代码语言:javascript复制
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                groupId);
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
2.4.2 消费消息
代码语言:javascript复制
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: "   message);
}

我们可以为一个topic实现多个listener,每个都有不同的group ID。此外,一个consumer可以监听来自不同topic的消息。

代码语言:javascript复制
@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring还支持使用监听器中的@Header注解来检索一个或多个消息头。

代码语言:javascript复制
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println(
            "Received Message: "   message"
                      "from partition: "   partition);
}
2.4.3 消费特定分区的信息

注意,我们创建的话题jayxu只有一个分区。

然而,对于一个有多个分区的topic,@KafkaListener可以明确地订阅一个有initial offset的topic的特定分区。

代码语言:javascript复制
@KafkaListener(
        topicPartitions = @TopicPartition(topic = "topicName",
                partitionOffsets = {
                        @PartitionOffset(partition = "0", initialOffset = "0"),
                        @PartitionOffset(partition = "3", initialOffset = "0")}),
        containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println(
            "Received Message: "   message"
                      "from partition: "   partition);
}

由于在这个监听器中,initialOffset被设置为0,所以每次初始化这个监听器时,所有之前消耗的0和3分区的消息都会被重新消费。

如果我们不需要设置offset,我们可以使用@TopicPartition注解的partitions属性,只设置没有offset的分区。

代码语言:javascript复制
@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
2.4.4 为监听器添加消息过滤器

我们可以通过添加一个自定义的过滤器来配置监听器来消费特定类型的消息。这可以通过给KafkaListenerContainerFactory设置一个RecordFilterStrategy来完成。

代码语言:javascript复制
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
            record -> record.value().contains("World"));
    return factory;
}

然后我们可以配置一个监听器来使用这个容器工厂。

代码语言:javascript复制
@KafkaListener(
        topics = "topicName",
        containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: "   message);
}

在这个监听器中,所有符合过滤器的信息都将被丢弃。

2.5 自定义消息转换器

到目前为止,我们只涵盖了发送和接收字符串的消息。然而,我们也可以发送和接收自定义的Java对象。这需要在ProducerFactory中配置适当的序列化器,在ConsumerFactory中配置解序列化器。

让我们看看一个简单的bean类,我们将把它作为消息发送。

代码语言:javascript复制
public class Greeting {

    private String msg;
    private String name;

    // standard getters, setters and constructor
}
2.5.1 生产自定义消息

在这个例子中,我们将使用JsonSerializer。

让我们看看ProducerFactory和KafkaTemplate的代码。

代码语言:javascript复制
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

我们可以使用这个新的KafkaTemplate来发送Greeting信息。

代码语言:javascript复制
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
2.5.2 消费自定义消息

同样地,让我们修改ConsumerFactory和KafkaListenerContainerFactory,以正确地反序列化Greeting消息。

代码语言:javascript复制
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

spring-kafka的JSON序列化器和反序列化器使用Jackson库,这也是spring-kafka项目的可选Maven依赖。

所以,让我们把它添加到我们的pom.xml中。

代码语言:javascript复制
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.7</version> </dependency>

建议不要使用Jackson的最新版本,而是使用spring-kafka的pom.xml中加入的版本。

最后,我们需要写一个监听器来消费Greeting消息。

代码语言:javascript复制
@KafkaListener(
        topics = "topicName",
        containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

3. 总结

在这篇文章中,我们介绍了如何安装Kafka以及Spring支持Apache Kafka的基本情况。我们简单学习了一下用于发送和接收消息的类。

在运行代码之前,请确保Kafka服务器正在运行,并且topic是手动创建的。

0 人点赞