【Kafka系列】(二)Kafka的基本使用

2023-09-19 10:17:01 浏览数 (1)


Kafka 线上集群部署方案怎么做

操作系统

先说结论,Kafka 部署在 Linux 上要比 Windows 和 Mac 上性能高的多,主要是以下几个原因:

  1. 操作系统优化:Linux 操作系统在网络和文件系统性能方面通常比 Windows 和 Mac 更优秀。Linux 内核对网络和磁盘 I/O 的处理更高效,能够更好地利用硬件资源,从而提高 Kafka 的性能。
  2. 文件系统选择:Linux 上常用的文件系统如 ext4、XFS 等对大规模数据处理和高并发读写有更好的支持。而 Windows 上的 NTFS 文件系统在处理大量小文件和高并发读写时性能相对较差。
  3. 网络栈性能:Linux 的网络栈在处理高并发连接和大规模数据传输时表现更出色。Linux 内核对网络协议栈的优化更多,能够更好地处理网络数据包,提高 Kafka 的吞吐量和响应速度。
  4. 硬件资源管理:Linux 操作系统对硬件资源的管理更加灵活和高效。Linux 上的进程调度、内存管理等机制能够更好地利用多核处理器和大内存,提高 Kafka 的并发处理能力。
  5. 社区支持度

I/O 模型

主流的 I/O 模型通常有以下五种类型:

  1. 阻塞 I/O(Blocking I/O):在进行 I/O 操作时,应用程序会被阻塞,直到数据准备好或者操作完成。这种模型是最简单的,但是会导致应用程序的性能下降,因为在等待 I/O 完成时,CPU 无法处理其他任务。
  2. 非阻塞 I/O(Non-blocking I/O):在进行 I/O 操作时,应用程序可以继续执行其他任务,而不会被阻塞。但是,如果数据还没有准备好或者操作还没有完成,应用程序需要不断地轮询来检查状态,这会导致 CPU 的资源浪费。
  3. I/O 多路复用(I/O Multiplexing):通过使用 select、poll 或者 epoll 等系统调用,应用程序可以同时监视多个文件描述符的状态,当任何一个文件描述符准备好进行 I/O 操作时,应用程序就可以进行相应的读写操作。这种模型可以有效地处理多个连接,提高系统的并发性能。
  4. 信号驱动 I/O(Signal-driven I/O):应用程序通过注册信号处理函数,在数据准备好时接收到一个信号,然后进行相应的读写操作。这种模型相比于非阻塞 I/O,减少了轮询的开销,但是仍然需要应用程序不断地检查数据是否准备好。
  5. 异步 I/O(Asynchronous I/O):应用程序发起一个 I/O 操作后,可以继续执行其他任务,当数据准备好或者操作完成时,操作系统会通知应用程序进行相应的读写操作。这种模型是最高效的,因为应用程序不需要进行轮询或者阻塞等待,可以充分利用 CPU 的资源。

你不必详细了解每一种模型的实现细节,通常情况下我们认为后一种模型会比前一种模型要高级,比如 epoll 就比 select 要好,了解到这一程度应该足以应付我们下面的内容了。

说了这么多,I/O 模型与 Kafka 的关系又是什么呢?实际上 Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的 I/O 性能

零拷贝

Kafka 在 Linux 上支持零拷贝(Zero-copy)的主要原因是 Linux 操作系统提供了一些特性和系统调用,使得零拷贝成为可能。而 Windows 操作系统在设计上与 Linux 有所不同,因此不直接支持零拷贝。

零拷贝是一种优化技术,可以减少数据在内核空间和用户空间之间的拷贝次数,提高数据传输的效率。在传统的拷贝方式中,数据从磁盘读取到内核缓冲区,然后再从内核缓冲区拷贝到用户空间的应用程序缓冲区,这涉及到多次数据拷贝操作,增加了 CPU 和内存的开销。

在 Linux 上,零拷贝的实现主要依赖以下几个特性和系统调用:

  1. 文件描述符(File Descriptor):Linux 使用文件描述符来表示打开的文件,通过文件描述符可以进行文件的读写操作。
  2. 内核缓冲区(Kernel Buffer):Linux 内核提供了一块内存区域作为内核缓冲区,用于存放从磁盘读取的数据。
  3. sendfile 系统调用:sendfile 系统调用可以在内核空间和用户空间之间直接传输数据,而无需经过用户空间缓冲区。

