前言
今天是端午节,首先祝大家端午安康!
最近好久没发文,感觉人都能变懒惰了,这次重新拾起学习消息队列kafka的决心,系统学习如何掌握分布式消息队列Kafka的用法,技多不压身,感兴趣的读者可以跟着一起学一学。
在我的上一篇有关kafka的文章一网打尽Kafka入门基础概念 对Kafka的基本概念以及其应用场景做了一个详细的介绍,作为三大消息中间件(RabbitMQ, RocketMQ和Kafka)之一, kafka具有广泛应用于大数据实时计算、分布式流处理等。要学习kafka毫无疑问第一步就是要搭建kafka消息中间件服务,然后学习它的基本用法,等掌握了kafka的大部分用法了,有时间和精力了再去研究它的源码。本文的用意就是就带大家快速上手kafka,为系统掌握kafka消息中间件的用法打好基础。
1 Kafka环境搭建
1.1 下载kafka tar包并上传到服务器
读者可在kafka的官网下载,目前kafka的tar包已经更新到3.2.0版本,不过笔者使用的是kafka的上一个版本3.1.0版本
使用FinalShell客户端工具登录自己的Linux服务器,打开一个终端会话,切换到安装目录(笔者是上传到/usr/local)
上传到kafka tar包到安装目录后执行解压命令
代码语言:javascript复制tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0
1.2 启动zookeeper服务
因为kafka依赖于zookeeper,zookeeper是kafka服务的分布式协调器,因此需要先启动 kafka_2.13-3.1.0
文件夹中的zookeeper服务
在kafka_2.13-3.1.0目录执行以下命令启动zookeeper服务
代码语言:javascript复制 ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
控制台出现如下信息代表zookeeper服务启动成功,其连接端口为2181
除此之外还可以在控制台信息中看到zookeeper的NIO连接处理器配置为1个selector线程和4个work线程以及64kb缓存, zookeeper的配置信息在config/zookeeper.peroperties
文件中, 数据默认保存在/temp/zookeeper目录下。使用者也可以在zookeeper.peroperties
文件中修改zookeeper的配置项
注意:在以后版本中apache kafka将不再强制依赖zookeeper
1.3 启动kafka Broker服务
打开另一个终端会话并执行如下命令启动kafka broker服务:
代码语言:javascript复制./bin/kafka-server-start.sh ./config/server.properties
kafka服务启动成功后会在控制台中看到如下信息:
一旦kafka broker服务服务启动成功,我们就可以使用kafka了
2 Kafka 的终端用法
2.1 创建用于存储事件的Topic
kafka是一个分布式流处理平台让能垮多台机器读取、写入、存储和处理事件(事件也可以看作文档中的记录和消息)
典型的事件如支付交易、移动手机的位置更新、网上下单发货、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织和存储在事件当中。简单来说,事件类似于文件系统中的文件夹,事件相当于文件夹中的文件。
在写入事件之前,你需要创建一个Topic。打开另一个终端会话执行如下命令:
代码语言:javascript复制./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
所有的kafka命令行工具都有附加的可选项,不带任何参数运行kafka-topics.sh
命令显示它的使用信息。例如你可以使用下面这种方式创建一个新的topic,并显示分区数量等详情。
./bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
执行创建Topic命令后控制台显示的Topic描述信息如下:
- Topic名称:quickstart-events
- 分区数量:1
- 分区因子:1
- Topic配置信息: quickstart-events 分区序号0, Leader序号0; 副本序号0,Isr序号0
2.2 向Topic中写入事件
kafka客户端通过网络与kafka broker
服务端通信,用于读取或写入事件。服务端代理将以持久和容错的方式存储事件,只要你需要甚至可以永远保存。这里也相当于生产消息
运行控制台生产者客户端将一些事件写入主题。默认情况下,您输入的每一行都将导致一个单独的事件被写入主题。
代码语言:javascript复制./bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
> This is my first event
> This is my second event
在控制台每次生产一条消息之后需要按住回车键才能投递到topic中去
你可以通过按住Ctrl C键停止生产者客户端
2.3 读取事件
读取事件也就是消费消息。打开另一个终端会话,运行控制台消费者客户端来读取刚才创建的事件。
代码语言:javascript复制./bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
你一样可以通过按住Ctrl C键停止消费者客户端
可以随意尝试:例如,切换回生产者终端(上一步)来编写额外的事件,并查看事件如何立即显示在消费者终端中。
因为事件被持久地存储在Kafka中,它们可以被任意多的消费者多次读取。你可以通过打开另一个终端会话并再次运行上一个命令来轻松地验证这一点。
2.4 使用kafka连接导入导出数据流
你可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及许多已经使用这些系统的应用程序
Kafka连接允许你不断地从外部系统摄取数据到Kafka,反之亦然。它是一个可扩展的工具,运行连接器,连接器实现与外部系统交互的自定义逻辑。因此,将现有系统与Kafka集成是非常容易的。为了使这个过程更加容易,有数百个这样的连接器可供使用。
在这个快速入门中,我们将看到如何使用简单的连接器来运行Kafka Connect,将数据从一个文件导入到一个Kafka Topic中,并将数据从一个Kafka Topic导出到一个文件中。
首先,确保添加connect-file-3.2.0.jar
这个jar包到连接器工作配置中的plugin.path
属性中。在这个快速入门中,我们使用相对路径并将连接器的包视作一个超级Jar包, 它会在快速启动命令从安装目录中运行时跑起来。然而必须注意,生产环境部署必须优先使用绝对路径。plugin.path
属性的示例配置如下:
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
编辑config/connect-standalone.properties
属性文件,添加plugin.path
属性配置
echo "plugin.path=lib/connect-file-3.2.0.jar"
添加种子数据:
代码语言:javascript复制echo -e "foonbar" > test.txt
windows系统中执行:
代码语言:javascript复制> echo foo> test.txt
> echo bar>> test.txt
接下来我们在单机模式下下启动两个连接,单机模式意味着运行的是单一的、本地的和固定的进程。我们提供的了三个配置文件作为参数,第一个是kafka 连接进程的常用配置,包括连接Kafka的broker和数据的序列化格式。其余的配置文件分别指定要创建的连接器。这些文件包括惟一的连接器名称、要实例化的连接器类和连接器所需的任何其他配置。
代码语言:javascript复制> ./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-file-source.properties ./config/connect-file-sink.properties
这些Kafka配置示例文件文件,使用你之前启动的默认本地集群配置,并创建两个连接器: 第一个是源连接器,它从输入文件中读取消息,并生成每个消息到一个Kafka topic;第二个是sink连接器,它从Kafka topic中读取消息,并在输出文件中生成一行消息。
启动过程中你会看到一系列的日志消息,包括表示kafka正在被实例化的日志。一旦kafka线程启动成功,source Connect将会从test.txt文件中逐行读取信息并生产到命名为connect-test的 topic中,同时sink connect会从connect-test topic中读取消息并写入到test.sink.txt文件中,我们可以通过测试输出文件的内容验证数据已经投递到了整个管道。
代码语言:javascript复制> more test.sink.txt
foo
bar
注意:数据将被存储到kafka的话题connect-test中,所以我们也可以运行kafka-console-consumer.sh
查看存储在topic中的数据(或者使用自定义消费者代码处理存储在topic中的数据)
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
接器会持续处理数据,所以我们可以添加数据到文件中,并通过管道查看数据的移动
代码语言:javascript复制> echo Another line>> test.txt
读者应该在控制台消费者输出和接收器文件中看到这一行。
2.5 使用kafka Streams处理事件
一旦数据已事件的形式存储在kafka中,你就可以使用Java或Scale语言支持的Kafka Streams客户端处理数据。它允许你实现关键任务实时应用和微服务,其中输入或输出数据存储在Kafka Topic中
Kafka Streams结合了在客户端编写和部署标准Java和Scala应用程序的简单性,以及Kafka的服务器端集群技术的优势,使这些应用程序具有高度的可伸缩性、弹性、容错和分布式。该库支持恰好一次处理、有状态操作和聚合、窗口、连接、基于事件时间的处理等等。
为了给读者带来体验,我们以一个实现单词统计的算法为例:
代码语言:javascript复制// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
"streams-plaintext-input",
Consumed.with(stringSerde, stringSerde)
);
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W ")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count();
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
2.6 停用Kafka服务
我们来学习一下当我们需要停用kafka服务的时候如何来停止与kafka相关的服务
- 按住
Ctrl C
停用生产者和消费者控制台 - 按住
Ctrl C
停用kafka broker服务 - 按住
Ctrl C
停用zookeeper服务
如果你进一步想删除本地kafka环境包括任何你创建的Event中的数据,执行一下命令:
代码语言:javascript复制 rm -rf /tmp/kafka-logs /tmp/zookeeper
3 Kafka常用API
3.1 生产者API
生产者API允许应用程序在以数据流的形式发送数据到Kafka集群中的Topic中。
在使用生产者API之前,你需要在Maven项目的pom.xml文件中引入如下maven依赖:
代码语言:javascript复制<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
kafka消息生产者实现类为KafkaProducer<K,V>
, 它是一个发布消息到kafka集群的kafka客户端,同时它是线程安全的,在多个线程中使用同一个KafkaProducer
实例比使用多个KafkaProducer
实例通常生产消息的速度更快。
KafkaProducer类构方法
KafkaProducer类为生产者实现类,我们使用这个类来想kafka的topic中投递消息
- KafkaProducer(Map<String, Object> configs)
- KafkaProducer(Map<String, Object> configs, SerializerkeySerializer, SerializervalueSerializer)
- KafkaProducer(Properties properties)
- KafkaProducer(Properties properties, SerializerkeySerializer, SerializervalueSerializer)
以上参数configs和properties均为kafka生产者的元数据配置信息,详情可通过配置信息类org.apache.kafka.clients.producer.ProducerConfig中的静态属性查看
keySerializer和valueSerializer为key和value的序列化器,org.apache.kafka.common.serialization.StringSerializer为常用的序列化器
KafkaProducer类实例方法
- void abortTransanction(): 抛弃正在进行中的事务
- void beginTransanction(): 开启事务
- void close():关闭生产者
- void close(Duration timeout): 超时后关闭生产者
- void commitTransaction(): 提交正在进行中的事务
- void flush(): 执行这个方法会立即将缓存的消息投递到topic中
- void initTransanction(): 这个方法需要在设置好事务ID之后的任意方法之前调用
- Map<MetricName, ? extends Metric> metrics:获取生产者监控信息
- ListpartitionsFor(String topic): 根据topic获取分区信息
- Futuresend(ProducerRecord<K,V> record): 投递消息到topic中
- Futuresend(ProducerRecord<K,V> record, Callback callback): 带确认回调的投递消息方法
这里只列举了消息生产者KafkaProducer
类部分常用的实例方法,更多实例方法请参考KafkaProducer
类的官方API文档
https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
ProducerRecord类构造方法
ProducerRecord类为投递的消息记录实体,通过它将键值对的数据与话题、分区等绑定
- producerRecord(String topic, Integer partition, Long timestamp, K key, V value)
- producerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterableheaders)
- producerRecord(String topic, Integer partition, K key, V value)
- producerRecord(String topic, Integer partition, K key, V value, Iterableheaders)
- producerRecord(String topic, K key, V value)
- producerRecord(String topic, V value)
以上参数:topic为话题名称, partition为分区数, timestamp为时间戳, K为键,V为值
KafkaProducer类的用法
下面是一个使用生产者发送记录的简单示例,该记录使用包含连续数字的字符串作为key/value键值对。
代码语言:javascript复制 // 构造生产者配置属性
Properties props = new Properties();
// 配置kafka服务地址
props.put("bootstrap.servers", "localhost:9092");
// 配置kafka确认机制acks
props.put("acks", "all");
// 配置重试次数
props.put("retries", 0);
props.put("linger.ms", 1);
// 配置key序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置value序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 构造生产者KafkaProducer实例
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i )
// 投递消息到名称为my-topic的话题中
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
// 关闭producer实例
producer.close();
3.2 消费者API
KafkaConsumer类构造方法
KafkaConsumer类为kafka的消费者实现类,通过它可以实现消息的消费处理。
- KafkaConsumer(Map<String, Object> configs);
- KafkaConsume(Map<String, Object> configs, DeserializerkeyDeserializer, DeservializervalueDeserializer);
- KafkaConsumer(Properties properties);
- KafkaConsumer(Properties properties, DeserializerkeyDeserializer, DeservializervalueDeserializer);
以上参数:configs和properties为kafka消费者的元数据配置信息, 配置详情可通过org.apache.kafka.clients.consumer.ConsumerConfig类中的静态属性变量查看
keyDeserializer为键反序列化器,Deservializer为值反序列化器
常用的反序列化器为org.apache.kafka.common.serialization.StringDeserializer
KafkaConsumer类实例常用方法
- Setassignment(): 获取分配给当前Consumer的话题分区列表;
- void assign(TopicPartition> partitions): 给当前KafkaConsumer指定话题分区列表;
- void subscrib(Collectiontopics): 订阅话题列表;
- void subscrib(Patter patter): 订阅匹配正则表达式的Topic
- ConsumerRecords<K,V> poll(Duration duration): 从topic拉取消费者投递生产者投递过来的消息, duration为拉取的时间间隔
- void commitSync(): 异步提交,偏移量offset会置为上次poll消费消息后的偏移量
KafkaConsumer
类的更多实例方法请参照此类的官方API文档
https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
KafkaConsumer类消费消息的用法
- 在主线程中消费消息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
上面这种方式让消费消息的逻辑在程序的主线程中运行,是同步阻塞的。
2) 以独立线程的方式消费消息
代码语言:javascript复制public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
上面这张方式让消费消息的逻辑在一个独立线程中运行,是异步非阻塞的。这种方式需要自定义一个实现Runnable接口的线程类,并在其构造方法中传入KafkaConsumer
实例参数, 在run方法中调用KafkaConsumer实例进行订阅话题,并通过拉去话题中的消息进行消费。
4 写在最后
本文介绍了Kafka环境的搭建,以及如何在控制台创建Topic,使用生产者发送消息和使用消费者消费生产者投递过来的消息。并简要介绍了如何在Java项目中使用KafkaProducer类发送消息和使用KafkaConsumer类消费自己订阅的Topic消息。下一篇文章,笔者再系统介绍Kafka生产者、消费者的配置以及另一个更强大的类KafkaAdminClient
的用法,敬请期待!
今天恰逢农历端午节,再次祝大家端午安康。适当学习的时候不要忘了与家人团聚品尝美食和放松休闲为主。远创不易,希望大家看到这里都能随手点个【在看】。
5 参考链接
[1] Kafka Quick Start
https://kafka.apache.org/documentation/#quickstart
[2] KafkaProducer API
https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
[3] KafkaConsumer API
https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
往期原创文章
【1】 VueblogServer项目短信验证码登录功能前端实现
【2】巧用Druid数据源实现数据库连接密码的加密解密
【3】手把手带你在集成SpringSecurity的SpringBoot应用中添加短信验证码登录认证功能