【从面试题看源码】-看完Kafka性能优化-让你吊打面试官

2022-12-02 16:55:23 浏览数 (1)

目录

ProducerConfig和ConsumerConfig释义

Kafka服务端脚本详解(1)-topics

kafka-topics.sh

connect-distributed.sh & connect-standalone.sh

 Kafka服务端脚本详解(2)一log,verifiable

kafka-log-dirs.sh

kafka-verifiable-consumer.sh

kafka-verifiable-producer.sh

 Kafka服务端脚本详解(3)-性能测试脚本

 kafka-producer-perf-test.sh

kafka-consumer-perf-test.sh

Kafka生产者端优化

Kafka 生产者端发送延迟优化


ProducerConfig和ConsumerConfig释义

Kafka客户端开发中有一个ProducerConfig和ConsumerConfig,熟悉这两个文件内容的含义对我们使用,调优Kafka是非常有帮助的。Ctrl F搜索吧。

生产者配置参数释义 1.bootstrap.servers 指定Kafka集群所需的broker地址清单,默认”“

2.metadata.max.age.ms 强制刷新元数据时间,毫秒,默认300000,5分钟

3.batch.size 指定ProducerBatch内存区域的大小,默认16kb

4.acks 指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认值1,字符串类型

5.linger.ms 指定ProducerBatch在延迟多少毫秒后再发送,但如果在延迟的这段时间内batch的大小已经到了batch.size设置的大小,那么消息会被立即发送,不会再等待,默认值0

6.client.id 用户设定,用于跟踪记录消息,默认”“

7.send.buffer.bytes Socket发送缓冲区大小,默认128kb,-1将使用操作系统的设置

8.receive.buffer.bytes Socket接收缓冲区大小,默认32kb,-1将使用操作系统的设置

9.max.request.size 限制生产者客户端发送消息的最大值,默认1MB

10.reconnect.backoff.ms 连接失败后,尝试连接Kafka的时间间隔,默认50ms

11.reconnect.backoff.max.ms 尝试连接到Kafka,生产者客户端等待的最大时间,默认1000ms

12.max.block.ms 控制生产者客户端send()方法和partitionsFor()方法的阻塞时间。当生产者的发送缓存区已满,或者没有可用元数据时,这些方法就会阻塞,默认60s

13.buffer.memory 生产者客户端中用于缓存消息的缓存区大小,默认32MB

14.retry.backoff.ms 消息发送失败重试时间间隔,默认100ms

15.compression.type 指定消息的压缩方式,默认不压缩

16.metrics.sample.window.ms 样本计算时间窗口,默认30000ms

17.metrics.num.samples 用于维护metrics的样本数量,默认2

18.metrics.log.level metrics日志记录级别,默认info

19.metric.reporters 类的列表,用于衡量指标,默认空list

20.max.in.flight.requests.per.connection 可以在一个connection中发送多个请求,叫作一个flight,这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认5

21.retries 消息发送失败重试次数,默认0

22.key.serializer key的序列化方式

23.value.serializer value序列化类方式

24.connections.max.idle.ms 设置多久之后关闭空闲连接,默认540000ms

25.partitioner.class 分区类,实现Partitioner接口,可以自定义分区规则

26.request.timeout.ms 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常,默认30000ms

27.interceptor.classes 拦截器类,实现ProducerInterceptor接口,自定义拦截器

28.enable.idempotence true为开启幂等性

29.transaction.timeout.ms 事务超时时间,默认60000ms

30.transactional.id 设置事务id,必须唯一

消费者配置参数释义 1.group.id 消费者所属消费组的唯一标识

2.max.poll.records 一次拉取请求的最大消息数,默认500条

3.max.poll.interval.ms 指定拉取消息线程最长空闲时间,默认300000ms

4.session.timeout.ms 检测消费者是否失效的超时时间,默认10000ms

5.heartbeat.interval.ms 消费者心跳时间,默认3000ms

6.bootstrap.servers 连接集群broker地址

7.enable.auto.commit 是否开启自动提交消费位移的功能,默认true