通过使用 sendfile 系统调用,Kafka 可以直接将数据从磁盘读取到内核缓冲区,然后通过网络发送给消费者,避免了数据在内核空间和用户空间之间的多次拷贝。

而在 Windows 上,没有类似于 Linux 的 sendfile 系统调用,因此无法直接实现零拷贝。在 Windows 上,数据需要经过内核空间和用户空间之间的多次拷贝,导致性能上的损失。

一句话总结一下,在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。

磁盘

先说结论:

  • 追求性价比的公司可以不搭建 RAID,使用普通磁盘组成存储空间即可
  • 使用机械磁盘完全能够胜任 Kafka 线上环境

为什么说 Kafka 可以不搭建 RAID 环境

  1. 分布式架构 :Kafka 采用分布式架构,将消息分散存储在多个 Broker 节点上。每个 Broker 节点都是独立的,它们之间相互复制消息,实现数据的冗余和高可用性。因此,即使某个节点的磁盘发生故障,其他节点仍然可以提供服务,不会导致数据丢失。
  2. 数据复制 :Kafka 使用副本机制来保证数据的可靠性。每个分区都可以配置多个副本,这些副本分布在不同的 Broker 节点上。当消息写入到 Leader 副本后,Kafka 会将消息复制到其他副本,确保数据的冗余存储。如果某个副本所在的磁盘发生故障,Kafka 会自动选择其他副本作为 Leader,保证数据的可用性。
  3. 持久化存储 :Kafka 将消息持久化存储在磁盘上,而不是仅保存在内存中。这样即使 Broker 节点发生故障,消息也不会丢失。Kafka 使用顺序写入的方式将消息追加到磁盘上的日志文件中,这种方式对磁盘的要求相对较低,不需要特别高的磁盘性能。
  4. 水平扩展:Kafka 支持水平扩展,可以通过增加 Broker 节点来提高系统的吞吐量和容量。在扩展过程中,可以选择在新节点上添加磁盘,而不需要对现有节点进行改动。这种方式可以灵活地根据需求来调整磁盘的配置和容量。

为什么说使用机械磁盘完全能够胜任 Kafka 线上环境

Kafka 是一个高吞吐量、低延迟的分布式消息系统,它的性能和稳定性对于线上环境非常重要。虽然 SSD(固态硬盘)在性能方面有明显的优势,但机械磁盘仍然可以胜任 Kafka 线上环境的原因如下:

  1. 顺序写入:Kafka 的特点之一是顺序写入,即消息按照顺序追加到日志文件中。机械磁盘在顺序写入方面的性能表现通常比较好,因为它们具有较大的磁道和扇区,可以更好地支持连续写入操作。
  2. 容量成本低:相比于 SSD,机械磁盘的容量成本更低。对于大规模的 Kafka 集群,存储成本是一个重要的考虑因素。机械磁盘可以提供更大的存储容量,使得 Kafka 能够存储更多的消息数据。
  3. 持久性和可靠性:机械磁盘通常具有更高的持久性和可靠性。它们在写入数据时,通常会将数据缓存在磁盘的缓存区中,然后再进行持久化写入。这种写入方式可以提供更好的数据可靠性,即使在断电等异常情况下,数据也不容易丢失。
  4. 成熟稳定:机械磁盘是一种成熟的存储设备,已经在各种应用场景中得到广泛应用。相比之下,SSD 虽然在性能方面有优势,但在稳定性和寿命方面可能存在一些问题。对于对数据可靠性要求较高的线上环境,机械磁盘可能更受信任。

需要注意的是,尽管机械磁盘可以胜任 Kafka 线上环境,但在某些特定场景下,如对延迟要求非常高的应用,或者对存储容量要求非常大的应用,可能需要考虑使用 SSD 等更高性能的存储设备。此外,随着技术的发展,未来可能会有更多新型存储设备出现,对于 Kafka 的存储需求可能会有不同的选择。

磁盘容量

总结下磁盘容量需要考虑的因素:

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩

我举一个简单的例子来说明该如何思考这个问题。假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?

我们来计算一下:每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB _ 14,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 _ 3 = 2.25TB

带宽

