Apache Kafka 详解

2021-12-15 15:32:23 浏览数 (1)

Kafka 是基于 发布与订阅消息系统 。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka

是一个分布式的,可分区的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。

在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能、低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了同时搞定在线应用(消息)和离线应用(数据文件、日志),Kafka

就出现了。Kafka 可以起到两个作用:

  • 降低系统组网复杂度。
  • 降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka 承担高速 数据总线 的作用。

Kafka 的主要特点?

  • 1、同时为发布和订阅提供高吞吐量。据了解,Kafka 每秒可以生产约 25 万消息(50MB),每秒处理 55 万消息(110MB)。
  • 2、可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如 ETL ,以及实时应用程序。通过将数据持久化到硬盘,以及replication ,可以防止数据丢失。
  • 3、分布式系统,易于向外扩展。所有的 Producer、Broker 和Consumer 都会有多个,均为分布式的。并且,无需停机即可扩展机器。
  • 4、消息被处理的状态是在 Consumer 端维护,而不是由 Broker 端维护。当失败时,能自动平衡。

这段是从网络上找来的。感觉想要表达的意思是

* 消息是否被处理完成,是通过 Consumer 提交消费进度给 Broker ,而不是 Broker 消息被 Consumer拉取后,就标记为已消费。 * 当 Consumer 异常崩溃时,可以重新分配消息分区到其它的 Consumer 们,然后继续消费。

  • 5、支持 online 和 offline 的场景。

聊聊 Kafka 的设计要点?

1)吞吐量

高吞吐是 Kafka 需要实现的核心目标之一,为此 kafka 做了以下一些设计:

  • 1、数据磁盘持久化:消息不在内存中 Cache ,直接写入到磁盘,充分利用磁盘的顺序读写性能。

直接使用 Linux 文件系统的 Cache ,来高效缓存数据。

  • 2、zero-copy:减少 IO 操作步骤

采用 Linux Zero-Copy 提高发送性能。

* 传统的数据发送需要发送 4 次上下文切换。 * 采用 sendfile 系统调用之后,数据直接在内核态交换,系统上下文切换减少为 2次。《为什么Kafka这么快》https://www.jianshu.com/p/99cc19dde7df

  • 3、数据批量发送
  • 4、数据压缩
  • 5、Topic 划分为多个 Partition ,提高并行度。

数据在磁盘上存取代价为 O(1)

* Kafka 以 Topic 来进行消息管理,每个 Topic 包含多个 Partition ,每个 Partition 对应一个逻辑 log,有多个 segment 文件组成。 每个 segment 中存储多条消息(见下图),消息 id 由其逻辑位置决定,即从消息 id 可直接定位到消息的存储位置,避免 id 到位置的额外映射。 每个 Partition 在内存中对应一个 index ,记录每个 segment 中的第一条消息偏移。

发布者发到某个 Topic 的消息会被均匀的分布到多个 Partition 上(随机或根据用户指定的回调函数进行分布),Broker 收到发布消息往对应 Partition 的最后一个 segment 上添加该消息。undefined 当某个 segment上 的消息条数达到配置值或消息发布时间超过阈值时,segment上 的消息会被 flush 到磁盘,只有 flush 到磁盘上的消息订阅者才能订阅到,segment 达到一定的大小后将不会再往该 segment 写数据,Broker 会创建新的 segment 文件。

2)负载均衡

  • 1、Producer 根据用户指定的算法,将消息发送到指定的 Partition 中。
  • 2、Topic 存在多个 Partition ,每个 Partition 有自己的replica ,每个 replica 分布在不同的 Broker 节点上。多个Partition 需要选取出 Leader partition ,Leader Partition 负责读写,并由 Zookeeper 负责 fail over 。
  • 3、相同 Topic 的多个 Partition 会分配给不同的 Consumer 进行拉取消息,进行消费。

3)拉取系统

由于 Kafka Broker 会持久化数据,Broker 没有内存压力,因此, Consumer 非常适合采取 pull

的方式消费数据,具有以下几点好处:

  • 1、简化 Kafka 设计。
  • 2、Consumer 根据消费能力自主控制消息拉取速度。
  • 3、Consumer 根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等。

4)可扩展性

通过 Zookeeper 管理 Broker 与 Consumer 的动态加入与离开。

  • 当需要增加 Broker 节点时,新增的 Broker 会向 Zookeeper 注册,而 Producer 及 Consumer 会根据注册在 Zookeeper 上的 watcher 感知这些变化,并及时作出调整。
  • 当新增和删除 Consumer 节点时,相同 Topic 的多个 Partition 会分配给剩余的 Consumer 们。

