之前的文章你可以参考:
《我们在学习Flink的时候,到底在学习什么》
《我们在学习Spark的时候,到底在学习什么》
我在之前《Kafka源码阅读的一些小提示》写了一些关于Kafka源码阅读的注意事项。
本文会从一个小白的角度讲Kafka学习的整体方法,包括背景、核心概念、核心原理、源码阅读、实际应用等。
注意,本文只是一个学习路径,不会详细展开,各位读者需要根据自己的实际情况针对性的去学习其中的某一个部分。
Kafka的背景
Kafka是LinkedIn开发并开源的一套分布式的高性能消息引擎服务,后来被越来越多的公司应用在自己的系统中,可以说,截止目前为止Kafka是大数据时代数据管道技术的首选。在设计的时候,它就实现了高可靠、高吞吐、高可用和可伸缩,得益于这些特性,加上活跃的社区,Kafka成为了一个完备的分布式消息引擎解决方案。
Kafka在大数据领域扮演者举足轻重的角色:
- 消息系统:Kafka具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等强大的功能。
- 存储系统:Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用。
- 流式处理平台:Kafka还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作,也是一个分布式流处理平台。
Kafka的入门
这部分你需要对消息引擎有基本的了解,并且知道对Kafka系统术语、Kafka角色定位、和版本变迁有足够的了解。
我这里列出了部分核心概念如下:
- 消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
- 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
- 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
- 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
- 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
- 生产者:Producer。向主题发布新消息的应用程序。
- 消费者:Consumer。从主题订阅新消息的应用程序。
- 消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
- 消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
- 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
- ISR:ISR是In-Sync Replica的缩写,ISR集合表示的是目前可用且消息量与Leader相差不多的副本集合。
- HW:HW(HightWatermark,水位线)标记了一个特殊的offset,消费者处理消息的时候,HW之后的消息对于消费者是不可见的。HW也是由leader副本管理的。
- LEO:LEO(Log End Offset)是所有副本都会有的一个offset标记,它指向当前副本的最后一个消息的offset。
除此之外,在Kafka的每一个模块,我们都能看到更多更细节的概念。
Kafka的生产者和消费者
这部分也是我们编程的核心,你需要知道生产者和消费者之间的关系。生产者就是负责向 Kafka 发送消息的应用程序,你需要知道Kafka提供了哪些常用的接口和方法,并且对其中的参数配置有详细了解。
在生产者中有一个非常重要的参数需要你注意并了解他们的作用:
- acks
- max.request.size
- retries和retry.backoff.ms
具体的参数列表如下:
代码语言:javascript复制必选属性有3个:
bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息,一旦其中一个宕机,生产者仍能连接到集群上。
key.serializer:生产者接口允许使用参数化类型,可以把Java对象作为键和值传broker,但是broker希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器。key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类,默认为
org.apache.kafka.common.serialization.StringSerializer,也可以实现自定义的序列化器。
value.serializer:同上。
可选参数:
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
acks=1:只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。
acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
buffer.memory:设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
max.block.ms:指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。
batch.size:当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。
retries:指定生产者可以重发消息的次数。
receive.buffer.bytes和send.buffer.bytes:指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
linger.ms:指定了生产者在发送批次前等待更多消息加入批次的时间。
一个典型的生产者代码如下:
代码语言:javascript复制public class KafkaProducer {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static Properties initConfig(){
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, "Hello, Kafka!");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息。
消费者(Consumer)负责订阅 Kafka 中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在 Kafka 的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
同样的,消费者端也有很多非常重要的参数,你可以在ConsumerConfig这个类中找到,这里就不一一列举了。
一个典型的消费者代码如下:
代码语言:javascript复制
public class KafkaConsumer {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig(){
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " record.topic()
", partition = " record.partition()
", offset = " record.offset());
System.out.println("key = " record.key()
", value = " record.value());
//do something to process record.
}
}
} catch (Exception e) {
log.error("occur exception ", e);
} finally {
consumer.close();
}
}
}
Kafka中的核心原理
在这部分你需要了解Kafka的最核心的设计原理,主要包括:
- 存储机制
- 备份和副本机制
- 日志设计
- Controller控制器
- Rebalance
- 可靠性设计
- 延迟、死信、重试队列等
Kafka的运维和监控
Kafka自身提供非常强大的运维和监控工具,在这部分如果你的工作包括了线上Kafka集群的运营,那么你需要对这些工具非常了解。
包括:
- 主题管理
- 副本和消息管理
- 权限管理
- 常见的工具和脚本
- 跨集群备份
Kafka源码阅读
这部分你需要参考:《Kafka源码阅读的一些小提示》
Kafka的应用
通常我们使用Kafka大部分情况会搭配Spark的Flink使用。
针对和Spark的结合,你需要对下面这个连接器非常熟悉:
代码语言:javascript复制<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
</dependency>
针对和Flink的结合,你需要对下面这个连接器非常熟悉:
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
</dependency>
Kafka的野心
Kafka还有一个模块:Kafka Stream。
Kafka Stream定位是轻量级的流计算类库。他的出现使得Kafka的定位从原来的分布式、分区、有备份的提交日志服务变成了完整的分布式消息引擎和流式计算处理引擎。
Kafka Stream 的特点如下:
- Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
- 除了 Kafka 外,无任何外部依赖
- 充分利用 Kafka 分区机制实现水平扩展和顺序性保证
- 通过可容错的 state store 实现高效的状态操作(如 windowed join 和aggregation)
- 支持正好一次处理语义
- 提供记录级的处理能力,从而实现毫秒级的低延迟
- 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
- 同时提供底层的处理原语 Processor(类似于 Storm 的 spout 和 bolt),以及高层抽象的DSL(类似于 Spark 的 map/group/reduce)
Kafka Stream 作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
Kafka作为大数据领域最成熟、最完善的框架之一,仍然在高速迭代和演进中,是每个大数据开发者都必须掌握的框架。