Kafka 的架构概览:
如上图所示: 一个典型的 Kafka 集群中包含若干 Producer(可以是 web 前端产生的 Page View,或者是服务器日志,系统 CPU、Memory 等); 若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 Consumer Group,以及一个 Zookeeper 集群; Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance; Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息;
kafka 的初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力。
1、持久性
kafka 使用文件存储消息,这就直接决定 kafka 在性能上严重依赖文件系统的本身特性。且无论任何 OS 下,对文件系统本身的优化几乎没有可能。文件缓存 / 直接内存映射 等是常用的手段。
因为 kafka 是对日志文件进行 append 操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker 会将消息暂时 buffer 起来,当消息的个数 (或尺寸) 达到一定阀值时,再 flush 到磁盘,这样减少了磁盘 IO 调用的次数。
2、性能
要考虑的影响性能点很多,除磁盘 IO 之外,我们还需要考虑网络 IO, 这直接关系到 kafka 的吞吐量问题。 kafka 并没有提供太多高超的技巧:
- 对于 producer 端,可以将消息 buffer 起来,当消息的条数达到一定阀值时,批量发送给 broker;
- 对于 consumer 端也是一样,批量 fetch 多条消息。不过消息量的大小可以通过配置文件来指定。
对于 kafka broker 端,似乎有个 sendfile 系统调用可以潜在的提升网络 IO 的性能:将文件的数据映射到系统内存中,socket 直接读取相应的内存区域即可,而无需进程再次 copy 和交换。
其实对于 producer/consumer/broker 三者而言,CPU 的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的 CPU 资源,不过对于 kafka 而言,网络 IO 更应该需要考虑。可以将任何在网络上传输的消息都经过压缩.kafka 支持 gzip/snappy 等多种压缩方式.
3、生产者(Producer)
Producer 发送消息到 broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置合理,所有消息可以均匀分布到不同的 Partition 里,这样就实现了负载均衡。如果一个 Topic 对应一个文件,那这个文件所在的机器 I/O 将会成为这个 Topic 的性能瓶颈,而有了 Partition 后,不同的消息可以并行写入不同 broker 的不同 Partition 里,极大的提高了吞吐率。可以在配置文件中通过配置项 num.partitions 来指定新建 Topic 的默认 Partition 数量,也可在创建 Topic 时通过参数指定,同时也可以在 Topic 创建之后通过 Kafka 提供的工具修改。
负载均衡: producer 将会和 Topic 下所有 partition leader 保持 socket 连接;消息由 producer 直接通过 socket 发送到 broker, 中间不会经过任何 “路由层”。事实上,消息被路由到哪个 partition 上,有 producer 决定。比如可以采用 “random”, “key-hash”, “轮询” 等,如果一个 topic 中有多个 partitions,那么在 producer 端实现 “消息均衡分发” 是必要的。其中 partition leader 的位置 (host:port) 注册在 zookeeper 中,producer 作为 zookeeper client, 已经注册了 watch 用来监听 partition leader 的变更事件。
异步发送:将多条消息暂且在客户端 buffer 起来,并将他们批量的发送到 broker,小数据 IO 太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当 producer 失效时,那些尚未发送的消息将会丢失。
4、消费者(Consumer)&& 消费者组 (Consumer Group)
consumer 端向 broker 发送 “fetch” 请求,并告知其获取消息的 offset; 此后 consumer 将会获得一定条数的消息;consumer 端也可以重置 offset 来重新消费消息。
在 JMS 实现中,Topic 模型基于 push 方式,即 broker 将消息推送给 consumer 端。 不过在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立连接之后,主动去 pull (或者说 fetch) 消息;这中模式有些优点,首先 consumer 端可以根据自己的消费能力适时的去 fetch 消息并处理,且可以控制消息消费的进度 (offset);此外,消费者可以良好的控制消息消费的数量,batch fetch。
其他 JMS 实现,消息消费的位置是有 prodiver 保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态。这就要求 JMS broker 需要太多额外的工作。 在 kafka 中,partition 中的消息只有一个 consumer 在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见 kafka broker 端是相当轻量级的。当消息被 consumer 接收之后,consumer 可以在本地保存最后消息的 offset, 并间歇性的向 zookeeper 注册 offset. 由此可见,consumer 也很轻量级。
- 每个 Consumer 客户端被创建时,会向 Zookeeper 注册自己的信息, 此作用主要是为了 “负载均衡”;
- 同一个 Consumer Group 中的 Consumers,Kafka 将相应 Topic 中的每个消息只发送给其中一个 Consumer;
- Consumer Group 中的每个 Consumer 读取 Topic 的一个或多个 Partitions,并且是唯一的 Consumer;
- 一个 Consumer Group 的多个 Consumer 的所有线程依次有序地消费一个 topic 的所有 partitions, 如果 Consumer Group 中所有 Consumer 总线程大于 Partitions 数量,则会出现空闲情况;
5、Topics 和 Partition
Topic 在逻辑上可以被认为是一个 queue,每条消费都必须指定它的 Topic,可以简单理解为必须指明把这条消息放进哪个 queue 里。为了使得 Kafka 的吞吐率可以线性提高,物理上把 Topic 分成一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。创建一个 topic 时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka 在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被 append 到该 Partition 中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。
对于传统的 message queue 而言,一般会删除已经被消费的消息,而 Kafka 集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此 Kafka 提供两种策略删除旧数据。
- 一是基于时间
- 二是基于 Partition 文件大小
6、消息传送机制
对于 JMS 实现,消息传输担保非常直接:有且只有一次 (exactly once)。 在 kafka 中稍有不同:
- at most once: 最多一次,这个和 JMS 中 “非持久化” 消息类似。发送一次,无论成败,将不会重发。
- at least once: 消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。
- exactly once: 消息只会发送一次。
如何处理:
- at most once: 消费者 fetch 消息,然后保存 offset, 然后处理消息;当 client 保存 offset 之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理。那么此后 “未处理” 的消息将不能被 fetch 到,这就是 “at most once”。
- at least once: 消费者 fetch 消息,然后处理消息,然后保存 offset。 如果消息处理成功之后,但是在保存 offset 阶段 zookeeper 异常导致保存操作未能执行成功,这就导致接下来再次 fetch 时可能获得上次已经处理过的消息,这就是 “at least once”,原因 offset 没有及时的提交给 zookeeper,zookeeper 恢复正常还是之前 offset 状态。
- exactly once: kafka 中并没有严格的去实现 (基于 2 阶段提交,事务), 我们认为这种策略在 kafka 中是没有必要的。
7、复制备份(Partition)
kafka 将每个 partition 数据复制到多个 server 上,任何一个 partition 有一个 leader 和多个 follower (可以没有); 备份的个数可以通过 broker 配置文件来设定。 leader 处理所有的 read-write 请求,follower 需要和 leader 保持同步。Follower 和 consumer 一样,消费消息并保存在本地日志中;leader 负责跟踪所有的 follower 状态,如果 follower”落后” 太多或者失效,leader 将会把它从 replicas 同步列表中删除。 当所有的 follower 都将一条消息保存成功,此消息才被认为是 “committed”, 那么此时 consumer 才能消费它。即使只有一个 replicas 实例存活,仍然可以保证消息的正常发送和接收,只要 zookeeper 集群存活即可。(不同于其他分布式存储,比如 hbase 需要 “多数派” 存活才行)
当 leader 失效时,需在 followers 中选取出新的 leader, 可能此时 follower 落后于 leader, 因此需要选择一个 “up-to-date” 的 follower。 选择 follower 时需要兼顾一个问题,就是新 leader 上所已经承载的 partition leader 的个数,如果一个 server 上有过多的 partition leader, 意味着此 server 将承受着更多的 IO 压力。在选举新 leader, 需要考虑到 “负载均衡”。
8、日志(Log)
如果一个 topic 的名称为 “my_topic“, 它有 2 个 partitions, 那么日志将会保存在 my_topic_0 和 my_topic_1 两个目录中;日志文件中保存了一序列 “log entries”(日志条目), 每个 log entry 格式为 “4 个字节的数字 N 表示消息的长度” “N个字节的消息内容” 中所处的起始位置。
每个 partition 在物理存储层面,有多个 log file 组成 (称为 segment)。segmentfile 的命名为 “最小 offset”。kafka。 例如 “00000000000.kafka”; 其中 “最小 offset” 表示此 segment 中起始消息的 offset。
其中每个 partiton 中所持有的 segments 列表信息会存储在 zookeeper 中。
- 当 segment 文件尺寸达到一定阀值时 (可以通过配置文件设定,默认 1G), 将会创建一个新的文件;当 buffer 中消息的条数达到阀值时将会触发日志信息 flush 到日志文件中,同时如果 “距离最近一次 flush 的时间差” 达到阀值时,也会触发 flush 到日志文件。如果 broker 合法并进行必要的修复。
- 获取消息时,需要指定 offset 和最大 chunk 尺寸,offset 用来表示消息的起始位置,chunk size 用来表示最大获取消息的总长度 (间接的表示消息的条数)。根据 offset, 可以找到此消息所在 segment 文件,然后根据 segment 的最小 offset 取差值,得到它在 file 中的相对位置,直接读取输出即可。
- 日志文件的删除策略非常简单:启动一个后台线程定期扫描 log file 列表,把保存时间超过阀值的文件直接删除 (根据文件的创建时间)。为了避免删除文件时仍然有 read 操作 (consumer 消费), 采取 copy-on-write 方式。
9、分布式
Kafka 使用 zookeeper 来存储一些 meta 信息,并使用了 zookeeper watch 机制来发现 meta 信息的变更并作出相应的动作 (比如 consumer 失效,触发负载均衡等)
- Broker node registry: 当一个 kafka broker 启动后,首先会向 zookeeper 注册自己的节点信息 (临时 znode), 同时当 broker 和 zookeeper 断开连接时,此 znode 也会被删除。
- Broker Topic Registry: 当一个 broker 启动时,会向 zookeeper 注册自己持有的 topic 和 partitions 信息,仍然是一个临时 znode。
- Consumer and Consumer group: 每个 consumer 客户端被创建时,会向 zookeeper 注册自己的信息;此作用主要是为了 “负载均衡”。 一个 group 中的多个 consumer 可以交错的消费一个 topic 的所有 partitions; 简而言之,保证此 topic 的所有 partitions 都能被此 group 所消费,且消费时为了性能考虑,让 partition 相对均衡的分散到每个 consumer 上。
- Consumer id Registry: 每个 consumer 都有一个唯一的 ID (host:uuid, 可以通过配置文件指定,也可以由系统生成), 此 id 用来标记消费者信息。
- Consumer offset Tracking: 用来跟踪每个 consumer 目前所消费的 partition 中最大的 offset。 此 znode 为持久节点,可以看出 offset 跟 group_id 有关,以表明当 group 中一个消费者失效,其他 consumer 可以继续消费。
- Partition Owner registry: 用来标记 partition 正在被哪个 consumer 消费。临时 znode。此节点表达了 “一个 partition” 只能被 group 下一个 consumer 消费,同时当 group 下某个 consumer 失效,那么将会触发负载均衡 (即:让 partitions 在多个 consumer 间均衡消费,接管那些 “游离” 的 partitions)
当 consumer 启动时,所触发的操作:
- 首先进行 “Consumer id Registry”;
- 然后在 “Consumer id Registry” 节点下注册一个 watch 用来监听当前 group 中其他 consumer 的 “leave” 和 “join”; 只要此 znode path 下节点列表变更,都会触发此 group 下 consumer 的负载均衡。(比如一个 consumer 失效,那么其他 consumer 接管 partitions)。
- 在 “Broker id registry” 节点下,注册一个 watch 用来监听 broker 的存活情况;如果 broker 列表变更,将会触发所有的 groups 下的 consumer 重新 balance。
总结:
- Producer 端使用 zookeeper 用来 “发现”broker 列表,以及和 Topic 下每个 partition leader 建立 socket 连接并发送消息。
- Broker 端使用 zookeeper 用来注册 broker 信息,已经监测 partition leader 存活性。
- Consumer 端使用 zookeeper 用来注册 consumer 信息,其中包括 consumer 消费的 partition 列表等,同时也用来发现 broker 列表,并和 partition leader 建立 socket 连接,并获取消息。
10、Leader 的选择
Kafka 的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
如果 leaders 永远不会 down 的话我们就不需要 followers 了!一旦 leader down 掉了,需要在 followers 中选择一个新的 leader。 但是 followers 本身有可能延时太久或者 crash,所以必须选择高质量的 follower 作为 leader。 必须保证,一旦一个消息被提交了,但是 leader down 掉了,新选出的 leader 必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的 leader, 对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为 leader。Kafka 并不是使用这种方法。 Kafka 动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称 ISR,在这个集合中的节点都是和 leader 保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为 leader。ISR 在 ZooKeeper 中维护。ISR 中有 f 1 个节点,就可以允许在 f 个节点 down 掉的情况下不会丢失消息并正常提供服。ISR 的成员是动态的,如果一个节点被淘汰了,当它重新达到 “同步中” 的状态时,他可以重新加入 ISR。 这种 leader 的选择方式是非常快速的,适合 kafka 的应用场景。
一个邪恶的想法:如果所有节点都 down 掉了怎么办?Kafka 对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都 down 了,这个就不能保证了。 实际应用中,当所有的副本都 down 掉时,必须及时作出反应。可以有以下两种选择:
这是一个在可用性和连续性之间的权衡。如果等待 ISR 中的节点恢复,一旦 ISR 中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待 ISR 意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka 目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
这种窘境不只 Kafka 会遇到,几乎所有的分布式数据系统都会遇到。
11、副本管理(Replica)
以上仅仅以一个 topic 一个分区为例子进行了讨论,但实际上一个 Kafka 将会管理成千上万的 topic 分区。Kafka 尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的 leader。 优化 leader 的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka 选择一个节点作为 “controller”, 当发现有节点 down 掉的时候它负责在游泳分区的所有节点中选择新的 leader, 这使得 Kafka 可以批量的高效的管理所有分区节点的主从关系。如果 controller down 掉了,活着的节点中的一个会备切换为新的 controller。
12、Leader与 Replica 同步
对于某个分区来说,保存正分区的”broker”为该分区的”leader”,保存备份分区的”broker”为该分区的”follower”。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的”broker”上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区 备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的”leader”进行通信。
Kafka 允许 topic 的分区拥有若干副本,这个数量是可以配置的,你可以为每个 topic 配置副本的数量。Kafka 会自动在每个副本上备份数据,所以当一个节点 down 掉时数据依然是可用的。 Kafka 的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。 创建副本的单位是 topic 的分区,每个分区都有一个 leader 和零或多个 followers。所有的读写操作都由 leader 处理,一般分区的数量都比 broker 的数量多的多,各分区的 leader 均匀的分布在 brokers 中。所有的 followers 都复制 leader 的日志,日志中的消息和顺序都和 leader 中的一致。followers 向普通的 consumer 那样从 leader 那里拉取消息并保存在自己的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否 alive 有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
- 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接。
- 如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久。
符合以上条件的节点准确的说应该是 “同步中的(in sync)”,而不是模糊的说是 “活着的” 或是 “失败的”。Leader 会追踪所有 “同步中” 的节点,一旦一个 down 掉了,或是卡住了,或是延时太久,leader 就会把它移除。至于延时多久算是 “太久”,是由参数 “replica、lag、time、max、ms” 决定的,怎样算是卡住了,怎是由参数 “replica、lag、time、max、ms” 决定的。只有当消息被所有的副本加入到日志中时,才算是 “committed”,只有 committed 的消息才会发送给 consumer,这样就不用担心一旦 leader down 掉了消息会丢失。Producer 也可以选择是否等待消息被提交的通知,这个是由参数 acks 决定的。 Kafka 保证只要有一个 “同步中” 的节点,“committed” 的消息就不会丢失。
总结:
一个典型的 Kafka 集群中包含若干 Producer(可以是 web 前端 FET,或者是服务器日志等),若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 ConsumerGroup,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理 Kafka 集群配置:选举 Kafka broker 的 leader,以及在 Consumer Group 发生变化时进行 rebalance,因为 consumer 消费 kafka topic 的 partition 的 offsite 信息是存在 Zookeeper 的。Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。
分析过程分为以下 4 个步骤:
- topic 中 partition 存储分布
- partiton 中文件存储方式 (partition 在 linux 服务器上就是一个目录(文件夹))
- partiton 中 segment 文件存储结构
- 在 partition 中如何通过 offset 查找 message
喜欢(0) 打赏