Kafka 架构中 ZooKeeper 以怎样的形式存在?

2021-03-10 14:38:03 浏览数 (1)

Kafka 运行环境还需要涉及 ZooKeeper,Kafka 和 ZooKeeper 都是运行在 JVM 之上的服务。但是Kafka架构中 ZooKeeper 以怎样的形式存在?

  • Broker 在 ZooKeeper 中的注册
  • Topic 在 ZooKeeper 中的注册
  • Consumer 在 ZooKeeper 中的注册
  • Consumer 负载均衡
  • 记录消费进度 Offset
  • 记录 Partition 与 Consumer 的关系

ZooKeeper 是一个分布式、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 开源实现。分布式应用程序可以基于它实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有 Broker 注册、Topic 注册、Producer 和 Consumer 负载均衡、维护 Partition 与 Consumer 的关系、记录消息消费的进度以及 Consumer 注册等。

1.Broker 在 ZooKeeper 中的注册

为了便于大家理解,我首先解释下“注册”一词。ZooKeeper 是一个共享配置中心,我们可以将一些信息存放入其中,比如 Broker 信息,本质上就是存放一个文件目录。这个配置中心是共享的,分布式系统的各个节点都可以从配置中心访问到相关信息。同时,ZooKeeper 还具有 Watch 机制(Raft 算法),一旦注册信息发生变化,比如某个注册的 Broker 下线,ZooKeeper 就会删除与之相关的注册信息,其它节点可以通过 Watch 机制监听到这一变化,进而做出响应。

言归正传,Broker 注册,也就是 Kafka 节点注册,本质上就是在 ZooKeeper 中创建一个专属的目录(又称为节点),其路径为 / brokers

在专属节点创建好后,Kafka 会将该 Broker 相关的信息存入其中,包括 broker.name 、端口号。

需要特别说明的是,Broker 在 ZooKeeper 中注册的节点是“临时节点”,一旦 Broker 故障下线,ZooKeeper 就会将该节点删除。同时,可以基于 Watch 机制监听到这一删除事件,进而做出响应(如负载均衡)。

2.Topic 在 ZooKeeper 中的注册

在 Kafka 中,所有 Topic 与 Broker 的对应关系都由 ZooKeeper 来维护,在 ZooKeeper 中,通过建立专属的节点来存储这些信息,其路径为 。

代码语言:javascript复制
/brokers/topics/{topic_name}

前面说过,为了保障数据的可靠性,每个 Topic 的 Partition 实际上是存在备份的,并且备份的数量由 Kafka 机制中的 Replicas 来控制。那么问题来了,如下图所示,假设某个 TopicA 被分为 2 个 Partition,并且存在两个备份,由于这 2 个 Partition(1-2)被分布在不同的 Broker 上,同一个 Partiton 与其备份不能(也不应该)存储于同一个 Broker 上。以 Partition1 为例,假设它被存储于 Broker2,其对应的备份分别存储于 Broker1 和 Broker4,有了备份,可靠性得到保障,但数据一致性却是个问题。

为了保障数据的一致性,ZooKeeper 机制得以引入。基于 ZooKeeper,Kafka 为每一个 Partition 找一个节点作为 Leader,其余备份作为 Follower;接续上图的例子,就 TopicA 的 Partition1 而言,如果位于 Broker2(Kafka 节点)上的 Partition1 为 Leader,那么位于 Broker1 和 Broker4 上面的 Partition1 就充当 Follower,则有下图:

基于上图的架构,当 Producer Push 的消息写入 Partition(分区)时,作为 Leader 的 Broker(Kafka 节点)会将消息写入自己的分区,同时还会将此消息复制到各个 Follower,实现同步。如果某个 Follower 挂掉,Leader 会再找一个替代并同步消息;如果 Leader 挂了,Follower 们会选举出一个新的 Leader 替代,继续业务,这些都是由 ZooKeeper 完成的。

3.Consumer 在 ZooKeeper 中的注册

3.1.Consumer Group 注册

与 Broker、Topic 注册类似,Consumer Group 注册本质上也是在 ZooKeeper 中创建专属的节点,以记录相关信息,其路径为 /consumers/{group_id}

这里补充一点,在 ZooKeeper 中,/consumers/{group_id} 虽然被称为节点,但本质上是一个目录。既然是目录,在记录信息时,就可以根据信息的不同,进一步创建子目录(子节点),分别记录不同类别的信息。对于 Consumer Group 而言,有三类信息需要记录,因此,/consumers/{group_id} 下还有三个子目录,如下所示。

  • ids:Consumer Group 中有多个 Consumer,ids 用于记录这些 Consumer;
  • owners:记录该 Consumer Group 可消费的 Topic 信息;
  • offsets:记录 owners 中每个 Topic 的所有 Partition 的 Offset。

3.2.Consumer 注册

原理同 Consumer Group 注册,不过需要注意的是,其节点路径比较特殊,需在路径 / consumers/{group_id}/ids 下创建专属子节点,它是临时的。比如,某 Consumer 的临时节点路径为

代码语言:javascript复制
/ consumers/{group_id}/ids/my_consumer_for_test-1223234-fdfv1233df23

3.3.负载均衡

通过前面的学习,我们知道,对于一条消息,订阅了它的 Consumer Group 中只能有一个 Consumer 消费它。那么就存在一个问题:一个 Consumer Group 中有多个 Consumer,如何让它们尽可能均匀地消费订阅的消息呢(也就是负载均衡)?这里不讨论实现细节,但要实现负载均衡,实时获取 Consumer 的数量显然是必要的,通过 Watch 机制监听 / consumers/{group_id}/ids 下子节点的事件便可实现。