举个例子,我们使用以下假设和计算:

  1. 带宽资源:假设机房环境是千兆网络(1Gbps),即每秒处理 1Gb 的数据。
  2. 带宽利用率:假设 Kafka 服务器最多使用 70%的带宽资源,即每秒最多使用 700Mb 的带宽。
  3. 预留资源:为了避免网络丢包,我们额外预留了 2/3 的带宽资源,即单台服务器使用带宽 240Mbps。
  4. 数据处理目标:在 1 小时内处理 1TB 的业务数据,即每秒需要处理 2336Mb 的数据。

根据以上计算,我们得出了需要 10 台 Kafka 服务器来完成这个业务目标。这个计算还没有考虑到消息的复制,如果消息需要额外复制两份,那么总的服务器台数还要乘以 3,即需要 30 台服务器。

在实际部署中,你可以根据自己的网络环境和业务需求进行调整和优化。

另外,如果你的环境中还涉及跨机房传输,那么带宽资源的瓶颈可能会更加明显。在跨机房传输的情况下,网络延迟和带宽限制都会对性能产生影响。你可能需要考虑使用更高带宽的网络或者采取其他优化措施来解决这个问题。

总结起来,对于带宽资源的规划,你需要考虑以下几个因素:

  1. 网络带宽:根据网络环境确定每秒处理的数据量。
  2. 带宽利用率:根据实际情况确定 Kafka 服务器使用的带宽比例。
  3. 预留资源:为了避免网络丢包,额外预留一部分带宽资源。
  4. 数据处理目标:根据业务需求确定每秒需要处理的数据量。
  5. 消息复制:如果消息需要复制,考虑复制的数量。

根据以上因素,你可以计算出所需的 Kafka 服务器数量,并根据实际情况进行调整和优化。

小结

集群参数配置

静态参数和动态参数

静态参数是指在 Kafka 启动时配置的参数,一旦设置后,只能通过重启 Kafka 来更改。这些参数通常是对 Kafka 整体行为的全局设置,例如 Kafka 的监听端口、日志目录、副本数量等。静态参数的配置通常在 Kafka 的配置文件(如 server.properties)中进行。

动态参数是指在 Kafka 运行时可以动态修改的参数,而无需重启 Kafka。这些参数通常是对 Kafka 的某个特定组件或功能进行细粒度的调整。动态参数可以通过 Kafka 的命令行工具或 API 进行修改。

Broker

磁盘相关

在 Kafka 中,Broker 是消息队列的核心组件,负责接收、存储和转发消息。为了配置存储信息,我们需要设置一些重要的参数。

  1. log.dirs:这是一个非常重要的参数,用于指定 Broker 使用的文件目录路径。这个参数没有默认值,因此必须由用户自己指定。在生产环境中,建议为 log.dirs 配置多个路径,以提高读写性能和实现故障转移。具体格式是一个 CSV 格式,多个路径之间用逗号分隔,例如:/home/kafka1,/home/kafka2,/home/kafka3。如果有条件的话,最好将这些目录挂载到不同的物理磁盘上,以提高性能和可靠性。
  2. log.dir:这是 log.dirs 的补充参数,用于指定单个路径。在实际使用中,我们只需要设置 log.dirs 参数即可,不需要设置 log.dir。

为什么要为 log.dirs 配置多个路径呢?这是因为多块物理磁盘同时读写数据可以提高吞吐量,同时也能实现故障转移。在 Kafka 1.1 版本之前,如果 Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是从 Kafka 1.1 版本开始,引入了 Failover 功能,坏掉的磁盘上的数据会自动转移到其他正常的磁盘上,Broker 仍然可以正常工作。这个改进使得我们不再依赖 RAID 来提供数据的可靠性,而是通过多块磁盘的故障转移来实现。

需要注意的是,如果使用了多个路径,Kafka 会根据一定的策略将消息分配到不同的路径上,以实现负载均衡。同时,Kafka 也会自动管理磁盘空间,当某个路径的磁盘空间不足时,会自动将消息转移到其他路径上。

总结一下,为了配置存储信息,我们需要设置 log.dirs 参数,为其配置多个路径,最好挂载到不同的物理磁盘上。这样可以提高读写性能和实现故障转移。同时,Kafka 会自动管理磁盘空间和实现负载均衡。这些配置可以在 Kafka 的配置文件中进行设置。

Zookeeper 相关

