Kafka常用命令归纳

2024-09-30 09:42:38 浏览数 (5)

一. 日常Topic操作

这里的命令以kafka2.2之后版本进行说明,社区推荐命令指定 --bootstrap-server参数,受kafka安全认证体系的约束,如果使用 --zookeeper 会绕过 Kafka 的安全体系。

1. 创建topic

代码语言:javascript复制
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

2. 查看所有topic列表

代码语言:javascript复制
bin/kafka-topics.sh --bootstrap-server broker_host:port --list

3. 查看某个特定topic

代码语言:javascript复制
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

4. 增加topic分区数

代码语言:javascript复制
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>

5. 动态修改主题参数

以 max.message.bytes为例

5.1 增加指定broker的配置

代码语言:javascript复制
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

eg:bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testInfoTopic --alter --add-config max.message.bytes=128000
查看topic修改情况
bin/kafka-topics.sh  --bootstrap-server localhost:9092 --describe --topic testInfoTopic 

Topic: testInfoTopic    TopicId: KzPy24fVSsCR03ZOYRzq8g PartitionCount: 3       ReplicationFactor: 1    Configs: max.message.bytes=128000,unclean.leader.election.enable=false
        Topic: testInfoTopic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: testInfoTopic    Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: testInfoTopic    Partition: 2    Leader: 1       Replicas: 1     Isr: 1

zookeeper 查看修改后内容

代码语言:javascript复制
./zookeeper-shell.sh localhost:2181

> get /config/topics/testInfoTopic
{"version":1,"config":{"max.message.bytes":"128000"}}

5.2 删除指定broker的配置

代码语言:javascript复制
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config max.message.bytes

6. 修改主题限速

限制某个主题副本在执行副本同步机制时,带宽消耗不要过多(不得占用超过 100MBps)

--entity-name 就是 Broker ID。倘若该主题的副本分别在 0、1、2 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令。

代码语言:javascript复制
for i in {0..2}
do 
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name $i
done

7. 删除topic

代码语言:javascript复制
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>

二. 数据生产消费

测试数据

broker_host:port ==> localhost:9092

Topic ==> testInfoTopic

Consumer Group ==> G1

1 生产数据

代码语言:javascript复制
./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic

举例:

代码语言:javascript复制
# 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic --request-required-acks -1 --producer-property compression.type=lz4
1.1 生产者性能测试

向topic 发送10w条消息,每条消息1KB,在producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等

代码语言:javascript复制
bin/kafka-producer-perf-test.sh --topic testInfoTopic --num-records 100000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=localhost:9092 acks=-1 linger.ms=2000 compression.type=lz4

100000 records sent, 24764.735017 records/sec (24.18 MB/sec), 93.07 ms avg latency, 672.00 ms max latency, 56 ms 50th, 301 ms 95th, 325 ms 99th, 335 ms 99.9th.

生产吞吐量,消息发送延迟都可以看到

2 消费数据

代码语言:javascript复制
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic

举例:

代码语言:javascript复制
# 注意,这里消费最好指定一个消费组G1,如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。时间长久后,就会产生大量的以 console-consumer的消费者组
# --from-beginning 等同于Consumer 端参数 auto.offset.reset 设置成 earliest;如果不指定,会默认从最新位移消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic --group G1 --from-beginning --consumer-property enable.auto.commit=false 
2.1消费者性能测试
代码语言:javascript复制
 bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 100000 --topic testInfoTopic

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-11-05 10:35:46:535, 2022-11-05 10:35:48:699, 97.5835, 45.0940, 100013, 46216.7283, 665, 1499, 65.0990, 66719.8132

消费吞吐量的指标

2.2 查看消费进度
代码语言:javascript复制
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

./bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --group G1 --describe

1 https://zhuanlan.zhihu.com/p/578421962 Kafka消费者组消费进度实现窥探

三. 内部topic操作

1. __consumer_offsets

该主题保存了消费者组的位移数据,默认有50个分区

1.1 变更主题副本数

如果该主题的副本值已经是 1 了,我们如何增加该主题的副本到3

第一步:创建一个 json 文件,显式提供 50 个分区对应的副本数,注意要将replicas 中的 3 台 Broker 排列顺序不同,使 Leader 副本均匀地分散在 Broker上

代码语言:javascript复制
{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`

第二步:执行kafka-reassign-partitions.sh

代码语言:javascript复制
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

1.2 查看__consumer_offsets消费者组提交的位移数据

代码语言:javascript复制
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --from-beginning

1.3 读取__consumer_offsets消息,查看消费者组状态信息

代码语言:javascript复制
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter" --from-beginning

2. __transaction_state

该主题为了支持事务引入的,默认有50个分区,操作方法参考__consumer_offsets

四. 常见错误处理

1. topic删除失败

原因1:副本所在的broker宕机

解决办法:重启broker后,会自动恢复

原因2:待删除的全部或者部分分区在迁移中

解决办法:

第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下待删除topic的 znode。

第 2 步,手动删除该主题在磁盘上的分区目录。

第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。(会导致大量的leader重选举)

2. __consumer_offsets占用太多磁盘

原因:kafka-log-cleaner-thread线程挂了

可以用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。

解决办法

重启对应的broker节点

五. kafka 推荐配置

代码语言:javascript复制
#### kafka推荐配置

auto.create.topics.enable=false # 是否允许自动创建Topic
unclean.leader.election.enable=false    # 是否允许 Unclean Leader 选举
auto.leader.rebalance.enable=false  # 是否允许定期进行 Leader 选举。

六.参考资料

https://kafka.apache.org/documentation/#operations

0 人点赞