Kafka 的架构是怎么样的?

[Kafka

架构图](https://links.jianshu.com/go?to=http://static.iocoder.cn/ac883ce247c1ff31c7cd4244392dcaed)

Kafka 的整体架构非常简单,是分布式架构,Producer、Broker 和Consumer 都可以有多个。

  • Producer,Consumer 实现 Kafka 注册的接口。
  • 数据从 Producer 发送到 Broker 中,Broker 承担一个中间缓存和分发的作用。
  • Broker 分发注册到系统中的 Consumer。Broker 的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。
  • 客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的 TCP 协议。

几个重要的基本概念:

  • Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
  • Partition:Topic 物理上的分组(分区),一个 Topic 可以分为多个 Partition 。每个 Partition 都是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)。

* replicas:Partition 的副本集,保障 Partition 的高可用。 * leader:replicas 中的一个角色,Producer 和 Consumer 只跟 Leader 交互。 * follower:replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的followers 中选举出一个新的 leader 继续提供服务。

  • Message:消息,是通信的基本单位,每个 Producer 可以向一个Topic(主题)发布一些消息。
  • Producers:消息和数据生产者,向 Kafka 的一个 Topic 发布消息的过程,叫做 producers 。
  • Consumers:消息和数据消费者,订阅 Topic ,并处理其发布的消息的过程,叫做 consumers 。

Consumer group:每个 Consumer 都属于一个 Consumer group,每条消息只能被 Consumer group 中的一个 Consumer 消费,但可以被多个 Consumer group 消费。

  • Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker 。

Controller:Kafka 集群中,通过 Zookeeper 选举某个 Broker 作为 Controller ,用来进行 leader election 以及 各种 failover 。

  • ZooKeeper:Kafka 通过 ZooKeeper 来存储集群的 Topic、Partition 等元信息等。

单纯角色来说,Kafka 和 RocketMQ 是基本一致的。比较明显的差异是:

RocketMQ 从 Kafka 演化而来。

  • 1、Kafka 使用 Zookeeper 作为命名服务;RocketMQ 自己实现了一个轻量级的 Namesrv 。
  • 2、Kafka Broker 的每个分区都有一个首领分区;RocketMQ 每个分区的“首领”分区,都在 Broker Master 节点上。

RocketMQ 没有首领分区一说,所以打上了引号。

  • 3、Kafka Consumer 使用 poll 的方式拉取消息;RocketMQ Consumer 提供 poll 的方式的同时,封装了一个 push 的方式。

RocketMQ 的 push 的方式,也是基于 poll 的方式的封装。

  • … 当然还有其它 …

Kafka 为什么要将 Topic 进行分区?

为了负载均衡,从而能够水平拓展。

  • Topic 只是逻辑概念,面向的是 Producer 和 Consumer ,而 Partition 则是物理概念。如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 Broker,那么关于该 Topic 的所有读写请求都将由这一个 Broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。
  • 有了 Partition 概念以后,假设一个 Topic 被分为 10 个 Partitions ,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 Broker(服务器)上。
  • 当 Producer 发布消息时,Producer 客户端可以采用 random、key-hash 及轮询等算法选定目标 Partition ,若不指定,Kafka 也将根据一定算法将其置于某一分区上。
  • 当 Consumer 拉取消息时,Consumer 客户端可以采用 Range、轮询 等算法分配 Partition ,从而从不同的 Broker 拉取对应的 Partition 的 leader 分区。

所以,Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。

Kafka 的应用场景有哪些?

[Kafka

的应用场景](https://links.jianshu.com/go?to=http://static.iocoder.cn/3636ff4bd554ee1dfcfb92448073b5b8)

1)消息队列

比起大多数的消息系统来说,Kafka 有更好的吞吐量,内置的分区,冗余及容错性,这让 Kafka

成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并常常依赖于 Kafka

提供的强大的持久性保障。在这个领域,Kafka 足以媲美传统消息系统,如 ActiveMQ 或 RabbitMQ 。

2)行为跟踪

Kafka 的另一个应用场景,是跟踪用户浏览页面、搜索及其他行为,以发布订阅的模式实时记录到对应的 Topic

里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到 Hadoop / 离线数据仓库里处理。

3)元信息监控

作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4)日志收集

日志收集方面,其实开源产品有很多,包括 Scribe、Apache Flume 。很多人使用 Kafka 代替日志聚合(log

aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或 HDFS)进行处理。

然而, Kafka 忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让 Kafka

处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如 Scribe 或者 Flume 来说,Kafka