ZooKeeper 是一个分布式协调框架,用于协调和管理 Kafka 集群的元数据信息。它负责保存 Kafka 集群的配置信息,例如 Broker 的运行状态、Topic 的创建情况、分区信息以及 Leader 副本的位置等。

在 Kafka 中,与 ZooKeeper 相关的最重要的参数是zookeeper.connect。这个参数是一个 CSV 格式的字符串,用于指定连接到 ZooKeeper 集群的地址和端口。例如,zk1:2181,zk2:2181,zk3:2181表示连接到三个 ZooKeeper 节点,默认端口为 2181。

如果要让多个 Kafka 集群共享同一个 ZooKeeper 集群,可以使用chroot参数来进行区分。chroot是 ZooKeeper 的概念,类似于别名。假设有两个 Kafka 集群,分别命名为 kafka1 和 kafka2,那么可以将zookeeper.connect参数设置为zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2。这样就可以通过chroot来区分不同的 Kafka 集群。

需要注意的是,chroot只需要在参数中指定一次,并且应该添加到最后。有时候会遇到这样的错误格式:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这是不正确的。

总结一下,设置与 ZooKeeper 相关的参数时,需要注意以下几点:

  1. zookeeper.connect参数是一个 CSV 格式的字符串,用于指定连接到 ZooKeeper 集群的地址和端口。
  2. 如果多个 Kafka 集群共享同一个 ZooKeeper 集群,可以使用 chroot参数来进行区分。
  3. chroot参数只需要在参数中指定一次,并且应该添加到最后。

下面是一个示例配置:

代码语言:javascript复制
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1

这个配置表示连接到三个 ZooKeeper 节点,并使用kafka1作为chroot

Broker 相关

在 Kafka 中,listeners 参数用于指定外部连接者通过什么协议访问 Kafka 服务。它是一个逗号分隔的三元组列表,每个三元组由协议名称、主机名和端口号组成。协议名称可以是标准的协议,如 PLAINTEXT 表示明文传输,SSL 表示使用 SSL 或 TLS 加密传输等,也可以是自定义的协议名称。

举个例子,如果你定义了一个名为 CONTROLLER 的自定义协议,你可以在 listeners 参数中添加 CONTROLLER://localhost:9092,表示该协议通过 localhost 的 9092 端口进行通信。

需要注意的是,如果你自定义了协议名称,你还需要通过 listener.security.protocol.map 参数告诉 Kafka 使用哪种安全协议。比如,如果你定义了 CONTROLLER 协议,并且该协议使用明文传输数据,你需要设置 listener.security.protocol.map=CONTROLLER:PLAINTEXT

另外,主机名和端口号比较直观,不需要过多解释。但是需要注意的是,建议在 Broker 端和客户端应用的配置中都使用主机名而不是 IP 地址。因为在 Kafka 的源代码中,也是使用主机名进行连接的。如果你在某些地方使用了 IP 地址进行连接,可能会导致连接失败的问题。

总结一下,listeners 参数用于指定 Kafka 服务的监听器,告诉外部连接者通过什么协议访问 Kafka。它是一个三元组列表,每个三元组由协议名称、主机名和端口号组成。建议使用主机名而不是 IP 地址进行配置。如果使用自定义协议,还需要通过 listener.security.protocol.map参数指定安全协议。

Topic 相关

auto.create.topics.enable

该参数用于控制是否允许自动创建 Topic。建议将该参数设置为 false,即不允许自动创建 Topic。

在线上环境中,如果该参数被设置为 true,可能会导致出现很多名字稀奇古怪的 Topic。例如,当我们想要为名为 test 的 Topic 发送事件时,由于拼写错误将 test 写成了 tst,启动生产者程序后,一个名为 tst 的 Topic 就会被自动创建。这种情况下,好的运维应该防止这种情况的发生,特别是对于大公司而言,每个部门被分配的 Topic 应该由运维严格把控,不允许自行创建任何 Topic。

unclean.leader.election.enable

该参数用于控制是否允许 Unclean Leader 选举。Unclean Leader 选举是指在 Kafka 中,当保存数据较多的副本都挂掉时,是否允许从保存数据较少的副本中选举出新的 Leader。

在 Kafka 中,每个分区都有多个副本来提供高可用性,其中只有一个副本对外提供服务,即 Leader 副本。只有保存数据较多的副本才有资格竞选 Leader,而那些落后进度太多的副本没有资格竞选。

