Kafka 是一个完整的消息系统,常用于实时系统中的消息中转和数据持久化。Kafka 集群安装部署依赖于 Zookeeper,本专栏前面文章介绍了 Zookeeper 安装部署及运行,参见 “安装部署 Zookeeper 集群”。本篇继续介绍在相同主机环境下安装部署 Kafka 集群。
一、主机规划
所需安装包:kafka-3.7.0 下表描述了四个节点上分别将会运行的相关进程。简便起见,安装部署过程中所用的命令都使用操作系统的 root 用户执行。
节点 进程 | node1 | node2 | node3 | node4 |
---|---|---|---|---|
Kafka | * | * | * |
二、安装部署 Kafka 集群
1. 设置环境变量
在 node2、node3、node4 执行:
代码语言:javascript复制# 编辑 /etc/profile
vim /etc/profile
添加下面两行:
代码语言:javascript复制export KAFKA_HOME=/root/kafka_2.13-3.7.0/
export PATH=$KAFKA_HOME/bin:$PATH
加载生效:
代码语言:javascript复制source /etc/profile
2. 配置集群中的一个节点
在 node2 上执行下面步骤:
(1)解压
代码语言:javascript复制tar -zxvf kafka_2.13-3.7.0.tgz
(2)创建数据目录
代码语言:javascript复制mkdir $KAFKA_HOME/logs
(3)修改配置文件
代码语言:javascript复制# 备份缺省配置文件
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server.properties.bak
# 编辑配置文件
vim $KAFKA_HOME/config/server.properties
修改下面的配置项:
代码语言:javascript复制# 修改数据目录
log.dirs=/root/kafka_2.13-3.7.0/logs
# 配置 Zookeeper 连接,/kafka 指定 Kafka 在 Zookeeper 中的根 znode 名称
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
# 配置 socket 服务器监听地址
listeners=PLAINTEXT://node2:9092
# 修改副本数,缺省为 1,只适用于测试
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
3. 分发相关目录到其它节点
代码语言:javascript复制scp -r $KAFKA_HOME node3:/root
scp -r $KAFKA_HOME node4:/root
4. 配置集群其它节点
在 node3 上执行:
代码语言:javascript复制# 编辑配置文件
vim $KAFKA_HOME/config/server.properties
修改下面两个配置项:集群中的个节点的 broker.id 不能相同;socket 监听地址中的主机名换成本机的主机名。
代码语言:javascript复制broker.id=1
listeners=PLAINTEXT://node3:9092
在 node4 上执行:
代码语言:javascript复制# 编辑配置文件
vim $KAFKA_HOME/config/server.properties
# 修改两个配置项
broker.id=2
listeners=PLAINTEXT://node4:9092
三. 启动 Kafka 集群
1. 启动服务器
代码语言:javascript复制# 三台都要执行
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
2. 查看 Kafka 进程
代码语言:javascript复制# node2
[root@vvml-yz-hbase-test~]#jps
1249 DataNode
17219 NodeManager
13059 Jps
1925 JournalNode
12949 Kafka
20844 HRegionServer
15007 QuorumPeerMain
[root@vvml-yz-hbase-test~]#
# node3
[root@vvml-yz-hbase-test~]#jps
32640 Jps
32515 Kafka
5316 QuorumPeerMain
12452 DataNode
13144 JournalNode
30920 HRegionServer
7483 NodeManager
[root@vvml-yz-hbase-test~]#
# node4
[root@vvml-yz-hbase-test~]#jps
8352 NodeManager
19857 NameNode
11154 HRegionServer
10531 ResourceManager
11285 HMaster
19206 DFSZKFailoverController
11992 Kafka
19116 DataNode
12158 Jps
[root@vvml-yz-hbase-test~]#
3. 查看 Kafka 在 Zookeeper 中的 znode
代码语言:javascript复制[root@vvml-yz-hbase-test~]#zkCli.sh -server node1:2181
...
[zk: node1:2181(CONNECTED) 0] ls /
[hadoop-ha, hbase, kafka, rmstore, yarn-leader-election, zookeeper]
[zk: node1:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
[zk: node1:2181(CONNECTED) 2]
四、测试
1. 创建主题
代码语言:javascript复制# 创建单分区三副本主题
kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --topic test-1 --partitions 1 --replication-factor 3
# 创建三分区三副本主题
kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --topic test-3 --partitions 3 --replication-factor 3
# 列出所有主题
kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
# 列出给定主题的详细信息
kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --topic test-1
kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --topic test-3
输出:
代码语言:javascript复制[root@vvml-yz-hbase-test~]#kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --topic test-1 --partitions 1 --replication-factor 3
Created topic test-1.
[root@vvml-yz-hbase-test~]#kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --topic test-3 --partitions 3 --replication-factor 3
Created topic test-3.
[root@vvml-yz-hbase-test~]#kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --topic test-1
test-1
test-3
[root@vvml-yz-hbase-test~]#kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --topic test-1
Topic: test-1 TopicId: K-MFpVyDTBKNRxp2QrxlNA PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test-1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
[root@vvml-yz-hbase-test~]#kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --topic test-3
Topic: test-3 TopicId: _3TZlR5GR2-sE_P2MsSvvQ PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test-3 Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test-3 Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: test-3 Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
[root@vvml-yz-hbase-test~]#
2. 测量 kafka 单分区生产者吞吐量
代码语言:javascript复制kafka-producer-perf-test.sh --topic test-1 --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=node2:9092,node3:9092,node4:9092 acks=1
kafka-producer-perf-test.sh 是 Kafka 提供的生产者性能测试命令行工具,这里所使用的选项说明:
- num-records:指定发送的消息总数。
- record-size:指定每条消息的字节数,这里假设约为一个 MySQL binlog event 的大小。在 MySQL 中可用 show binlog events 命令查看每个 event 的大小。
- throughput:指定每秒发送的消息数,-1为不限制。
- acks:指定生产者的应答方式,有效值为0、1、all。0表示生产者在成功写入消息之前不会等待任何来自服务器的响应,吞吐量最高,但最可能丢失消息。1表示只要首领节点收到消息,生产者就会收到一个来自服务器的成功响应。all表示只有所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应,最安全但延迟最高。
输出:
代码语言:javascript复制[root@vvml-yz-hbase-test~]#kafka-producer-perf-test.sh --topic test-1 --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=node2:9092,node3:9092,node4:9092 acks=1
230266 records sent, 46053.2 records/sec (89.95 MB/sec), 276.6 ms avg latency, 527.0 ms max latency.
500000 records sent, 50489.750581 records/sec (98.61 MB/sec), 268.18 ms avg latency, 527.00 ms max latency, 260 ms 50th, 333 ms 95th, 479 ms 99th, 522 ms 99.9th.
[root@vvml-yz-hbase-test~]#
可以看到单分区平均吞吐量约 98.61 MB/S,平均每秒发送 50489 条 2KB 的消息。实际生产环境的硬件配置会比本实验环境高得多,单分区吞吐量通常可达每秒几百兆。而一个高负载的 MySQL 数据库每秒产生的 binlog 通常小于 10MB(https://wxy0327.blog.csdn.net/article/details/121973100#t17 是一个用 tpcc-mysql 压测工具测量 MySQL binlog 日志量的例子),因此通过这个粗略测试得出的结论是单分区可以承载一般的生产数据库负载。
3. 测量 kafka 三分区生产者吞吐量
代码语言:javascript复制kafka-producer-perf-test.sh --topic test-3 --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=node2:9092,node3:9092,node4:9092 acks=1
输出:
代码语言:javascript复制[root@vvml-yz-hbase-test~]#kafka-producer-perf-test.sh --topic test-3 --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=node2:9092,node3:9092,node4:9092 acks=1
419552 records sent, 83910.4 records/sec (163.89 MB/sec), 138.7 ms avg latency, 1045.0 ms max latency.
500000 records sent, 87032.201915 records/sec (169.98 MB/sec), 134.71 ms avg latency, 1045.00 ms max latency, 107 ms 50th, 296 ms 95th, 738 ms 99th, 991 ms 99.9th.
[root@vvml-yz-hbase-test~]#
三分区的平均吞吐量约 169.98 MB/S,平均每秒发送 87032 条 2KB 的消息,比单分区提高了 72%。
4. 测量消费者吞吐量
代码语言:javascript复制# 列出当前消费组
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
# 消费单分区主题
kafka-consumer-perf-test.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --topic test-1 --messages 500000 --reporting-interval 1000 --show-detailed-stats
# 列出当前消费组
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
# 消费三分区主题
kafka-consumer-perf-test.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --topic test-3 --messages 500000 --reporting-interval 1000 --show-detailed-stats
# 列出当前消费组
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
# 查看指定消费组详细信息
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group perf-consumer-9066
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group perf-consumer-28422
输出:
代码语言:javascript复制[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
[root@vvml-yz-hbase-test~]#kafka-consumer-perf-test.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --topic test-1 --messages 500000 --reporting-interval 1000 --show-detailed-stats
time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-03-11 15:07:50:042, 0, 16.7363, 16.5870, 8569, 8492.5669, 862, 147, 113.8526, 58292.5170
2024-03-11 15:07:51:044, 0, 309.0957, 291.7758, 158257, 149389.2216, 0, 1002, 291.7758, 149389.2216
2024-03-11 15:07:52:045, 0, 680.2051, 370.7386, 348265, 189818.1818, 0, 1001, 370.7386, 189818.1818
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
perf-consumer-9066
[root@vvml-yz-hbase-test~]#kafka-consumer-perf-test.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --topic test-3 --messages 500000 --reporting-interval 1000 --show-detailed-stats
time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-03-11 15:08:08:344, 0, 282.5000, 282.5000, 144640, 144640.0000, 364, 636, 444.1824, 227421.3836
2024-03-11 15:08:09:345, 0, 803.1133, 520.0932, 411194, 266287.7123, 0, 1001, 520.0932, 266287.7123
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
perf-consumer-28422
perf-consumer-9066
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group perf-consumer-9066
Consumer group 'perf-consumer-9066' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
perf-consumer-9066 test-1 0 500000 500000 0 - - -
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group perf-consumer-28422
Consumer group 'perf-consumer-28422' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
perf-consumer-28422 test-3 0 211342 211342 0 - - -
perf-consumer-28422 test-3 1 211616 211616 0 - - -
perf-consumer-28422 test-3 2 77042 77042 0 - - -
[root@vvml-yz-hbase-test~]#
开始时没有任何消费组。消费单分区主题时,产生了一个消费者组 perf-consumer-9066,用10个消费者线程(kafka-consumer-perf-test.sh 工具缺省值,而且新版本会忽略 --threads 设置的线程数)进行消费,每秒消费 370MB / 189818条 消息。消费三分区主题时,产生了一个消费者组 perf-consumer-28422,每秒消费 520MB / 266287条 消息。
单纯从这两个命令行工具的测试结果看,消费者性能比生产者高 3-4 倍,不可能产生消费延迟。但在实际应用中,一套 Kafka 集群同时提供写和读(生产和消费),双方共享集群资源,比较常见的情况是消费者落后于生产者。后面介绍实时数据同步时,将自建脚本测试压测环境下的消费延迟。