Question: will a rebalance be triggered when the following parameters are set to different values?
request.timeout.ms
session.timeout.ms
heartbeat.interval.ms
max.poll.interval.ms
Environment: Java SDK kafka-clients:2.1.0, Kafka version 2.4.1
package ckafka.demo;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.JaasUtils;
public class CKafkaSaslConsumerDemo {
    public static void main(String[] args) {
        // Load kafka.properties.
        Properties kafkaProperties = CKafkaConfigure.getCKafkaProperties();
        // Set the JAAS configuration.
        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, kafkaProperties.getProperty("java.security.auth.login.config"));
        Properties props = new Properties();
        //
        // SASL_PLAINTEXT public network access
        //
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // SASL uses the PLAIN mechanism.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //
        // SASL_SSL public network access
        // (the lines below also need: import org.apache.kafka.common.config.SslConfigs;)
        //
        // Access protocol.
        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // SASL uses the PLAIN mechanism.
        // props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // SSL encryption.
        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaProperties.getProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
        // props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaProperties.getProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
        // Set the access point. Get the access point of the Topic from the console.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
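        // Session timeout: if the broker receives no heartbeat from the consumer within this time,
        // it considers the consumer dead, removes it from the Consumer Group, and triggers a Rebalance.
        // The default in kafka-clients 2.1.0 is 10 s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // Maximum time the client waits for the response to a request.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 11000);
        // Maximum number of records returned by a single Poll.
        // Do not set this too high: if the records cannot be processed before the next Poll is due,
        // a rebalance is triggered and consumption stalls.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        // Maximum allowed interval between two Polls.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 22000);
        // Heartbeat interval; must be lower than the session timeout.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 9000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");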
        // Deserializers for message keys and values.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group of this consumer instance. Apply for it in the console, then fill it in.
        // Consumer instances in the same group share the consumption load.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        // Create the consumer object, that is, one consumer instance.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Set the Topics the consumer group subscribes to; multiple Topics are supported.
        // Consumers with the same GROUP_ID_CONFIG should also subscribe to the same Topics.
        List<String> subscribedTopics = new ArrayList<>();
        // To subscribe to more Topics, add them here.
        // Each Topic must be created in the console first.
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);
        // Consume messages in a loop.
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // These records must be processed before the next Poll, and the total time
                // must not exceed MAX_POLL_INTERVAL_MS_CONFIG.
                System.out.printf("sum records: %d%n", records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("[consumer partition: %d][offset: %d][record: \"%s\"]%n", record.partition(), record.offset(), record.value());
                    System.out.println("Before sleep: " + java.time.LocalTime.now());
                    // Simulate slow processing: sleep for 21 seconds (21000 ms).
                    Thread.sleep(21000);
                    System.out.println("After wake-up: " + java.time.LocalTime.now());
                }
            } catch (Exception e) {
                System.out.println("consumer error! error = " + e.getMessage());
            }
        }
    }
}
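Case 1: 15 messages per poll, 1 second of processing per message, max.poll.interval.ms=22000, heartbeat.interval.ms=3000, request.timeout.ms=11000, session.timeout.ms=10000
Answer: Consumption is normal and no rebalance is triggered. The single consumer pulls 15 messages per poll; it finishes partition 0 first and then consumes the messages in partition 1.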
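Case 2: 25 messages per poll, 1 second of processing per message, max.poll.interval.ms=22000, heartbeat.interval.ms=3000, request.timeout.ms=11000, session.timeout.ms=10000
Answer: A rebalance is triggered. After 22 messages have been consumed, the rebalance log is printed: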
22917 [kafka-coordinator-heartbeat-thread | testfageGroup] WARN org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
22917 [kafka-coordinator-heartbeat-thread | testfageGroup] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Sending LeaveGroup request to coordinator 1.15.158.102:50012 (id: 2147363066 rack: null)
After all 25 messages have been consumed, the consumer rejoins the consumer group:
26135 [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Synchronous auto-commit of offsets {testfage-0=OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, testfage-1=OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, testfage-2=OffsetAndMetadata{offset=59, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
26135 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Revoking previously assigned partitions [testfage-0, testfage-1, testfage-2]
26135 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] (Re-)joining group
26167 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Successfully joined group with generation 9
26167 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Setting newly assigned partitions [testfage-0, testfage-1, testfage-2]
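Because enable.auto.commit defaults to true, the automatic offset commit fails after the rebalance, so the consumer keeps pulling and consuming the same messages over and over.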
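The arithmetic behind cases 1 and 2 can be sketched as follows. This is a simplified model rather than Kafka code: the class `PollBudget` and its methods are hypothetical names, and it assumes that every record takes the same fixed processing time and that the time between two poll() calls is roughly the batch size multiplied by that time. In a real consumer, fetch and commit overhead add to the interval, which is presumably why case 2 logs the timeout at about 22.9 s.

```java
import java.time.Duration;

/** Simplified model (not part of kafka-clients) of the max.poll.interval.ms budget. */
final class PollBudget {

    /** Estimated time between two poll() calls if every record takes perRecord to process. */
    static Duration batchTime(int maxPollRecords, Duration perRecord) {
        return perRecord.multipliedBy(maxPollRecords);
    }

    /** True if processing one full batch takes longer than max.poll.interval.ms. */
    static boolean exceedsPollInterval(int maxPollRecords, Duration perRecord, Duration maxPollInterval) {
        return batchTime(maxPollRecords, perRecord).compareTo(maxPollInterval) > 0;
    }

    /** Largest batch size whose estimated processing time still fits into max.poll.interval.ms. */
    static long maxSafeRecords(Duration perRecord, Duration maxPollInterval) {
        if (perRecord.isZero() || perRecord.isNegative()) {
            throw new IllegalArgumentException("perRecord must be positive");
        }
        return maxPollInterval.toMillis() / perRecord.toMillis();
    }

    public static void main(String[] args) {
        Duration maxPollInterval = Duration.ofMillis(22000);
        Duration perRecord = Duration.ofSeconds(1);
        // Case 1: 15 records x 1 s = 15 s, within the 22 s budget.
        System.out.println("case 1 exceeds: " + exceedsPollInterval(15, perRecord, maxPollInterval)); // false
        // Case 2: 25 records x 1 s = 25 s, over the 22 s budget.
        System.out.println("case 2 exceeds: " + exceedsPollInterval(25, perRecord, maxPollInterval)); // true
        System.out.println("max safe records: " + maxSafeRecords(perRecord, maxPollInterval));        // 22
    }
}
```

The model treats exactly 22 s as still within budget; in practice it is safer to stay well below the limit.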
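Case 3: 1 message per poll, 11 seconds of processing per message, max.poll.interval.ms=22000, heartbeat.interval.ms=3000, request.timeout.ms=11000, session.timeout.ms=10000
Answer: One message is consumed every 11 seconds and consumption is normal. The processing time exceeds session.timeout.ms, but heartbeats are sent by a separate heartbeat thread, so only max.poll.interval.ms limits the processing time.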
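Case 4: 1 message per poll, 15 seconds of processing per message, max.poll.interval.ms=22000, heartbeat.interval.ms=3000, request.timeout.ms=11000, session.timeout.ms=10000
Answer: One message is consumed every 15 seconds and consumption is normal.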
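Case 5: 1 message per poll, 25 seconds of processing per message, max.poll.interval.ms=22000, heartbeat.interval.ms=3000, request.timeout.ms=11000, session.timeout.ms=10000
Answer: Every message pulled triggers a rebalance, and the offset commit fails as well: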
3141 [kafka-coordinator-heartbeat-thread | testfageGroup] WARN org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
23141 [kafka-coordinator-heartbeat-thread | testfageGroup] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Sending LeaveGroup request to coordinator 1.15.158.102:50012 (id: 2147363066 rack: null)
26114 [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Synchronous auto-commit of offsets {testfage-0=OffsetAndMetadata{offset=42, leaderEpoch=null, metadata=''}, testfage-1=OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, testfage-2=OffsetAndMetadata{offset=40, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
26114 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Revoking previously assigned partitions [testfage-0, testfage-1, testfage-2]
26114 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] (Re-)joining group
26141 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Successfully joined group with generation 108
26141 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testfageGroup] Setting newly assigned partitions [testfage-0, testfage-1, testfage-2]
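The warning in the log suggests two remedies: increase max.poll.interval.ms, or reduce max.poll.records. A minimal sketch of deriving both settings from a processing-time estimate might look like this. The class `PollTuning`, its method, and the 50% headroom are assumptions made for illustration; only the two property names come from the Kafka consumer configuration.

```java
import java.util.Properties;

/** Hypothetical helper (not part of kafka-clients) that derives poll settings from an estimate. */
final class PollTuning {

    /**
     * Returns properties whose max.poll.interval.ms covers one full batch plus headroom.
     * headroomPercent = 150 means 1.5 times the estimated batch processing time.
     */
    static Properties pollSettings(long perRecordMs, int maxPollRecords, int headroomPercent) {
        if (perRecordMs <= 0 || maxPollRecords <= 0 || headroomPercent < 100) {
            throw new IllegalArgumentException(
                "perRecordMs and maxPollRecords must be positive, headroomPercent must be >= 100");
        }
        long batchMs = perRecordMs * maxPollRecords;
        // max.poll.interval.ms is an int setting, so fail loudly if the value does not fit.
        int maxPollIntervalMs = Math.toIntExact(batchMs * headroomPercent / 100);

        Properties props = new Properties();
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Case 5: one record per poll, 25 s per record, 50% headroom (an assumed margin).
        Properties props = pollSettings(25_000, 1, 150);
        System.out.println(props.getProperty("max.poll.interval.ms")); // prints 37500
    }
}
```

The resulting entries could be copied into the consumer's `props` before the KafkaConsumer is constructed. The right headroom depends on how variable the processing time is.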
Case 6: 1 message per poll, 21 seconds of processing per message, max.poll.interval.ms=22000, heartbeat.interval.ms=23000, request.timeout.ms=11000, session.timeout.ms=10000
Answer: The consumer fails to start and reports an error:
439 [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:652)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
    at ckafka.demo.CKafkaSaslConsumerDemo.main(CKafkaSaslConsumerDemo.java:63)
Caused by: java.lang.IllegalArgumentException: Heartbeat must be set lower than the session timeout
    at org.apache.kafka.clients.consumer.internals.Heartbeat.<init>(Heartbeat.java:43)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:758)
    ... 3 more
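# heartbeat.interval.ms is the interval at which the consumer sends heartbeats when Kafka's consumer group mechanism is used. It must be lower than session.timeout.ms and is usually set to less than one third of it.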
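Case 7: 1 message per poll, 20 seconds of processing per message, max.poll.interval.ms=22000, heartbeat.interval.ms=10000, request.timeout.ms=11000, session.timeout.ms=10000
Answer: Same result, the consumer fails to start. A heartbeat interval equal to the session timeout is rejected too, because it must be strictly lower: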
467 [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:652)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
    at ckafka.demo.CKafkaSaslConsumerDemo.main(CKafkaSaslConsumerDemo.java:63)
Caused by: java.lang.IllegalArgumentException: Heartbeat must be set lower than the session timeout
    at org.apache.kafka.clients.consumer.internals.Heartbeat.<init>(Heartbeat.java:43)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:758)
    ... 3 more
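The constraint that cases 6 and 7 run into can be checked before the consumer is constructed. The following is a minimal sketch of such a pre-flight check, not the library's own code: the class `HeartbeatCheck` and its methods are hypothetical, and the one-third rule of thumb is taken from the note above.

```java
/** Hypothetical pre-flight check (not part of kafka-clients) for heartbeat vs. session timeout. */
final class HeartbeatCheck {

    /** True only if heartbeat.interval.ms is strictly lower than session.timeout.ms. */
    static boolean isValid(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs < sessionTimeoutMs;
    }

    /** Throws with a descriptive message when the combination would be rejected. */
    static void validate(long heartbeatIntervalMs, long sessionTimeoutMs) {
        if (!isValid(heartbeatIntervalMs, sessionTimeoutMs)) {
            throw new IllegalArgumentException("heartbeat.interval.ms (" + heartbeatIntervalMs
                + ") must be lower than session.timeout.ms (" + sessionTimeoutMs + ")");
        }
    }

    /** Rule of thumb: one third of the session timeout (integer division). */
    static long suggestedHeartbeatMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / 3;
    }

    public static void main(String[] args) {
        System.out.println(isValid(3_000, 10_000));        // true  (cases 1 to 5)
        System.out.println(isValid(23_000, 10_000));       // false (case 6)
        System.out.println(isValid(10_000, 10_000));       // false (case 7)
        System.out.println(suggestedHeartbeatMs(10_000));  // 3333
    }
}
```

Calling `validate` right after loading the properties would surface a bad combination with both values in the message, instead of only the generic "Failed to construct kafka consumer" wrapper.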
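Summary: SDKs for different languages differ somewhat in their implementation while the server side is the same, so client behavior can vary from one SDK to another.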