3.4.Producers 负载均衡

前面已经介绍过,为了负载均衡和避免连锁反应,Kafka 中,同一个 Topic 的 Partition 会尽量分散到不同的 Broker 上。而 Producers 则根据指定的 Topic 将消息 Push 到相应的 Partition,那么,如何将消息均衡地 Push 到各个 Partition 呢?这便是 Producers 负载均衡的问题。

Producers 启动后同样也要进行注册(依然是创建一个专属的临时节点),为了负载均衡,Producers 会通过 Watcher 机制监听 Brokers 注册节点的变化。一旦 Brokers 发生变化,如增加、减少,Producers 可以收到通知并更新自己记录的 Broker 列表 。此外,基于 ZooKeeper 提供的 Watcher 机制,还可以监听其它在 ZooKeeper 上注册的节点,如 Topic、Consumer 等。

Producer 向 Kafka 集群 Push 消息的时候,必须指定 Topic,不过,Partition 却是非必要的。事实上,目前高级的客户端已经不提供指定 Partition 的接口。虽然不提供,但并不代表无须指定 Partition,只是隐藏了细节。通常有两种方式用于指定 Partition。

  • 低级接口

在指定 Topic 的同时,需指定 Partition 编号(0、1、2……N),消息数据将被 Push 到指定的 Partition 中。从负载均衡的角度看,这并不是一种友好的方式。

  • 高级接口

不支持指定 Partition,隐藏相关细节,内部则采用轮询、对传入 Key 进行 Hash 等策略将消息数据均衡地发送到各个 Partition。此外,有一些 Kafka 客户端还支持自定义负载均衡策略。

5.Consumer 负载均衡

基于 Producer 的负载均衡策略,对于任意一个 Topic,其各个 Partition 中消息量相对均衡。进一步,对于 Topic 的任意一条消息,订阅了它的任何一个 Consumer Group 中都只能有一个 Consumer 消费它,在此约束下,如何实现 Consumer 均衡地消费消息呢?

一种最朴实的想法是,对于订阅的 Topic,既然 Partition 中的消息是均衡的,那么,可以为 Consumer Group 中的各个 Consumer 分别指定不同的 Partition,只要保证该过程“相对公平”即可。不过,需要注意的是,Consume Group 中 Consumer 的数量是动态变化的,Topic 的 Partition 数量也不是固定值,如何“均匀”分配呢?

5.1 借助 ZooKeeper 实现负载均衡

在 Consumer 消费消息时,高级别 API 只需指定 Topic 即可,隐藏了负载均衡策略;而低级别的 API 通常需要同时指定 Topic 和 Partition,需要自行实现负载均衡策略。高级别 API 的负载均衡策略需借助 ZooKeeper 实现,具体原理如下。

前已述及,Consumer Group、Consumer、Broker 都会在 ZooKeeper 中注册节点,因此,基于 ZooKeeper 提供的 Watcher,Consumer 可以监听同一 Group 中 Consumers 的变化,以及 Broker 列表的变化。进一步,根据 Consumer 列表,将 Partition 排序后,轮流进行分配。由于这是一个动态过程,相应的负载均衡被称为 Rebalance,其描述如下:

  • 对任意一个 Topic 中所有的 Partirtion 进行排序,用数组 PT 记录;
  • 某一 Consumer Group 订阅了上述 Topic,对它的所有 Consumer 排序,用数组 CG 记录,第 i 个 Consumer 记为 CG[i]
  • 比例系数为 F=size(PT)/size(CG),向上取整;
  • 解除 CG[i] 对原来分配的 Partition 的消费权(i 从 0 开始,最大值为 size(CG)-1);
  • 将第 i*F(i 1)*F-1 个 Partition 分配给 CG[i]。
6.记录消费进度 Offset

前面提及Offset,它是 /consumers/[group_id] 下的一个子节点。Kafka 中,Consumer 采用 Pull 模式消费相应 Partition 中的消息,是一种异步消费模式。为了避免因 Consumer 故障、重启、Rebalance 等原因造成重复消费、遗漏消费消息,需要记录 Consumer 对 Partition 中消息的消费进度,即偏移量 Offset。Offset 在 ZooKeeper 中,有一个专属的节点(目录)用于记录 Offset,其路径样式如下:

代码语言:javascript复制
#节点内容就是Offset的值。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

需要说明的是,在 Kafka 的最新版本 Kafka 2.0 中,Offset 信息不再记录于 ZooKeeper,而是保存于 Kafka 的 Topic 中,路径如下:

代码语言:javascript复制
__consumer_offsets(/brokers/topics/__consumer_offsets)

7.记录 Partition 与 Consumer 的关系

Consumer Group 在 ZooKeeper 上的注册节点为 /consumers/[group_id],而 Consumer Group 中的 Consumer 在 ZooKeeper 上的注册节点为 /consumers/[group_id] 下的子节点 owners,它们共享一个 Group ID。为了 Consumer 负载均衡,同一个 Group 订阅的 Topic 下的任一 Partition 都只能分配给一个 Consumer。Partition 与 Consumer 的对应关系也需要在 ZooKeeper 中记录,路径为:

代码语言:javascript复制
代码语言:javascript复制
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

补充:这个路径也是一个临时节点,进行 Rebalance 时会被删除,而后依据新的对应关系重建。此外,[broker_id-partition_id] 是一个消息分区的标识,其内容就是该消息分区消费者的 Consumer ID,通常采用 hostname:UUID 形式表示。

- END -

0 人点赞