如果设置unclean.leader.election.enable为 false,那么 Kafka 将坚持之前的原则,坚决不允许那些落后太多的副本竞选 Leader。这样做的后果是该分区将不可用,因为没有 Leader。

如果设置unclean.leader.election.enable为 true,那么 Kafka 允许从那些保存数据较少的副本中选举出新的 Leader。这样做的后果是数据有可能丢失,因为这些副本保存的数据本来就不全,当成为 Leader 后,它本身就变得膨胀了,认为自己的数据才是权威的。

需要注意的是,该参数在最新版的 Kafka 中默认为 false。但是由于社区对该参数的默认值进行了多次更改,所以建议在使用时显式地将其设置为 false。

auto.leader.rebalance.enable

该参数用于控制是否允许 Kafka 定期进行 Leader 选举。建议将该参数设置为 false。

设置auto.leader.rebalance.enable为 true 表示允许 Kafka 定期对一些 Topic 分区进行 Leader 重选举。需要满足一定的条件才会触发 Leader 重选举。

unclean.leader.election.enable参数不同的是,auto.leader.rebalance.enable并不是选举新的 Leader,而是更换现有的 Leader。例如,如果 Leader A 一直表现良好,但是当auto.leader.rebalance.enable为 true 时,经过一段时间后,Leader A 可能会被强制卸任,换成 Leader B。

需要注意的是,Leader 的更换代价很高。原本向 Leader A 发送请求的所有客户端都需要切换成向 Leader B 发送请求。而且这种 Leader 的更换本质上没有任何性能收益。

因此,在生产环境中,建议将auto.leader.rebalance.enable设置为 false,避免不必要的 Leader 更换。

数据留存方面

在 Kafka 中,有一组参数用于控制数据的留存。下面我将逐个介绍这些参数。

  1. log.retention.{hours|minutes|ms}:这是一组参数,用于控制消息数据在 Kafka 中保存的时间。这三个参数分别是以小时(hours)、分钟(minutes)和毫秒(ms)为单位的时间间隔。优先级上,ms 设置最高,minutes 次之,hours 最低。通常情况下,我们会设置较长的时间间隔,比如 log.retention.hours=168 表示默认保存 7 天的数据,自动删除 7 天前的数据。如果将 Kafka 用作存储系统,那么这个值可能需要相应调大。
  2. log.retention.bytes:这个参数用于指定 Broker 在磁盘上保存的消息数据的总容量大小。默认值为-1,表示没有容量限制,即可以保存任意大小的数据。这个参数在构建云上的多租户 Kafka 集群时发挥作用。假设你要提供一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免某个租户占用过多的磁盘空间,设置这个参数就非常重要了。
  3. message.max.bytes:这个参数用于控制 Broker 能够接收的最大消息大小。默认值为 1000012,即不到 1MB。然而,在实际场景中,超过 1MB 的消息是很常见的。因此,在生产环境中,将这个值设置得比较大是比较保险的做法。这个参数只是一个标尺,仅仅衡量 Broker 能够处理的最大消息大小,即使设置得大一点也不会占用太多磁盘空间。

需要注意的是,这些参数都是可配置的,可以根据实际需求进行调整。在配置文件中,可以通过设置对应的属性来修改这些参数的值。例如,可以在server.properties文件中添加以下配置来修改log.retention.hours参数的值:

代码语言:javascript复制
log.retention.hours=168

这样就将消息数据的保存时间设置为 7 天。

总结一下,这些参数在 Kafka 中起到了重要的作用,可以根据实际需求来调整,以满足不同的业务场景。

Topic 级别参数

Topic 级别的参数在 Kafka 中非常重要,它允许我们为每个 Topic 设置特定的参数值,这些参数会覆盖全局 Broker 参数的值。这样做的好处是可以根据不同的业务需求,为不同的 Topic 设置不同的参数,提高系统的灵活性和效率。

