我们往kafka集群中发送数据的时候,kafka是怎么感知到需要发送到哪一台节点中呢?其实这其中的奥秘就在kafka的Metadata中。这一篇我们就来看看kafka中的Metadata管理。
我们来看看构建Kakfa中的代码片段:
KafkaProducer构造函数代码片段
从上面的代码片段可以看出,如果metadata变量不为空,直接赋值给KafkaProducer类成员变量metadata,否则需要新构建一个ProducerMetadata对象,然后根据用户传递的kafka集群服务器地址信息,构建Metadata类中cache成员变量的值,类型为MetadataCache。
下面我们来分析一下Metadata这个类,看看里面都封装了哪些属性。
refreshBackoffMs
这个参数的作用是防止轮询的过于频繁。用于设置两次元数据刷新之间,最小有效时间间隔,超过这个设置的时间间隔,则这次元数据刷新就失效了。默认值是100ms。
metadataExpireMs
这个参数的含义是如果不刷新,元数据可以保持有效的最大时间。默认值是5分钟。
updateVersion
这个参数对应每一个元数据的响应。每一次自增 1。
requestVersion
这个参数对应每一次创建一个新的Topic。每一次自增 1。
lastRefreshMs
这个参数的含义是上一次更新元数据的时间。
lastSuccessfulRefreshMs
这个参数的含义是上一次成功更新元数据的时间。正常情况下每一次更新元数据都应该是成功的,那么lastRefreshMs和lastSuccessfulRefreshMs的值,应该是一样的。但是如果出现更新没有成功的情况,那么lastRefreshMs的值大于lastSuccessfulRefreshMs的值。
fatalException
这个参数的类型是kafka自己封装的KafkaException。继承了RuntimeException。如果在元数据相关的操作中抛出了这种异常,kafka将停止元数据相关的操作。
invalidTopics
这个参数的含义是存储非法的Topic元数据信息。
unauthorizedTopics
这个参数的含义是存储未授权的Topic元数据信息。
cache
这个参数的含义是在Metadata类的内部构建一个MetadataCache对象,把元数据信息缓存起来,方便在集群中进行快速的数据获取。
needFullUpdate
这个参数的含义是Metadata是否需要全部更新。
needPartialUpdate
这个参数的含义是Metadata是否需要部分更新。
clusterResourceListeners
这个参数的含义是抽象了一个接收元数据更新集群资源的监听器集合。
lastSeenLeaderEpochs
这个参数是一个Map结构,映射的是TopicPartition和Integer之间的关系。也就是说某一个主题分区,它的主分区上一次更新的版本号是多少,在这个Map结构中存储。真正构建Metadata对象的时候,实现类是HashMap。
接下来我们来看看MetadataCache这个类,看看里面封装了哪些属性。这个类存在是kafka一种缓存的思想,把一些重要的属性用缓存来保存起来,提高Metadata的读取效率。
clusterId
这个参数用来标识整个kafka集群。
nodes
这个参数是一个Map类型,用来映射kafka集群中节点编号和节点的关系。
unauthorizedTopics
这个参数是一个Set类型,用来存储未授权的Topic集合。
invalidTopics
这个参数是一个Set类型,用来存储无效的Topic集合。
internalTopics
这个参数是一个Set类型,用来存储kafka内部的Topic集合,例如__consumer_offsets。
controller
这个参数是表示kafka controller所在broker。
metadataByPartition
这个参数是Map类型,用来存储分区和分区对应的元数据的映射关系。
clusterInstance
这个参数抽象了集群中的数据,我们接下来进行重点分析。
Cluster类是封装在MetadataCache中的,用来表示kafka的集群信息。
nodes
这个参数封装了集群中节点信息列表。
unauthorizedTopics
这个参数是一个Set类型,用来存储未授权的Topic集合。
invalidTopics
这个参数是一个Set类型,用来存储无效的Topic集合。
internalTopics
这个参数是一个Set类型,用来存储kafka内部的Topic集合,例如__consumer_offsets。
partitionsByTopicPartition
这个参数记录了TopicPartition与PartitionInfo的映射关系。
partitionsByTopic
这个参数记录了Topic名称与PartitionInfo的映射关系。可以按照Topic名称查询其中全部分区的详细信息。
availablePartitionsByTopic
这个参数记录了Topic与PartitionInfo的映射关系。这里的List<PartitionInfo>中存放的分区必须是有Leader副本的Partition,而partitionsByTopic中记录的分区则不一定有Leader副本,因为某些中间状态,例如Leader副本所在节点,发生了节点下线,进而触发了Leader副本的选举,在这一时刻分区不一定有Leader副本。
partitionsByNode
这个参数记录了Node与PartitionInfo的映射关系。可以按照节点Id查询该节点上分布的全部分区的详细信息。
nodesById
这个参数记录了BrokerId与Node节点之间的映射关系。方便使用BrokerId进行索引,可以根据BrokerId得到关联的Node节点信息。
clusterResource
这个参数是ClusterResource类型,这个类只是封装了一个clusterId成员属性,用于区分每一个kafka的集群。
我们再来看看Node这个类。Node这个类是对kafka集群中一个物理服务器的抽象,它所拥有的属性如下所示。
id
这个参数记录了kafka集群中的服务器编号,是我们配置参数的时候指定的。
host
这个参数记录了服务器的主机名。
port
这个参数记录了服务器的端口号。
rack
这个参数记录了服务器所属的机架。
我们再来看看TopicPartition这个类。这个类里面封装了主题,以及对应的一个分区。它所拥有的属性如下所示:
partition
这个参数记录了一个分区编号。
topic
这个参数记录了主题名称。
我们再来看看PartitionInfo这个类。这个类抽象了一个分区的详细信息,它所拥有的属性如下所示:
topic
这个参数记录了主题名称,表示这个分区是属于哪一个主题的。
partition
这个参数记录了分区编号。
leader
这个参数记录了分区主副本在哪台服务器上。
replicas
这个参数是Node类型的数组,记录了这个分区所有副本所在服务器。
inSyncReplicas
这个参数是Node类型的数组,记录了这个分区同步正常的副本所在服务器。
offlineReplicas
这个参数是Node类型的数组,记录了这个分区同步不正常的副本所在服务器。