8.auto.commit.interval.ms 自动提交消费位移的时间间隔,默认5000ms

9.partition.assignment.strategy 消费者的分区配置策略

10.auto.offset.reset 如果分区没有初始偏移量,或者当前偏移量服务器上不存在时,将使用的偏移量设置,earliest从头开始消费,latest从最近的开始消费,none抛出异常

11.fetch.min.bytes 消费者客户端一次请求从Kafka拉取消息的最小数据量,如果Kafka返回的数据量小于该值,会一直等待,直到满足这个配置大小,默认1b

12.fetch.max.bytes 消费者客户端一次请求从Kafka拉取消息的最大数据量,默认50MB

13.fetch.max.wait.ms 从Kafka拉取消息时,在不满足fetch.min.bytes条件时,等待的最大时间,默认500ms

14.metadata.max.age.ms 强制刷新元数据时间,毫秒,默认300000,5分钟

15.max.partition.fetch.bytes 设置从每个分区里返回给消费者的最大数据量,区别于fetch.max.bytes,默认1MB

16.send.buffer.bytes Socket发送缓冲区大小,默认128kb,-1将使用操作系统的设置

17.receive.buffer.bytes Socket发送缓冲区大小,默认64kb,-1将使用操作系统的设置

18.client.id 消费者客户端的id

19.reconnect.backoff.ms 连接失败后,尝试连接Kafka的时间间隔,默认50ms

20.reconnect.backoff.max.ms 尝试连接到Kafka,生产者客户端等待的最大时间,默认1000ms

21.retry.backoff.ms 消息发送失败重试时间间隔,默认100ms

22.metrics.sample.window.ms 样本计算时间窗口,默认30000ms

23.metrics.num.samples 用于维护metrics的样本数量,默认2

24.metrics.log.level metrics日志记录级别,默认info

25.metric.reporters 类的列表,用于衡量指标,默认空list

26.check.crcs 自动检查CRC32记录的消耗

27.key.deserializer key反序列化方式

28.value.deserializer value反序列化方式

29.connections.max.idle.ms 设置多久之后关闭空闲连接,默认540000ms

30.request.timeout.ms 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常,默认30000ms

31.default.api.timeout.ms 设置消费者api超时时间,默认60000ms

32.interceptor.classes 自定义拦截器

33.exclude.internal.topics 内部的主题:一consumer_offsets 和一transaction_state。该参数用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。

34.internal.leave.group.on.close

35.isolation.level 用来配置消费者的事务隔离级别。如果设置为“read committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO (LastStableOffset)的位置,默认情况下为 “read_uncommitted”,即可以消 费到 HW (High Watermark)处的位置

Kafka服务端脚本详解(1)-topics

脚本名称

脚本用途

kafka-topics.sh

topic管理脚本

connect-distributed.sh

连接分布式模式脚本

connect-standalone.sh

连接单机模式脚本


kafka-topics.sh

--partitions

创建或修改主题的分区数

--replication-factor

副本因子,副本数量

--replica-assignment

手动指定分区副本分配方案,使用该参数,不用指定--partitions 和 --replication-factor

--topic

主题名称

--zookeeper

连接kafka zk地址

--alter

修改分区,副本,配置

--bootstrap-server

kafka服务器地址

--create

创建主题

--delete

删除主题

--list

列出所有的可用主题