提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5)流处理

这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的 Storm 或其他流式计算框架进行处理。很多用户会将那些从原始 Topic

来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的 Topic 下再继续后面的处理。

例如一个文章推荐的处理流程,可能是先从 RSS 数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的 Topic

中。后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的 Topic

之外,产生了一系列的实时数据处理的流程。Strom 和 Samza 是非常著名的实现这种类型数据转换的框架。

6)事件源

事件源,是一种应用程序设计的方式。该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka

可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7)持久性日志(Commit Log)

Kafka 可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka

中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka 类似于 Apache BookKeeper 项目。

Kafka 消息发送和消费的简化流程是什么?

image.png

  • 1、Producer ,根据指定的 partition 方法(round-robin、hash等),将消息发布到指定 Topic 的 Partition 里面。
  • 2、Kafka 集群,接收到 Producer 发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
  • 3、Consumer ,从 Kafka 集群 pull 数据,并控制获取消息的 offset 。至于消费的进度,可手动或者自动提交给 Kafka 集群。

1)Producer 发送消息

Producer 采用 push 模式将消息发布到 Broker,每条消息都被 append 到 Patition

中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。Producer 发送消息到 Broker

时,会根据分区算法选择将其存储到哪一个 Partition 。

其路由机制为:

  • 1、指定了 Partition ,则直接使用。
  • 2、未指定 Partition 但指定 key ,通过对 key 进行 hash 选出一个 Partition 。
  • 3、Partition 和 key 都未指定,使用轮询选出一个 Partition 。

写入流程:

  • 1、Producer 先从 ZooKeeper 的 "/brokers/.../state" 节点找到该 Partition 的 leader 。注意噢,Producer 只和 Partition 的 leader 进行交互。
  • 2、Producer 将消息发送给该 leader 。
  • 3、leader 将消息写入本地 log 。
  • 4、followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK 。
  • 5、leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark ,最后 commit 的 offset) 并向 Producer 发送 ACK 。

2)Broker 存储消息

物理上把 Topic 分成一个或多个 Patition,每个 Patition 物理上对应一个文件夹(该文件夹存储该 Patition

的所有消息和索引文件)。

3)Consumer 消费消息

high-level Consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 Consumer

所消费,且 Consumer 消费消息时不关注 offset ,最后一个 offset 由 ZooKeeper 保存(下次消费时,该 group 中的

Consumer 将从 offset 记录的位置开始消费)。

注意:

  • 1、如果消费线程大于 Patition 数量,则有些线程将收不到消息。
  • 2、如果 Patition 数量大于消费线程数,则有些线程多收到多个 Patition 的消息。
  • 3、如果一个线程消费多个 Patition,则无法保证你收到的消息的顺序,而一个 Patition 内的消息是有序的。

Consumer 采用 pull 模式从 Broker 中读取数据。

  • push 模式,很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式,则可以根据 Consumer 的消费能力以适当的速率消费消息。
  • 对于 Kafka 而言,pull 模式更合适,它可简化 Broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

Kafka Producer 有哪些发送模式?

Kafka 的发送模式由 Producer 端的配置参数 producer.type来设置。

  • 这个参数指定了在后台线程中消息的发送方式是同步的还是异步的,默认是同步的方式,即 producer.type=sync
  • 如果设置成异步的模式,即 producer.type=async ,可以是 Producer 以 batch 的形式 push 数据,这样会极大的提高 Broker的性能,但是这样会增加丢失数据的风险。
  • 如果需要确保消息的可靠性,必须要将 producer.type设置为 sync 。

对于异步模式,还有 4 个配套的参数,如下:

image.png

  • 以 batch 的方式推送数据可以极大的提高处理效率,Kafka Producer 可以将消息在内存中累计到一定数量后作为一个 batch 发送请求。batch 的数量大小可以通过 Producer 的参数(batch.num.messages)控制。通过增加 batch 的大小,可以减少网络请求和磁盘 IO 的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。
  • 在比较新的版本中还有 batch.size 这个参数。Producer 会尝试批量发送属于同一个 Partition 的消息以减少请求的数量. 这样可以提升客户端和服务端的性能。默认大小是 16348 byte (16k).
    • 发送到 Broker 的请求可以包含多个 batch ,每个 batch 的数据属于同一个 Partition
    • 太小的 batch 会降低吞吐. 太大会浪费内存。

Kafka Consumer 是否可以消费指定的分区消息?

Consumer 消费消息时,向 Broker 发出“fetch”请求去消费特定分区的消息,Consumer