下面我将详细介绍几个重要的 Topic 级别参数,按照用途分组。

  1. 保存消息方面的参数:
    • retention.ms:规定了该 Topic 消息被保存的时长。默认值是 7 天,即该 Topic 只保存最近 7 天的消息。如果设置了这个值,它会覆盖 Broker 端的全局参数值。通过设置不同的 retention.ms 值,我们可以根据业务需求来控制消息的保存时长,避免无效的数据占用过多的存储空间。
    • retention.bytes:规定了为该 Topic 预留的磁盘空间大小。和全局参数的作用类似,这个值在多租户的 Kafka 集群中非常有用。默认值是-1,表示可以无限使用磁盘空间。通过设置不同的 retention.bytes 值,我们可以根据不同的 Topic 的数据量来合理分配磁盘空间,避免存储空间不足的问题。
  2. 处理消息大小方面的参数:
    • max.message.bytes:决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。在很多公司中,Kafka 作为基础架构组件运行,承载了大量的业务数据。如果在全局层面上无法给出一个合适的最大消息值,那么允许不同的业务部门自行设定 Topic 级别的 max.message.bytes参数就显得非常必要了。通过设置不同的 max.message.bytes值,我们可以根据不同的业务需求来控制消息的大小,确保系统能够正常处理各种大小的消息。

通过设置 Topic 级别的参数,我们可以根据不同的业务需求来灵活地调整 Kafka 的配置,提高系统的性能和可用性。同时,这也是 Kafka 作为一个高性能分布式消息系统的重要特性之一。

需要注意的是,Topic 级别的参数只对该 Topic 中的消息生效,不会影响其他 Topic。如果没有为某个 Topic 设置特定的参数值,那么将会使用全局 Broker 参数的默认值。

除了上述介绍的参数,Kafka 还有其他一些 Topic 级别的参数,如cleanup.policycompression.type等,它们都可以根据具体的业务需求进行设置。在实际应用中,我们可以根据不同的场景和需求,灵活地使用这些参数来优化 Kafka 集群的性能和可靠性。

总结一下,Topic 级别的参数允许我们为每个 Topic 设置特定的参数值,覆盖全局 Broker 参数的值。通过设置不同的参数值,我们可以根据业务需求来控制消息的保存时长、磁盘空间使用和消息大小等,提高系统的灵活性和效率。这是 Kafka 作为一个高性能分布式消息系统的重要特性之一。

在 Kafka 中,可以通过两种方式来设置 Topic 级别的参数:在创建 Topic 时设置和修改已存在的 Topic 时设置。

1. 创建 Topic 时设置参数

在创建 Topic 时,可以通过--config参数来设置 Topic 级别的参数。例如,我们要创建一个名为transaction的 Topic,并设置retention.ms为 15552000000,max.message.bytes为 5242880,可以使用以下命令:

代码语言:javascript复制
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

在上述命令中,--config后面的参数用于指定要设置的 Topic 级别参数。

2. 修改已存在的 Topic 时设置参数

可以使用kafka-configs命令来修改已存在的 Topic 的参数。假设我们要将transaction Topic 的max.message.bytes修改为 10485760,可以使用以下命令:

代码语言:javascript复制
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

在上述命令中,--entity-type topics表示要修改的实体类型为 Topic,--entity-name transaction表示要修改的 Topic 名称,--alter表示要进行修改操作,--add-config后面的参数用于指定要修改的 Topic 级别参数及其新值。

个人建议

个人建议始终坚持使用第二种方式来设置 Topic 级别参数,并且在未来,Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。这样做的好处是统一了设置参数的方式,减少了学习成本和混淆,同时也更加方便管理和维护。

JVM 参数

JVM 参数对于 Kafka 集群的性能和稳定性非常重要。在设置 JVM 参数之前,首先需要确定 Java 版本。对于 Kafka 来说,不推荐在 Java 6 或 7 的环境上运行,建议至少使用 Java 8。

在 JVM 参数设置中,堆大小是一个关键参数。尽管后面我们还会讨论如何调优 Kafka 性能的问题,但是现在我想给出一个通用的建议:将 JVM 堆大小设置为 6GB,这是目前业界普遍认可的一个合理值。很多人使用默认的堆大小来运行 Kafka,但是默认的 1GB 有点小,因为 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,堆大小不能太小。

另一个重要的 JVM 参数是垃圾回收器(GC)的设置。如果你仍在使用 Java 7,可以根据以下规则选择合适的垃圾回收器:如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS(Concurrent Mark Sweep)收集器,启用方法是指定-XX: UseConcMarkSweepGC。否则,使用吞吐量收集器,启用方法是指定-XX: UseParallelGC。如果你使用 Java 8,可以手动设置使用 G1(Garbage First)收集器。在没有任何调优的情况下,G1 表现要比 CMS 更出色,主要体现在更少的 Full GC 和需要调整的参数更少等方面,所以使用 G1 就可以了。