代码语言:javascript复制
 [root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --list
 __consumer_offsets
 first
 test
 topic-3
 topic-4
 topic-5
 topic-6
 topic-admin
 topic-create-diff
 topic-two

--describe

列出主题的详细信息

--exclude-internal

使用--list --describe 命令时是否列出内部主题,默认列出内部主题

--command-config

以配置文件的形式修改Admin Client的配置,支持的配置见org.apache.kafka.clients.admin.AdminClientConfig

代码语言:javascript复制
//me.properties
request.timeout.ms=200000

//
bin/kafka-topics.sh --bootstrap-server  10.211.55.3:9092 --topic topic-two --list  --command-config config/me.properties 

--config

在创建/修改主题的时候可以对主题默认参数进行覆盖,具体支持的参数见http://kafka.apachecn.org/documentation.html#topicconfigs 该参数将在以后废弃,请使用kafka-configs.sh

代码语言:javascript复制
 [root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --bootstrap-server  10.211.55.3:9092 --topic topic-two --describe
Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1073741824,retention.bytes=1073741824
Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

 [root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper  10.211.55.3:2181 --alter --topic topic-two --config segment.bytes=1048577
 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
 Updated config for topic topic-two.
 
[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper  10.211.55.3:2181 --describe --topic topic-two
Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1048577
Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

----delete-config

删除一个配置项

代码语言:javascript复制
1[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --alter --delete-config segment.bytes 
2WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
3         Going forward, please use kafka-configs.sh for this functionality
4Updated config for topic topic-two.
5
6[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --describe
7Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:
8        Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

--disable-rack-aware

忽略机架信息 有两个broker,一个配了机架信息,另一个没配,在创建topic的时候就会报错

代码语言:javascript复制
 1[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1  --partitions 2
 2Error while executing topic command : Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
 3[2018-12-27 05:22:40,834] ERROR kafka.admin.AdminOperationException: Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
 4        at kafka.zk.AdminZkClient.getBrokerMetadatas(AdminZkClient.scala:71)
 5        at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:54)
 6        at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:274)
 7        at kafka.admin.TopicCommand$TopicService$class.createTopic(TopicCommand.scala:134)
 8        at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:266)
 9        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
10        at kafka.admin.TopicCommand.main(TopicCommand.scala)
11 (kafka.admin.TopicCommand$)
12
13[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1  --partitions 2 --disable-rack-aware
14Created topic topic-6.

--if-exists

只有当主题存在时,相关命令才会执行,不会显示错误

代码语言:javascript复制
 1[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7  --alter --config segment.bytes=104857 --if-exists
 2
 3[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7  --alter --config segment.bytes=104857
 4Error while executing topic command : Topics in [] does not exist
 5[2018-12-27 06:01:25,638] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
 6        at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
 7        at kafka.admin.TopicCommand$ZookeeperTopicService.alterTopic(TopicCommand.scala:294)
 8        at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
 9        at kafka.admin.TopicCommand.main(TopicCommand.scala)
10 (kafka.admin.TopicCommand$)

--if-not-exists

创建主题的时候,只有当主题不存在时,命令才执行,存在时不会报错

代码语言:javascript复制
1[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --create --partitions 1 --replication-factor 1 --if-not-exists
2
3[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --create --partitions 1 --replication-factor 1 
4Error while executing topic command : Topic 'topic-6' already exists.
5[2018-12-27 06:07:54,185] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-6' already exists.
6 (kafka.admin.TopicCommand$)

--topics-with-overrides

显示覆盖过配置的主题

--unavailable-partitions

查看没有leader副本的分区

代码语言:javascript复制
1[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --describe --unavailable-partitions
2        Topic: topic-6  Partition: 0    Leader: -1      Replicas: 1     Isr: 1

--under-replicated-partitions

查看所有包含失效副本的分区


connect-distributed.sh & connect-standalone.sh

Kafka Connect 是一款可扩展并且可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具。

代码语言:javascript复制
1bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-source.properties
2
3bin/connect-distributed.sh config/connect-distributed.properties

 Kafka服务端脚本详解(2)一log,verifiable

脚本名称

脚本用途

kafka-log-dirs.sh

查看指定broker上日志目录使用情况

kafka-verifiable-consumer.sh

检验kafka消费者

kafka-verifiable-producer.sh

检验kafka生产者


kafka-log-dirs.sh

--bootstrap-server

kafka地址

--broker-list

要查询的broker地址列表,broker之间逗号隔开,不配置该命令则查询所有broker

--topic-list

指定查询的topic列表,逗号隔开

--command-config

配置Admin Client

--describe

显示详情

代码语言:javascript复制
1[root@10 kafka_2.11-2.2.0]# bin/kafka-log-dirs.sh --bootstrap-server 10.211.55.3:9092 --describe --broker-list 0 --topic-list first,topic-3
2Querying brokers for log directories information
3Received log directory information from brokers 0
4{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/tmp/kafka-logs","error":null,"partitions":[{"partition":"topic-3-0","size":474,"offsetLag":0,"isFuture":false},{"partition":"first-0","size":310,"offsetLag":0,"isFuture":false}]}]}]}

kafka-verifiable-consumer.sh

--broker-list

broker列表, HOST1:PORT1,HOST2:PORT2,…

--topic

要消费的topic

--group-id

消费组id

--max-messages

最大消费消息数量,默认-1,一直消费

代码语言:javascript复制
 1#设置消费两次后,自动停止
 2[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --max-messages 2
 3{"timestamp":1558869583036,"name":"startup_complete"}
 4{"timestamp":1558869583329,"name":"partitions_revoked","partitions":[]}
 5{"timestamp":1558869583366,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
 6{"timestamp":1558869590352,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":37,"maxOffset":37}]}
 7{"timestamp":1558869590366,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":38}],"success":true}
 8{"timestamp":1558869595328,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":38,"maxOffset":38}]}
 9{"timestamp":1558869595335,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":39}],"success":true}
10{"timestamp":1558869595355,"name":"shutdown_complete"}

--session-timeout

消费者会话超时时间,默认30000ms,服务端如果在该时间内没有接收到消费者的心跳,就会将该消费者从消费组中删除

--enable-autocommit

自动提交,默认false

代码语言:javascript复制
 1#比较一下两者的差别
 2#没有--enable-autocommit
 3[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo
 4{"timestamp":1558875063613,"name":"startup_complete"}
 5{"timestamp":1558875063922,"name":"partitions_revoked","partitions":[]}
 6{"timestamp":1558875063952,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
 7{"timestamp":1558875069603,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":47,"maxOffset":47}]}
 8{"timestamp":1558875069614,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":48}],"success":true}
 9
10#有--enable-autocommit
11[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --enable-autocommit
12{"timestamp":1558874772119,"name":"startup_complete"}
13{"timestamp":1558874772408,"name":"partitions_revoked","partitions":[]}
14{"timestamp":1558874772449,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
15{"timestamp":1558874820898,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":46,"maxOffset":46}]}

--reset-policy

设置消费偏移量,earliest从头开始消费,latest从最近的开始消费,none抛出异常,默认earliest

--assignment-strategy

消费者的分区配置策略, 默认 RangeAssignor

--consumer.config

配置文件


kafka-verifiable-producer.sh

该脚本可以生产测试数据发送到指定topic,并将数据已json格式打印到控制台

--topic

主题名称

--broker-list

broker列表, HOST1:PORT1,HOST2:PORT2,…

--max-messages

最大消息数量,默认-1,一直生产消息

--throughput

设置吞吐量,默认-1

--acks

指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认-1

--producer.config

配置文件

--message-create-time

设置消息创建的时间,时间戳

--value-prefix

设置消息前缀

--repeating-keys

key从0开始,每次递增1,直到指定的值,然后再从0开始

代码语言:javascript复制
 1[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-producer.sh --broker-list 10.211.55.3:9092 --topic first --message-create-time 1527351382000 --value-prefix 1 --repeating-keys 10 --max-messages 20
 2{"timestamp":1558877565069,"name":"startup_complete"}
 3{"timestamp":1558877565231,"name":"producer_send_success","key":"0","value":"1.0","topic":"first","partition":0,"offset":1541118}
 4{"timestamp":1558877565238,"name":"producer_send_success","key":"1","value":"1.1","topic":"first","partition":0,"offset":1541119}
 5{"timestamp":1558877565238,"name":"producer_send_success","key":"2","value":"1.2","topic":"first","partition":0,"offset":1541120}
 6{"timestamp":1558877565238,"name":"producer_send_success","key":"3","value":"1.3","topic":"first","partition":0,"offset":1541121}
 7{"timestamp":1558877565238,"name":"producer_send_success","key":"4","value":"1.4","topic":"first","partition":0,"offset":1541122}
 8{"timestamp":1558877565239,"name":"producer_send_success","key":"5","value":"1.5","topic":"first","partition":0,"offset":1541123}
 9{"timestamp":1558877565239,"name":"producer_send_success","key":"6","value":"1.6","topic":"first","partition":0,"offset":1541124}
10{"timestamp":1558877565239,"name":"producer_send_success","key":"7","value":"1.7","topic":"first","partition":0,"offset":1541125}
11{"timestamp":1558877565239,"name":"producer_send_success","key":"8","value":"1.8","topic":"first","partition":0,"offset":1541126}
12{"timestamp":1558877565239,"name":"producer_send_success","key":"9","value":"1.9","topic":"first","partition":0,"offset":1541127}
13{"timestamp":1558877565239,"name":"producer_send_success","key":"0","value":"1.10","topic":"first","partition":0,"offset":1541128}
14{"timestamp":1558877565239,"name":"producer_send_success","key":"1","value":"1.11","topic":"first","partition":0,"offset":1541129}
15{"timestamp":1558877565239,"name":"producer_send_success","key":"2","value":"1.12","topic":"first","partition":0,"offset":1541130}
16{"timestamp":1558877565240,"name":"producer_send_success","key":"3","value":"1.13","topic":"first","partition":0,"offset":1541131}
17{"timestamp":1558877565240,"name":"producer_send_success","key":"4","value":"1.14","topic":"first","partition":0,"offset":1541132}
18{"timestamp":1558877565241,"name":"producer_send_success","key":"5","value":"1.15","topic":"first","partition":0,"offset":1541133}
19{"timestamp":1558877565244,"name":"producer_send_success","key":"6","value":"1.16","topic":"first","partition":0,"offset":1541134}
20{"timestamp":1558877565244,"name":"producer_send_success","key":"7","value":"1.17","topic":"first","partition":0,"offset":1541135}
21{"timestamp":1558877565244,"name":"producer_send_success","key":"8","value":"1.18","topic":"first","partition":0,"offset":1541136}
22{"timestamp":1558877565244,"name":"producer_send_success","key":"9","value":"1.19","topic":"first","partition":0,"offset":1541137}
23{"timestamp":1558877565262,"name":"shutdown_complete"}
24{"timestamp":1558877565263,"name":"tool_data","sent":20,"acked":20,"target_throughput":-1,"avg_throughput":100.50251256281408}

 Kafka服务端脚本详解(3)-性能测试脚本

脚本名称

脚本用途

kafka-producer-perf-test.sh

kafka 生产者性能测试脚本

kafka-consumer-perf-test.sh

kafka 消费者性能测试脚本

kafka-console-producer.sh

kafka 生产者控制台

kafka-console-consumer.sh

kafka 消费者控制台

 kafka-producer-perf-test.sh

kafka 生产者性能测试脚本

--topic 消息主题名称 ----num-records 需要生产的消息数量 --payload-delimiter 指定 --payload-file 文件的分隔符,默认为换行符 n --throughput 设置消息吞吐量,messages/sec --producer-props 发送端配置信息,配置信息优先于 --producer.config --producer.config 发送端配置文件 --print-metrics 是否打印测试指标,默认 false --transactional-id 用于测试并发事务的性能 (默认值:performance-producer-default-transactional-id) --transaction-duration-ms 事务时间最大值,超过这个值就提交事务,只有 > 0 时才生效 --record-size 每条消息字节数 --payload-file 测试数据文件

测试 10w 条数据,每条数据 1000 字节,每秒发送 2000 条数据

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=10.211.55.3:9092 --topic first --record-size 1000 --num-records 100000  --throughput 2000
9999 records sent, 1999.8 records/sec (1.91 MB/sec), 8.6 ms avg latency, 406.0 ms max latency.
10007 records sent, 2001.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 8.0 ms max latency.
10002 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 10.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 37.0 ms max latency.
10008 records sent, 2001.2 records/sec (1.91 MB/sec), 0.6 ms avg latency, 7.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 35.0 ms max latency.
10004 records sent, 2000.8 records/sec (1.91 MB/sec), 0.8 ms avg latency, 33.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
100000 records sent, 1999.280259 records/sec (1.91 MB/sec), 1.50 ms avg latency, 406.00 ms max latency, 1 ms 50th, 2 ms 95th, 43 ms 99th, 91 ms 99.9th.

测试结果为:每秒发送 1.91MB 数据,平均延迟 1.5ms,最大延迟 406ms, 延迟小于 1ms 占 50%,小于 2ms 占 95%...


kafka-consumer-perf-test.sh

kafka 消费者性能测试脚本 --topic 消费的主题名称 --broker-list kafka 地址 --consumer.config 消费端配置文件 --date-format 格式化时间 --fetch-size 一次请求拉取的消息大小,默认 1048576 字节 --from-latest 如果消费者还没有已建立的偏移量,就从日志中的最新消息开始,而不是最早的消息 --group 消费者组 id,默认 perf-consumer-94851 --hide-header 如果设置,就跳过打印统计信息的标题 --messages 要获取的消息数量 --num-fetch-threads 获取消息的线程数量 --print-metrics 打印指标信息 --reporting-interval 打印进度信息的间隔,默认 5000ms --show-detailed-stats 如果设置,将按 --reporting-interval 的间隔打印统计信息 --socket-buffer-size TCP 获取信息的缓存大小 默认 2097152(2M) --threads 处理线程数,默认 10 --timeout 返回记录的超时时间

测试消费 50w 条数据

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# bin/kafka-consumer-perf-test.sh --topic first --broker-list 10.211.55.3:9092 --messages 500000  --timeout 300000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-05-30 01:21:27:072, 2019-05-30 01:21:30:801, 488.6162, 131.0314, 500343, 134176.1866, 25, 3704, 131.9158, 135081.8035

测试结果为:共消费 488.6162MB 数据,每秒消费 131.0314MB, 共消费 500343 条数据,每秒消费 134176.1866 条

Kafka生产者端优化

测试环境虚拟机 CPU:2 核 RAM:2G Kafka Topic 为 1 分区,1 副本

Kafka 生产者端发送延迟优化

batch.size

batch.size 单位为字节,为了方便这里都表示为kb 默认配置,batch.size=16kb

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249892 records sent, 49978.4 records/sec (48.81 MB/sec), 153.6 ms avg latency, 537.0 ms max latency.
250193 records sent, 50038.6 records/sec (48.87 MB/sec), 1.4 ms avg latency, 12.0 ms max latency.
211747 records sent, 42349.4 records/sec (41.36 MB/sec), 194.3 ms avg latency, 1106.0 ms max latency.
1000000 records sent, 49972.515117 records/sec (48.80 MB/sec), 119.65 ms avg latency, 1106.00 ms max latency, 2 ms 50th, 488 ms 95th, 1043 ms 99th, 1102 ms 99.9th.

结果显示平均延迟有 456.94 ms,最高延迟 5308.00 ms 现在我要降低最高延迟数,batch.size 的意思是 ProducerBatch 的内存区域充满后,消息就会被立即发送,那我们把值改小看看 batch.size=8kb

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
148553 records sent, 29710.6 records/sec (29.01 MB/sec), 812.4 ms avg latency, 1032.0 ms max latency.
195468 records sent, 39093.6 records/sec (38.18 MB/sec), 735.9 ms avg latency, 907.0 ms max latency.
189700 records sent, 37940.0 records/sec (37.05 MB/sec), 763.4 ms avg latency, 1053.0 ms max latency.
208418 records sent, 41683.6 records/sec (40.71 MB/sec), 689.7 ms avg latency, 923.0 ms max latency.
196504 records sent, 39300.8 records/sec (38.38 MB/sec), 718.1 ms avg latency, 1056.0 ms max latency.
1000000 records sent, 37608.123355 records/sec (36.73 MB/sec), 741.56 ms avg latency, 1056.00 ms max latency, 725 ms 50th, 937 ms 95th, 1029 ms 99th, 1051 ms 99.9th.

但经过测试发现,延迟反而很高,连设定的 50000 吞吐量都达不到,原因应该是这样:batch.size 小了,消息很快就会充满,这样消息就会被立即发送的服务端,但这样的话发送的次数就变多了,但由于网络原因是不可控的,有时候网络发生抖动就会造成较高的延迟 那就改大看看。 batch.size=32kb

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249852 records sent, 49970.4 records/sec (48.80 MB/sec), 88.8 ms avg latency, 492.0 ms max latency.
250143 records sent, 50028.6 records/sec (48.86 MB/sec), 1.2 ms avg latency, 15.0 ms max latency.
250007 records sent, 49991.4 records/sec (48.82 MB/sec), 1.2 ms avg latency, 17.0 ms max latency.
1000000 records sent, 49952.545082 records/sec (48.78 MB/sec), 31.07 ms avg latency, 492.00 ms max latency, 1 ms 50th, 305 ms 95th, 440 ms 99th, 486 ms 99.9th.

测试后,平均延迟,最高延迟都降下来很多,而且比默认值延迟都要小很多,那再改大延迟还会降低吗 batch.size=50kb

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249902 records sent, 49970.4 records/sec (48.80 MB/sec), 27.3 ms avg latency, 219.0 ms max latency.
250200 records sent, 50030.0 records/sec (48.86 MB/sec), 1.2 ms avg latency, 8.0 ms max latency.
250098 records sent, 50019.6 records/sec (48.85 MB/sec), 18.6 ms avg latency, 288.0 ms max latency.
242327 records sent, 48407.3 records/sec (47.27 MB/sec), 121.3 ms avg latency, 920.0 ms max latency.
1000000 records sent, 49823.127896 records/sec (48.66 MB/sec), 41.98 ms avg latency, 920.00 ms max latency, 1 ms 50th, 221 ms 95th, 792 ms 99th, 910 ms 99.9th.

如上测试在不同的机器上结果会有不同,但总体的变化曲线是一样的,成 U 型变化

batch.size 代码实现

Kafka 客户端有一个 RecordAccumulator 类,叫做消息记录池,内部有一个 BufferPool 内存区域

代码语言:javascript复制
RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             int lingerMs,
                             long retryBackoffMs,
                             int deliveryTimeoutMs,
                             Metrics metrics,
                             String metricGrpName,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager,
                             BufferPool bufferPool)

当该判断为 true,消息就会被发送

代码语言:javascript复制
if (result.batchIsFull || result.newBatchCreated) {
   log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
   this.sender.wakeup();
}

max.in.flight.requests.per.connection

该参数可以在一个 connection 中发送多个请求,叫作一个 flight, 这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认 5

在 batch.size=100kb 的基础上,增加该参数值到 10,看看效果

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249902 records sent, 49960.4 records/sec (48.79 MB/sec), 16.1 ms avg latency, 185.0 ms max latency.
250148 records sent, 50019.6 records/sec (48.85 MB/sec), 1.3 ms avg latency, 14.0 ms max latency.
239585 records sent, 47917.0 records/sec (46.79 MB/sec), 6.4 ms avg latency, 226.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 9.83 ms avg latency, 226.00 ms max latency, 1 ms 50th, 83 ms 95th, 182 ms 99th, 219 ms 99.9th.

多次测试结果延迟都比原来降低了 10 倍多,效果还是很明显的 但物极必反,如果你再调大后,效果就不明显了,最终延迟反而变高,这个 batch.size 道理是一样的

compression.type

指定消息的压缩方式,默认不压缩

在原来 batch.size=100kb,max.in.flight.requests.per.connection=10 的基础上,设置 compression.type=gzip 看看延迟是否还可以降低

代码语言:javascript复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249785 records sent, 49957.0 records/sec (48.79 MB/sec), 2.5 ms avg latency, 199.0 ms max latency.
250091 records sent, 50008.2 records/sec (48.84 MB/sec), 1.9 ms avg latency, 17.0 ms max latency.
250123 records sent, 50024.6 records/sec (48.85 MB/sec), 1.5 ms avg latency, 18.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 1.89 ms avg latency, 199.00 ms max latency, 2 ms 50th, 4 ms 95th, 6 ms 99th, 18 ms 99.9th.

测试结果发现延迟又降低了,是不是感觉很强大

0 人点赞