指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,Consumer 拥有了 offset

的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。

Kafka 的 high-level API 和 low-level API 的区别?

High Level API

  • 屏蔽了每个 Topic 的每个 Partition 的 offset 管理(自动读取Zookeeper 中该 Consumer group 的 last offset)、Broker 失败转移、以及增减 Partition 时 Consumer 时的负载均衡(Kafka 自动进行负载均衡)。
  • 如果 Consumer 比 Partition 多,是一种浪费。一个 Partition 上是不允许并发的,所以 Consumer 数不要大于 Partition 数。

Low Level API

Low-level API 也就是 Simple Consumer API ,实际上非常复杂。

  • API 控制更灵活,例如消息重复读取,消息 offset 跳读,Exactly Once 原语。
  • API 更复杂,offset 不再透明,需要自己管理,Broker 自动失败转移需要处理,增加 Consumer、Partition、Broker 需要自己做负载均衡。

Kafka 的NIO网络通信模型

Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的。这里先引用Kafka源码中注释的一段话:

An NIO socket server. The threading model isundefined 1 Acceptor thread that handles new connections.undefined Acceptor has N Processor threads that each have their own selector and read requests from sockets.undefined M Handler threads that handle requests and produce responses back to the processor threads for writing.

Kafka的网络通信层模型,主要采用了 1(1个Acceptor线程) N(N个Processor线程) M(M个业务处理线程)

。下面的表格简要的列举了下

线程数

线程名

线程具体说明

1

kafka-socket-acceptor_%x

Acceptor线程,负责监听Client端发起的请求

N

kafka-network-thread_%d

Processor线程,负责对Socket进行读写

M

kafka-request-handler-_%d

Worker线程,处理具体的业务逻辑并生成Response返回

Kafka网络通信层的完整框架图如下图所示:

image

Kafka的网络通信层框架结构有几个重要概念:

(1) Acceptor :1个接收线程,负责监听新的连接请求,同时注册OP_ACCEPT 事件,将新的连接按照 "round robin"

方式交给对应的 Processor 线程处理;

(2) Processor :N个处理器线程,其中每个 Processor 都有自己的 selector,它会向 Acceptor 分配的

SocketChannel 注册相应的 OP_READ 事件,N 的大小由 “num.networker.threads” 决定;

(3) KafkaRequestHandler

:M个请求处理线程,包含在线程池—KafkaRequestHandlerPool内部,从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给KafkaApis处理,M的大小由

“num.io.threads” 决定;

(4) RequestChannel :其为Kafka服务端的请求通道,该数据结构中包含了一个全局的请求队列

requestQueue和多个与Processor处理器相对应的响应队列responseQueue,提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方。

(5) NetworkClient :其底层是对 Java NIO

进行相应的封装,位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送;

(6) SocketServer

:其是一个NIO的服务,它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式,将接收客户端请求和处理请求相分离;

(7) KafkaServer :代表了一个Kafka Broker的实例;其startup方法为实例启动的入口;

(8) KafkaApis :Kafka的业务逻辑处理Api,负责处理不同类型的请求;比如 “发送消息”

“获取消息偏移量—offset”“处理心跳请求” 等;

Kafka网络通信层的设计与具体实现

结合Kafka网络通信层的源码来分析其设计与实现,这里主要详细介绍网络通信层的几个重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源码部分均基于Kafka的0.11.0版本。

1、SocketServer

SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个

Acceptor和N个Processor线程(每个EndPoint都会初始化,一般来说一个Server只会设置一个端口),其实现如下:

代码语言:txt复制
def startup() {
代码语言:txt复制
    this.synchronized {
代码语言:txt复制
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
代码语言:txt复制
      val sendBufferSize = config.socketSendBufferBytes
代码语言:txt复制
      val recvBufferSize = config.socketReceiveBufferBytes
代码语言:txt复制
      val brokerId = config.brokerId
代码语言:txt复制
      var processorBeginIndex = 0
代码语言:txt复制
      // 一个broker一般只设置一个端口
代码语言:txt复制
      config.listeners.foreach { endpoint =>
代码语言:txt复制
        val listenerName = endpoint.listenerName
代码语言:txt复制
        val securityProtocol = endpoint.securityProtocol
代码语言:txt复制
        val processorEndIndex = processorBeginIndex   numProcessorThreads
代码语言:txt复制
        //N 个 processor
代码语言:txt复制
        for (i <- processorBeginIndex until processorEndIndex)
代码语言:txt复制
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
代码语言:txt复制
        //1个 Acceptor
代码语言:txt复制
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
代码语言:txt复制
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
代码语言:txt复制
        acceptors.put(endpoint, acceptor)
代码语言:txt复制
        KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
代码语言:txt复制
        acceptor.awaitStartup()
代码语言:txt复制
        processorBeginIndex = processorEndIndex
代码语言:txt复制
      }
代码语言:txt复制
    }
2、Acceptor

Acceptor是一个继承自抽象类AbstractServerThread的线程类。Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。

在该线程类中主要可以关注以下两个重要的变量:

(1),nioSelector:通过NSelector.open()方法创建的变量,封装了JAVA NIO Selector的相关操作;

(2),serverChannel:用于监听端口的服务端Socket套接字对象;

下面来看下Acceptor主要的run方法的源码:

代码语言:txt复制
def run() {
代码语言:txt复制
    //首先注册OP_ACCEPT事件
代码语言:txt复制
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
代码语言:txt复制
    startupComplete()
代码语言:txt复制
    try {
代码语言:txt复制
      var currentProcessor = 0
代码语言:txt复制
      //以轮询方式查询并等待关注的事件发生
代码语言:txt复制
      while (isRunning) {
代码语言:txt复制
        try {
代码语言:txt复制
          val ready = nioSelector.select(500)
代码语言:txt复制
          if (ready > 0) {
代码语言:txt复制
            val keys = nioSelector.selectedKeys()
代码语言:txt复制
            val iter = keys.iterator()
代码语言:txt复制
            while (iter.hasNext && isRunning) {
代码语言:txt复制
              try {
代码语言:txt复制
                val key = iter.next
代码语言:txt复制
                iter.remove()
代码语言:txt复制
                if (key.isAcceptable)
代码语言:txt复制
                  //如果事件发生则调用accept方法对OP_ACCEPT事件处理
代码语言:txt复制
                  accept(key, processors(currentProcessor))
代码语言:txt复制
                else
代码语言:txt复制
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
代码语言:txt复制
                //轮询算法
代码语言:txt复制
                // round robin to the next processor thread
代码语言:txt复制
                currentProcessor = (currentProcessor   1) % processors.length
代码语言:txt复制
              } catch {
代码语言:txt复制
                case e: Throwable => error("Error while accepting connection", e)
代码语言:txt复制
              }
代码语言:txt复制
            }
代码语言:txt复制
          }
代码语言:txt复制
        }
代码语言:txt复制
       //代码省略
代码语言:txt复制
  }
代码语言:txt复制
  def accept(key: SelectionKey, processor: Processor) {
代码语言:txt复制
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
代码语言:txt复制
    val socketChannel = serverSocketChannel.accept()
代码语言:txt复制
    try {
代码语言:txt复制
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
代码语言:txt复制
      socketChannel.configureBlocking(false)
代码语言:txt复制
      socketChannel.socket().setTcpNoDelay(true)
代码语言:txt复制
      socketChannel.socket().setKeepAlive(true)
代码语言:txt复制
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
代码语言:txt复制
        socketChannel.socket().setSendBufferSize(sendBufferSize)
代码语言:txt复制
      processor.accept(socketChannel)
代码语言:txt复制
    } catch {
代码语言:txt复制
        //省略部分代码
代码语言:txt复制
    }
代码语言:txt复制
  }
代码语言:txt复制
  def accept(socketChannel: SocketChannel) {
代码语言:txt复制
    newConnections.add(socketChannel)
代码语言:txt复制
    wakeup()
代码语言:txt复制
  }

在上面源码中可以看到,Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OP_ACCEPT

事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OP_ACCEPT事件进行处理。这里,Processor是通过

round robin 方法选择的,这样可以保证后面多个Processor线程的负载基本均匀。

Acceptor的accept()方法的作用主要如下:

(1)通过SelectionKey取得与之对应的serverSocketChannel实例,并调用它的accept()方法与客户端建立连接;

(2)调用connectionQuotas.inc()方法增加连接统计计数;并同时设置第(1)步中创建返回的socketChannel属性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)

(3)将socketChannel交给processor.accept()方法进行处理。这里主要是将socketChannel加入Processor处理器的并发队列newConnections队列中,然后唤醒Processor线程从队列中获取socketChannel并处理。其中,newConnections会被Acceptor线程和Processor线程并发访问操作,所以newConnections是ConcurrentLinkedQueue队列(一个基于链接节点的无界线程安全队列)

3、Processor

Processor同Acceptor一样,也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。在该线程类中主要关注以下几个重要的变量:

(1) newConnections :在上面的 Acceptor

一节中已经提到过,它是一种ConcurrentLinkedQueueSocketChannel类型的队列,用于保存新连接交由Processor处理的socketChannel;

(2) inflightResponses :是一个Map[String,

RequestChannel.Response]类型的集合,用于记录尚未发送的响应;

(3) selector :是一个类型为KSelector变量,用于管理网络连接;

下面先给出Processor处理器线程run方法执行的流程图:

image

从上面的流程图中能够可以看出Processor处理器线程在其主流程中主要完成了这样子几步操作:

(1) 处理newConnections队列中的socketChannel

。遍历取出队列中的每个socketChannel并将其在selector上注册OP_READ事件;

(2) 处理RequestChannel中与当前Processor对应响应队列中的Response

。在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的;

(3) 调用selector.poll()方法进行处理 。该方法底层即为调用nioSelector.select()方法进行处理。

(4) 处理已接受完成的数据包队列—completedReceives

。在processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OP_READ事件;

(5) 处理已发送完的队列—completedSends

。当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OP_READ事件;

(6) 处理断开连接的队列

。将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1;

4、RequestChannel

在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;KafkaRequestHandler线程从请求队列中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端。

5、KafkaRequestHandler

KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干个KafkaRequestHandler线程),这些线程以守护线程的方式在后台运行。在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,读取后再交由KafkaApis来具体处理。

6、KafkaApis

KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出Kafka Broker Server可以提供哪些服务。

Kafka 的数据存储模型是怎么样的?

Kafka 每个 Topic 下面的所有消息都是以 Partition 的方式分布式的存储在多个节点上。同时在 Kafka 的机器上,每个

Partition 其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。

下面先介绍一下partition中的segment file的组成:

  • segment file 组成 :由2部分组成,分别为index file和data file,这两个文件是一一对应的,后缀”.index”和”.log”分别表示索引文件和数据文件;
  • segment file 命名规则 :partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset,ofsset的数值最大为64位(long类型),20位数字字符长度,没有数字用0填充。如下图所示:

segment

关于segment file中index与data file对应关系图,这里我们选用网上的一个图片,如下所示:

index

segment的索引文件中存储着大量的元数据,数据文件中存储着大量消息,索引文件中的元数据指向对应数据文件中的message的物理偏移地址。以索引文件中的3,497为例,在数据文件中表示第3个message(在全局partition表示第368772个message),以及该消息的物理偏移地址为497。

注:Partition中的每条message由offset来表示它在这个partition中的偏移量,这个offset并不是该Message在partition中实际存储位置,而是逻辑上的一个值(如上面的3),但它却唯一确定了partition中的一条Message(可以认为offset是partition中Message的id)。

Kafka 的消息格式是怎么样的?

message中的物理结构为:

message

参数说明:

关键字

解释说明

8 byte offset |

在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message

4 byte message size | message大小

4 byte CRC32 | 用crc32校验message

1 byte “magic” | 表示本次发布Kafka服务程序协议版本号

1 byte “attributes” | 表示为独立版本、或标识压缩类型、或编码类型

4 byte key length | 表示key的长度,当key为-1时,K byte key字段不填

K byte key | 可选

value bytes payload | 表示实际消息数据

3.3.通过offset查找message

假如我们想要读取offset=368776的message,需要通过下面2个步骤查找。

1). 查找segment file