现在我们确定了要设置的 JVM 参数,接下来我们来为 Kafka 进行设置。奇怪的是,这个问题在 Kafka 官网上居然没有被提及。实际上,设置的方法非常简单,你只需要设置下面这两个环境变量即可:

  • KAFKA_HEAP_OPTS:指定堆大小。例如,你可以这样设置: export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。例如,你可以这样设置: export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX: UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX: ExplicitGCInvokesConcurrent -Djava.awt.headless=true"

在启动 Kafka Broker 之前,先设置好这两个环境变量,然后执行启动命令,例如:

代码语言:javascript复制
$ export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
$ export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX: UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX: ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
$ bin/kafka-server-start.sh config/server.properties

这样就完成了 JVM 参数的设置。通过合理的设置,可以提高 Kafka 集群的性能和稳定性。需要注意的是,具体的参数设置可能因环境和需求而有所不同,可以根据实际情况进行调整。

操作系统参数

Kafka 集群通常需要设置一些操作系统参数来优化性能和稳定性。下面是一些常见的操作系统参数设置:

  1. 文件描述符限制(ulimit -n):文件描述符是操作系统用于跟踪打开文件的标识符。Kafka 集群需要同时打开大量的文件描述符,因此需要增加文件描述符限制。默认情况下,操作系统的文件描述符限制较低,可能会导致 Kafka 进程无法打开足够的文件描述符,从而影响性能。建议将文件描述符限制设置为一个较大的值,例如 ulimit -n 1000000。
  2. 文件系统类型的选择:Kafka 集群的性能和稳定性受到文件系统的影响。根据官方测试报告,XFS 文件系统的性能要优于 ext4 文件系统。因此,在生产环境中最好选择 XFS 文件系统。最近也有一些关于 Kafka 使用 ZFS 文件系统的报告,显示其性能更强劲,如果条件允许,可以尝试使用 ZFS 文件系统。
  3. Swap 的调优:Swap 是操作系统用于将内存中不常用的数据暂时存储在磁盘上的机制。一些文章建议将 Swap 设置为 0,完全禁用 Swap,以防止 Kafka 进程使用 Swap 空间。然而,个人认为最好不要将 Swap 设置为 0,而是设置为一个较小的值。这是因为当物理内存耗尽时,操作系统会触发 OOM killer 组件,随机选择一个进程并杀死它,而不给用户任何预警。如果将 Swap 设置为一个较小的值,当开始使用 Swap 空间时,你至少能观察到 Broker 性能的急剧下降,从而有时间进行进一步的调优和问题诊断。建议将 swappiness 设置为接近 0 但不为 0 的值,例如 1。
  4. 提交时间(Flush 落盘时间):Kafka 发送数据时,并不需要等待数据被写入磁盘才认为成功,只要数据被写入操作系统的页缓存(Page Cache)即可。操作系统会根据 LRU 算法定期将页缓存上的“脏”数据写入物理磁盘。提交时间决定了这个定期的间隔,默认为 5 秒。通常情况下,这个时间间隔可能太频繁,可以适当增加提交时间间隔来降低物理磁盘的写操作。需要注意的是,如果数据在写入磁盘之前发生机器宕机,数据将会丢失。但由于 Kafka 在软件层面提供了多副本的冗余机制,因此可以适当增加提交时间间隔以换取性能。

需要注意的是,以上参数设置是一般情况下的建议,具体的设置还需要根据实际情况和硬件配置进行调整。另外,不同的操作系统和版本可能会有不同的参数设置方式,请参考相应的操作系统文档或官方建议进行设置。

下面是一个示例,展示如何在 Linux 系统上设置 ulimit -n 参数:

代码语言:javascript复制
# 查看当前文件描述符限制
ulimit -n

# 修改文件描述符限制为 1000000
ulimit -n 1000000

# 验证修改是否生效
ulimit -n

请注意,以上示例仅适用于 Linux 系统,其他操作系统可能有不同的设置方式。

参考资料

[1]

首发博客地址: https://blog.zysicyj.top/

[2]

文章更新计划: https://blog.zysicyj.top/update_plan/

[3]

系列文章地址: https://blog.zysicyj.top/categories/技术文章/后端技术/系列文章/Kafka/

0 人点赞