【kafka】kafka的动态配置管理使用和分析

2022-04-13 20:06:28 浏览数 (1)

该文章可能已过期,已不做勘误并更新,请访问原文地址(持续更新) Kafka中的动态配置源码分析 kafka知识图谱: Kafka知识图谱大全 kafka管控平台推荐使用 滴滴开源 的 Kafka运维管控平台(戳我呀) 更符合国人的操作习惯 、更强大的管控能力 、更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、更友好的运维生态 、

kafka的动态配置

文章目录

代码语言:txt复制
- [源码分析](https://cloud.tencent.com/developer)
    - [1. Broker启动加载动态配置](https://cloud.tencent.com/developer)
        - [1.1 启动加载动态配置总流程](https://cloud.tencent.com/developer)
        - [1. 2 加载Topic动态配置](https://cloud.tencent.com/developer)
        - [1.3 加载Broker动态配置](https://cloud.tencent.com/developer)
    - [2. 查询动态配置 流程 `--describe`](https://cloud.tencent.com/developer)
    - [3. 新增/修改/删除/动态配置 的流程](https://cloud.tencent.com/developer)
        - [Topic配置](https://cloud.tencent.com/developer)
        - [其他的类型都一样](https://cloud.tencent.com/developer)
    - [4. Broker监听/config/changes的变更](https://cloud.tencent.com/developer)
- [源码总结](https://cloud.tencent.com/developer)
- [Q&A](https://cloud.tencent.com/developer)
    - [如果我想在我的项目中获取kafka的所有配置该怎么办?](https://cloud.tencent.com/developer)
    - [是否可以直接在zk中写入动态配置?](https://cloud.tencent.com/developer)
    - [为什么不直接监听 `/config/`下面的配置?](https://cloud.tencent.com/developer)

今天这篇文章,给大家分享一下最近看kafka中的动态配置,不需要重启Broker,即时生效的配置 欢迎留言一起探讨!

kafka中的配置

  1. Broker静态配置 .properties文件
  2. ZK中的动态配置 全局 default配置
  3. ZK中动态配置 指定配置

优先级从底到高

不想看过程,可以直接看最后的源码总结部分

源码分析

1. Broker启动加载动态配置

KafkaServer.startup

1.1 启动加载动态配置总流程

1. 动态配置初始化

代码语言:javascript复制
      config.dynamicConfig.initialize(zkClient)
  1. 构造当前配置文件 currentConfig, 然后从zk中获取节点 /config/brokers/<default>信息,然后更新配置updateDefaultConfig; (动态默认配置覆盖静态配置)
  2. 从节点/config/brokers/{当前BrokerId}获取配置, 如果配置中有ConfigType=PASSWORD的配置(例如ssl.keystore.password)存在,接着判断 是否存在password.encoder.old.secret 配置,(这个配置是用来加解密ConfigType=PASSWORD的旧的秘钥),尝试用旧秘钥解密秘钥; 然后将这些配置重新加密回写入/config/brokers/{当前BrokerId} ; 然后返回配置 (这里主要是动态配置里面有密码类型配置的时候需要做一次解密加密处理)
  3. 将上面得到的配置(password类型修改之后) 更新内存总的配置;优先级 静态配置<动态默认配置<指定动态配置

2. 注册可变更配置监听器

如果有对应的配置变更了,那么相应的监听器就会收到通知去修改自己相应的配置;

代码语言:javascript复制
        config.dynamicConfig.addReconfigurables(this)

DynamicBrokerConfig.addReconfigurables

代码语言:javascript复制
// .........
  def addReconfigurables(kafkaServer: KafkaServer): Unit = {
    kafkaServer.authorizer match {
      case Some(authz: Reconfigurable) => addReconfigurable(authz)
      case _ =>
    }
    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))

    addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
    if (kafkaServer.logManager.cleaner != null)
      addBrokerReconfigurable(kafkaServer.logManager.cleaner)
    addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
    addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
    addBrokerReconfigurable(kafkaServer.socketServer)
  }

3. 动态配置启动监听

代码语言:javascript复制
        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
        dynamicConfigManager.startup()
  1. 注册节点处理器change-notification-/config/changes = stateChangeHandler
  2. 注册节点处理器/config/changes = zNodeChildChangeHandler
  3. 获取/config/changes 所有子节点看看有哪些变更
  4. 遍历所有节点并截取节点的编号, 判断一下是不是大于上一次执行过变更的节点ID lastExecutedChange(启动的时候是-1)
  5. 上个条件满足的话,则执行通知操作;不同entity执行的操作不一样,具体请看下面每个类型
  6. 更新lastExecutedChange
  7. 清除过期的通知节点, 默认过期时间15 * 60 * 1000(15分钟) 就是删除/config/changes /下面的过期节点
1. 2 加载Topic动态配置

TopicConfigHandler.processConfigChanges

  1. 获取节点的data数据, 如果获取到了则执行通知流程notificationHandler.processNotification(d),处理器是ConfigChangedNotificationHandler; 它先解析节点的json数据,根据版本信息不同调用不同的处理方法; 下面是version=2的处理方式;
  2. 根据json数据可以得到 entityTypeentityName; 那么久可以去对应的zk数据里面getData获取数据; 并且将获取到的数据Decode成Properties对象entityConfig;
  3. 将key为下图中的属性 隐藏掉; 替换成value: hidden
  1. 调用EntityHandler; 这里是TopicConfigHandler.processConfigChanges来进行处理,方法里面再看看流程 ->
  2. 从动态配置entityConfig里面获取message.format.version配置消息格式版本号; 如果当前Broker的版本inter.broker.protocol.version 小于message.format.version配置; 则将message.format.version配置 排除掉
  3. 调用TopicConfigHandler.updateLogConfig 来更新指定Topic的所有TopicPartition的配置,其实是将TP正在加载或初始化的状态标记为没有完成初始化,这将会在后续过程中促成TP重新加载并初始化
  4. 将动态配置和并覆盖Server的默认配置为新的 newConfig, 然后根据Topic获取对应的Logs对象; 遍历Logs去更新newConfig;并尝试执行 initializeLeaderEpochCache; (需要注意的是:这里的动态配置不是支持所有的配置参数,请看【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)的附件部分)
  5. 当然特殊配置如leader.replication.throttled.replicas,follower.replication.throttled.replicas这两个限流相关;解析配置之后,然后通过quotaManager.markThrottled/quotaManager.removeThrottle更新/移除对应的限流分区集合
  6. 如果动态配置了unclean.leader.election.enable=true(允许非同步副本选主 );那么就会执行TopicUncleanLeaderElectionEnable方法来让它改变选举策略(前提是当前Broker是Controller角色)
1.3 加载Broker动态配置

BrokerConfigHandler.processConfigChanges

假设我们配置了默认配置; zk里面的节点是<default>

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9090 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=88888888

  1. 从zk节点/config/changes里面获取变更节点的json数据.然后去对应的 /config/{entityType}/{entituName}获取对应的数据
  2. 如果是<default>节点,说明有配置动态默认配置; 则按照 静态配置<动态默认配置<动态指定配置 的顺序重新加载覆盖一下; 如果 新旧配置有变更(有可能执行了一次命令但是参数并没有变化的情况,修改了个寂寞)的情况下 才会做更新的; 并且 通知到所有的 BrokerReconfigurable; 这个就是上面启动时候 1.1 启动加载动态配置总流程的第2步骤 (注册可变更配置监听器) 注册的;
  3. 如果是指定BrokerId, 则除了上面2重新加载覆盖之外, 相关限流 配置leader.replication.throttled.ratefollower.replication.throttled.ratereplica.alter.log.dirs.io.max.bytes.per.second 都会被更新一下quotaManagers.leader/leader/alterLogDirs.updateQuota ;如果这些配置没有配置的话,则用 Long.MaxValue(相当于是不限流)来更新

2. 查询动态配置 流程 --describe

  1. 简单检验
  2. 根据类型查询entities ; type是topics就获取所有topic; type是broker|broker-loggers则查询所有Broker节点
  3. 遍历entities获取配置 ;做些简单校验;然后想Broker发起describeConfigs请求; 节点策略是LeastLoadedNodeProvider 节点调用方法 KafkaApis.handleDescribeConfigsRequest
代码语言:txt复制
1. 未经授权配置不查询
2. 经过授权的配置开始查询 ;
3. 当查询的是`topics`时, 去zk节点`/confgi/类型/类型名` ,获取到动态配置数据之后, 然后将其覆盖本地跟Log相关的静态配置, 完事之后组装一下返回;(1.数据为空过滤2.敏感数据设置value=null; ConfigType=PASSWORD和不知道类型是啥的都是敏感数据 3. 组装所有的同义配置(`静态默认配置、本地静态、默认动态配置、指定动态配置、等等多个配置`)) 返回的数据类型如下:
 
  1. 如果有broker|broker-loggers节点, 则在 获取到数据之后 然后指定nodeId节点发起 describeBrokerConfigs请求
代码语言:txt复制
1. 如果查询的是`brokers`
代码语言:txt复制
1. 如果查询的是 `broker-loggers`

3. 新增/修改/删除/动态配置 的流程

1. 发起请求

  1. 查询当前的类型配置; 这里的查询 跟上面的--describe流程是一样的
  2. 相关校验;如果有delete-config配置, 需要校验一下当前配置有没有;如果没有抛出异常;
  3. 计算出需要变更的配置之后, 发起请求incrementalAlterConfigs;如果请求类型是 brokers/broker-loggers 则发起请求的接收方是 指定的Broker 节点; 否则就是LeastLoadedNodeProvider (当前负载最少的节点)

2. incrementalAlterConfigs 增量修改配置

KafkaApis.handleIncrementalAlterConfigsRequest

  1. 通过请求参数解析 配置 configs
  2. 过滤一下未授权的配置
  3. 如果配置中有重复的项则抛出异常
Topic配置
  1. 获取节点 /config/topics/{topicName} 中的配置数据;
  2. 然后根据请求参数的属性 ,组装好变更后的配置是什么样的 configs
  3. 简单校验一下, 并且支持自定义校验,如果有 alter.config.policy.class.name= 配置(默认null)的话,则会实例化指定的类(需要继承 AlterConfigPolicy类);并调用他的 validate方法来校验;
  4. 调用写入zk配置的接口, 将动态配置重新写入(SetDataRequest)到接口 /config/topics/{topicName}中;
  5. 创建并写入配置变更记录顺序节点 /config/changes/config_change_序列号 中; 这个节点主要是让Broker们来监听这个节点的来了解到哪个配置有变更的;
其他的类型都一样

省略

4. Broker监听/config/changes的变更

1. Broker启动加载动态配置 中我们了解到有对节点/config/change注册一个子节点变更的监听处理器

那么对动态配置做出修改之后, 这个节点就会新增一条数据,那么所有的Broker都会收到这个通知;

所以我们就要来看一看收到通知之后又做了哪些事情

这个流程是又回到了上面的 1. 2 加载Topics/Brokers动态配置 的流程中了;

源码总结

原理部分讲解比较详细的可以看 : Kafka动态配置实现原理解析 - 李志涛 - 博客园

Q&A

如果我想在我的项目中获取kafka的所有配置该怎么办?

  1. 启动的时候加载一次所有Broker的配置
  2. 监听节点/config/change节点的变化

是否可以直接在zk中写入动态配置?

不可以,因为Broker是监听 /config/changes/里面的Broker节点,来实时得知有数据变更;

为什么不直接监听 /config/下面的配置?

没有必要,这样监听的数据数据太多了,而且 你不知道具体是改了哪个配置,所以每次都要全部更新一遍,无缘无故的加重负担了, 用/config/change 节点来得知哪个类型的数据变更, 只变更这个相关数据就可以了

0 人点赞