Topic是怎么被删除的?
Kafka有很多状态机和管理器,如Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager等。这些管理器和状态机,大多与各自“宿主”联系密切。就如Controller这俩管理器,必须与Controller组件紧耦合,才能实现各自功能。
Kafka还有一些状态机和管理器,具有相对独立的功能框架,不严重依赖使用方,如:
- TopicDeletionManager(主题删除管理器) 负责对指定Kafka主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。
- ReplicaStateMachine(副本状态机) 负责定义Kafka副本状态、合法的状态转换,以及管理状态之间的转换。
- PartitionStateMachine(分区状态机) 负责定义Kafka分区状态、合法的状态转换,以及管理状态之间的转换。
本文看看Kafka是如何删除一个主题的。
前言
以为成功执行kafka-topics.sh --delete命令后,主题就会被删除。这种不正确的认知会导致经常发现主题没被删干净。于是,网传终极“武林秘籍”:手动删除磁盘上的日志文件,手动删除ZooKeeper下关于主题的各节点。但我不推荐这么干:
- 并不完整 除非你重启Broker,否则,这套“秘籍”无法清理Controller端和各个Broker上元数据缓存中的待删除主题的相关条目
- 并没有被官方所认证,后果自负
与其琢磨删除主题失败之后怎么自救,还是研究Kafka到底如何执行该操作。TopicDeletionManager.scala包括:
- DeletionClient接口:负责实现删除主题以及后续的动作 如更新元数据
- ControllerDeletionClient类:实现DeletionClient接口的类,分别实现了刚刚说到的那4个方法。
- TopicDeletionManager类:主题删除管理器类 定义方法维护主题删除前后集群状态的正确性。如,何时删除主题、何时主题不能被删除、主题删除过程中要规避哪些操作等
DeletionClient接口及实现
删除主题,并将删除主题的事件同步给其他Broker。
DeletionClient接口目前只有一个实现类ControllerDeletionClient,构造器的两个字段:
- KafkaController实例 Controller组件对象
- KafkaZkClient实例 Kafka与ZooKeeper交互的客户端对象
API
deleteTopic
删除主题在zk上的所有“痕迹”。分别调用KafkaZkClient的3个方法删除ZooKeeper下/brokers/topics/节点、/config/topics/节点和/admin/delete_topics/节点。
deleteTopicDeletions
删除zk下待删除主题的标记节点。调用KafkaZkClient#deleteTopicDeletions,批量删除一组主题在/admin/delete_topics下的子节点。注意,deleteTopicDeletions这个方法名结尾的Deletions,表示/admin/delete_topics下的子节点。所以:
- deleteTopic是删除主题
- deleteTopicDeletions是删除/admin/delete_topics下的对应子节点
这两个方法里都有epochZkVersion字段,代表期望的Controller Epoch版本号。若使用一个旧Epoch版本号执行这些方法,zk会拒绝,因为和它自己保存的版本号不匹配。若一个Controller的Epoch<ZooKeeper中保存的,则该Controller很可能是已过期的Controller。这就是Zombie Controller。epochZkVersion字段的作用,就是隔离Zombie Controller发送的操作。
mutePartitionModifications
屏蔽主题分区数据变更监听器:取消/brokers/topics/节点数据变更的监听。
当该主题的分区数据发生变更后,由于对应zk监听器已被取消,因此不会触发Controller相应处理逻辑。
为何取消该监听器?为避免操作相互干扰:假设用户A发起主题删除,同时用户B为这个主题新增分区。此时,这两个操作就会冲突,若允许Controller同时处理这俩操作,势必会造成逻辑混乱及状态不一致。为应对这种情况,在移除主题副本和分区对象前,代码要先执行这个方法,确保不再响应用户对该主题的其它操作。
mutePartitionModifications调用unregisterPartitionModificationsHandlers,并接着调用KafkaZkClient#unregisterZNodeChangeHandler,取消zk上对给定主题的分区节点数据变更的监听。
sendMetadataUpdate
调用KafkaController#sendUpdateMetadataRequest,给集群所有Broker发送更新请求,告诉它们不要再为已删除主题的分区提供服务:
该方法会给集群中的所有Broker发送更新元数据请求,告知它们要同步给定分区的状态。
TopicDeletionManager定义及初始化
创建TopicDeletionManager类实例
在KafkaController类初始化时被创建:
实例化了一个全新的ControllerDeletionClient对象,然后利用该对象实例和replicaStateMachine、partitionStateMachine,一起创建TopicDeletionManager实例。
KafkaServerStartable.startup()=》KafkaServer.startup()=》KafkaController.init=》TopicDeletionManager
TopicDeletionManager重要API
除了类定义和初始化,还有resumeDeletions:重启主题删除操作过程。
主题因为某些事件可能一时无法完成删除,如主题分区正在进行副本重分配等。一旦这些事件完成,主题重新具备可删除资格。就需调用resumeDeletions重启删除操作。
- 从元数据缓存中获取要删除主题列表,之后定义了两个空的主题列表,分别保存待重试删除主题和待删除主题
- 遍历每个要删除的主题,去看它所有副本的状态。如果副本状态都是ReplicaDeletionSuccessful,就表明该主题已经被成功删除,此时,再调用completeDeleteTopic方法,完成后续的操作就可以了。对于那些删除操作尚未开始,并且暂时无法执行删除的主题,源码会把这类主题加到待重试主题列表中,用于后续重试;如果主题是能够被删除的,就将其加入到待删除列表中。
- 最后,调用retryDeletionForIneligibleReplicas重试待重试主题列表中的主题删除操作。对待删除主题列表中的主题则调用onTopicDeletion删除。
retryDeletionForIneligibleReplicas重试主题删除:将对应主题副本的状态,从ReplicaDeletionIneligible变更到OfflineReplica。这样,后续再次调用resumeDeletions,就会尝试重新删除主题。
下面,我再用一张图来解释下resumeDeletions方法的执行流程:
resumeDeletions串联起了TopicDeletionManger中的很多方法,较关键的:
completeDeleteTopic:
onTopicDeletion:
onTopicDeletion会多次使用分区状态机,调整待删除主题的分区状态。最后调用onPartitionDeletion执行真正的底层物理磁盘文件删除。这是通过副本状态机状态转换操作完成的。
总结
在主题删除过程中,Kafka会调整集群中三个地方的数据:
- ZooKeeper 删除主题时,zk上与该主题相关的所有ZNode节点必须被清除
- 元数据缓存 Controller端元数据缓存中的相关项,也必须要被处理,并且要被同步到集群的其他Broker上
- 磁盘日志文件 要清理的首要目标
这三个地方须统一处理,就好似原子操作。回想“秘籍”,它无法清除Controller端的元数据缓存项。因此,避免使用这“大招”。
DeletionClient接口主要是操作ZooKeeper,实现ZooKeeper节点的删除等操作。
TopicDeletionManager,是在KafkaController创建过程中被初始化的,主要通过与元数据缓存进行交互的方式,来更新各类数据。