00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770

= 368769 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337

1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。

当offset=368776时定位到00000000000000368769.index|log

2). 通过segment file查找message

通过第一步定位到segment

file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

segment index

file并没有为数据文件中的每条message建立索引,而是采取稀疏索引存储方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

Kafka高效文件存储设计特点:

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message和确定response的最大大小。
  • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

为什么不能以 Partition 作为存储单位?

如果就以 Partition 为最小存储单位,可以想象,当 Kafka Producer 不断发送消息,必然会引起 Partition

文件的无限扩张,将对消息文件的维护以及已消费的消息的清理带来严重的影响,因此,需以 segment 为单位将 Partition 进一步细分。

每个 Partition(目录)相当于一个巨型文件,被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment

文件中消息数量不一定相等),这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个 Partition

只需要支持顺序读写就行,segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}

等若干参数)决定。

Kafka 的副本机制是怎么样的?

Kafka 的副本机制,是多个 Broker 节点对其他节点的 Topic

分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫 Reblance),Kafka

每个主题的每个分区都有一个主副本以及 0 个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。

副本机制

注意哈,下面说的 Leader 指的是每个 Topic 的某个分区的 Leader ,而不是 Broker 集群中的【集群控制器】。

在 Kafka 中并不是所有的副本都能被拿来替代主副本,所以在 Kafka 的Leader 节点中维护着一个 ISR(In sync

Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:

  • 1、节点必须和 Zookeeper 保持连接。
  • 2、在同步的过程中这个副本不能落后主副本太多。

另外还有个 AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示由于落后被剔除的副本集合,所以公式如下:

  • ISR = Leader 没有落后太多的副本。
  • AR = OSR ISR 。

这里先要说下两个名词:HW 和 LEO 。

  • HW(高水位 HighWatermark),是 Consumer 能够看到的此 Partition 的位置。
  • LEO(logEndOffset),是每个 Partition 的 log 最后一条 Message 的位置。
  • HW 能保证 Leader 所在的 Broker 失效,该消息仍然可以从新选举的Leader 中获取,不会造成消息丢失。

当 Producer 向 Leader 发送数据时,可以通过request.required.acks 参数来设置数据可靠性的级别:

  • 1(默认):这意味着 Producer 在 ISR 中的 Leader 已成功收到的数据并得到确认后发送下一条 message 。如果 Leader 宕机了,则会丢失数据。
  • 0:这意味着 Producer 无需等待来自 Broker 的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1:Producer 需要等待 ISR 中的所有 Follower 都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当 ISR 中只有 Leader 时(其他节点都和 Zookeeper 断开连接,或者都没追上),这样就变成了 acks=1 的情况。

ZooKeeper 在 Kafka 中起到什么作用?

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:

  • 1、Broker 在 ZooKeeper 中的注册。
  • 2、Topic 在 ZooKeeper 中的注册。
  • 3、Consumer 在 ZooKeeper 中的注册。
  • 4、Producer 负载均衡。主要指的是,Producer 从 Zookeeper 拉取 Topic 元数据,从而能够将消息发送负载均衡到对应 Topic 的分区中。
  • 5、Consumer 负载均衡。
  • 6、记录消费进度 Offset 。Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 Topic 中。
  • 7、记录 Partition 与 Consumer 的关系。

Kafka 如何实现高可用?

  • Zookeeper 部署 2N 1 节点,形成 Zookeeper 集群,保证高可用。
  • Kafka Broker 部署集群。每个 Topic 的 Partition ,基于【副本机制】,在 Broker 集群中复制,形成 replica 副本,保证消息存储的可靠性。每个 replica 副本,都会选择出一个 leader 分区(Partition),提供给客户端(Producer 和 Consumer)进行读写。
  • Kafka Producer 无需考虑集群,因为和业务服务部署在一起。Producer 从 Zookeeper 拉取到 Topic 的元数据后,选择对应的 Topic 的 leader 分区,进行消息发送写入。而 Broker 根据 Producer 的 request.required.acks 配置,是写入自己完成就响应给 Producer 成功,还是写入所有 Broker 完成再响应。这个,就是胖友自己对消息的可靠性的选择。
  • Kafka Consumer 部署集群。每个 Consumer 分配其对应的 Topic Partition ,根据对应的分配策略。并且,Consumer 只从 leader 分区(Partition)拉取消息。另外,当有新的 Consumer 加入或者老的 Consumer 离开,都会将 Topic Partition 再均衡,重新分配给 Consumer 。

注意噢,此处说的都是同一个 Kafka Consumer group 。

总的来说,Kafka 和 RocketMQ 的高可用方式是比较类似的,主要的差异在 Kafka Broker 的副本机制,和 RocketMQ Broker

的主从复制,两者的差异,以及差异带来的生产和消费不同。当然,实际上,都是和“主” Broker 做消息的发送和读取不是!

Kafka 是否会弄丢数据?

消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边 自动提交了 offset ,让 Kafka

以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要 关闭自动提交

offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是 可能会有重复消费 ,比如你刚处理完,还没提交

offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存

queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。

[](https://links.jianshu.com/go?to=https://github.com/doocs/advanced-

java/blob/main/docs/high-concurrency/how-to-ensure-the-reliable-

transmission-of-

messages.md#kafka-%E5%BC%84%E4%B8%A2%E4%BA%86%E6%95%B0%E6%8D%AE)Kafka

弄丢了数据

这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的

follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader

之后,不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。

所以此时一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是 写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是 要求一旦写入失败,就无限重试 ,卡在这里了。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行

leader 切换时,数据不会丢失。

[](https://links.jianshu.com/go?to=https://github.com/doocs/advanced-

java/blob/main/docs/high-concurrency/how-to-ensure-the-reliable-

transmission-of-

messages.md#%E7%94%9F%E4%BA%A7%E8%80%85%E4%BC%9A%E4%B8%8D%E4%BC%9A%E5%BC%84%E4%B8%A2%E6%95%B0%E6%8D%AE)生产者会不会弄丢数据?

如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower

都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

Kafka 如何保证消息的顺序性?

Kafka 本身,并不像 RocketMQ 一样,提供顺序性的消息。所以,提供的方案,都是相对有损的。如下:

这里的顺序消息,我们更多指的是,单个 Partition 的消息,被顺序消费。

方式一,Consumer ,对每个 Partition 内部单线程消费,单线程吞吐量太低,一般不会用这个。

方式二,Consumer ,拉取到消息后,写到 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue 。然后,对于 N

个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

这种方式,相当于对【方式一】的改进,将相同 Partition 的消息进一步拆分,保证相同 key 的数据消费是顺序的。

不过这种方式,消费进度的更新会比较麻烦。

当然,实际情况也不太需要考虑消息的顺序性,基本没有业务需要。

Leader 选举

为了保证可靠性,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给

producer。如此,可以避免因部分数据被写进 leader,而尚未被任何 follower 复制就宕机的情况下而造成数据丢失。对于 producer

而言,它可以选择是否等待消息 commit,这可以通过参数 request.required.acks 来设置。这种机制可以确保:只要 ISR

中有一个或者以上的 follower,一条被 commit 的消息就不会丢失。

问题 1:如何在保证可靠性的前提下避免吞吐量下降?

有一个很重要的问题是当 leader 宕机了,怎样在 follower 中选举出新的 leader,因为 follower 可能落后很多或者直接 crash

了,所以必须确保选择 “最新” 的 follower 作为新的 leader。一个基本的原则就是,如果 leader 挂掉,新的 leader

必须拥有原来的 leader 已经 commit 的所有消息,这不就是 ISR 中副本的特征吗?

但是,存在一个问题,ISR 列表维持多大的规模合适呢?换言之,leader 在一个消息被 commit 前需要等待多少个 follower 确认呢?等待

follower 的数量越多,与 leader 保持同步的 follower 就越多,可靠性就越高,但这也会造成吞吐率的下降。

少数服从多数的选举原则

一种常用的选举 leader 的策略是 “少数服从多数” ,不过,Kafka 并不是采用这种方式。这种模式下,如果有 2f 1 个副本,那么在 commit

之前必须保证有 f 1 个 replica 复制完消息,同时为了保证能正确选举出新的 leader,失败的副本数不能超过 f

个。这种方式有个很大的优势,系统的延迟取决于最快的几台机器,也就是说比如副本数为 3,那么延迟就取决于最快的那个 follower 而不是最慢的那个。

“少数服从多数” 的策略也有一些劣势,为了保证 leader 选举的正常进行,它所能容忍的失败的 follower 数比较少,如果要容忍 1 个

follower 挂掉,那么至少要 3 个以上的副本,如果要容忍 2 个 follower 挂掉,必须要有 5

个以上的副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这种算法更多用在

ZooKeeper 这种共享集群配置的系统中,而很少在需要大量数据的系统中使用。

Kafka 选举 leader 的策略是怎样的?

实际上,leader 选举的算法非常多,比如 ZooKeeper 的 Zab、Raft 以及 Viewstamped Replication。而 Kafka

所使用的 leader 选举算法更像是微软的 PacificA 算法。

Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader

保持同步,只有 ISR 里的成员才能有被选为 leader

的可能(通过参数配置:unclean.leader.election.enable=false)。在这种模式下,对于 f 1 个副本,一个 Kafka

topic 能在保证不丢失已经 commit 消息的前提下容忍 f

个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader

复制过去才会被认为已提交,并返回信息给 producer,从而保证可靠性。但与 “少数服从多数” 策略不同的是,Kafka ISR

列表中副本的数量不需要超过副本总数的一半,即不需要满足 “多数派” 原则,通常,ISR 列表副本数大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。

极端情况下的 leader 选举策略

前已述及,当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个

partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:

  • 等待 ISR 中任意一个 replica 恢复过来,并且选它作为 leader;
  • 选择第一个恢复过来的 replica(并不一定是在 ISR 中)作为leader。

如何选择呢?这就需要在可用性和一致性当中作出抉择。如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR

中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。

选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经

commit 的消息,从而造成消息丢失。默认情况下,Kafka 采用第二种策略,即

unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略。

unclean.leader.election.enable 这个参数对于 leader

的选举、系统的可用性以及数据的可靠性都有至关重要的影响。生产环境中应慎重权衡。

0 人点赞