KafkaConsumer-Kafka从入门到精通(十)

2022-12-14 17:49:02 浏览数 (1)

上篇文章说了,消息压缩可以看分情况进行,判断下服务器cpu空闲还是io空闲较多,如果cpu空闲较多,则考虑消息积压,反之则不考虑。还有消费者组,consumer group,对于同一个group,只会发送一条消息进入一个实例。位移提交在0.9.0.0版本之前是保存到zookeeper,后来版本是保存在内部topic的__consumer offsets。

消费者组consumer group详解-Kafka从入门到精通(九)

一、构建consumer

1、构建一个properties对象,至少指定bootstrap.services、key.deserializer、value.deserializer和groupid的值,即代码指明显示带注解的4个参数。

2、使用上一步创建properties实例构造kafkaConsumer对象。

3、调用kafkaConsumer.subscribe方法订阅consumer group感兴趣的topic列表。

4、循环调用kafkaConsumer.poll方法获取封装在consumerRecord的topic消息。

5、处理获取到的consumerRecord对象。

6、关闭kafkaConsumer。

BootStrap.services

这和producer相同,这是必须要指定的参数,该参数指定了一组host:port对,用于创建与kafkabroker服务器的socket连接,可以指定多组,可以用,隔开。另外和producer相同,如果broker集群很多,只需要指定部分的broker集群就好。

GroupId

该参数指定consumer group的名字,这是唯一的标识,通常会为groupId设置一个有业务意义的名称。

Key.deserializer

Consumer代码从broker端获取任何消息都是字节数组格式,因此消息的每个组件都要执行相应的序列化操作才能“还原”成原来对象格式,这个参数就是为消息的key做解序列化的。StringDeserializer类会将收到的字节数组转成utf-8编码字符串,consumer支持用户自定义deserializer,这通常与producer端的序列化编码遥相呼应。

值得注意的是,consumer无论是否指定了key,这个参数都必须要设置。

Value.deserializer

与前面类似,对消息体消息进行解序列化,而且把消息还原成原来的对象类型,当然,value.deserializer可以设置成与key.deserializer不同的值,前提是key.serializer和value.serializer设置了不同的值。

构造kafkaConsumer对象

设置好四个参数后,我们就可以构造kafkConsumer对象,kafkaConsumer是consumer的主入口:

KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);

创建kafkaConsumer也可以同时指定key和value的deserializer。

如果制定了deserializer则不需要显式在properties指定了。

订阅topic

如果要订阅多个

Consumer.subscribe(Arrays.asList(“topic1”,”topic2”,”topic3”));

值得注意的是,这个订阅不是增量,后面新设置的会覆盖前面设置好的。

获取消息

KafkaConsumer的关键方法就是kafkaConsumer.poll方法从订阅的topic中获取多个分区的消息,为了实现这点,这个有点类似于select I/O机制,所有相关事件(rebalance、获取消息等)都发生在一个事件循环(event loop)中。这样consumer端只用一个线程就可以完成所有类型的IO操作。

Consumer.poll(1000)。

这里1000代表超时时间,通常情况下consumer拿到了足够多的数据,那么可以立即返回,但若没有足够多的可用数据,则consumer会一直阻塞,这个超时就控制阻塞最大时间。

处理consumerRecords对象

上一步poll调用返回以consumerRecord类封装的kafka消息。拿到消息后,consumer通常包含处理这些消息的逻辑,毕竟consumer不光处理消息,还要对不同的业务消息进行处理。

那么在消费者抱怨我的consumer消费太慢,指的是调用poll这步呢,还是处理consumerRecord对象这步呢。

从kafkaConsumer的角度,只要poll访问确认返回即消费成功,因此回答上面的问题要明确,consumer消费慢的原因在哪,并且针对性的进行改进,所以如果poll返回消息速度慢,那么可以调节相应的参数来提升poll方法效率。若消费的业务处理逻辑太慢,则需要考虑逻辑优化或者放入单独的线程中。

关闭consumer

KafkaConsumer.close():默认等待30s关闭。consumer程序结束后要显式关闭以释放kafkaConsumer运行过程中占用的资源(比如线程资源,内存,socket连接)。

KafkaConsumer.close(1000),等待1秒关闭。

Consumer脚本命令

除了自己写的程序建立consumer外,kafka还自带了方便使用控制台consumer脚本用于日常验证调试,改脚本名称是kafka-console-consumer,在linux平台位于kafka的bin目录下,在windows平台位于kafka的bin/windows下。

Kafka-console-consumer脚本常见命令如下:

--bootstrap-services:指定kafka broker列表,多台broker则以逗号隔开。目前来说,consumer脚本中的名字是bootstrap-server,到了producer脚本中变成了broker-list。

--topic:指定消费者的topic。

--from-beginning:是否指定从头消费,指定参数与java api中设置auto.offset.reset=earilest效果一样。

Consumer主要参数

session.timeout.ms:非常重要的参数之一,简单来说他是监测消费组内成员发送崩溃时间,假设你设置改参数为5分钟,那么group某个成员崩溃,group coordinator可能需要五分钟才能感知到这个崩溃,显然我们不想那么久时间。但这个参数还有另外一层含义,consumer消息处理逻辑的最大时间,倘若consumer两次poll之间时间超过了这个时间,那么coodinator会认为consumer已经追不上组内其他成员消费,因此会将这个consumer踢出组,该consumer负责的分区也会被分配给其他consumer,这会导致没必要的rebalance,因为consumer之后需要重新加入group。

更糟糕的是,那些被移除的group后处理的消息,consumer无法提交位移,这就意味着后面rebalance会被重新消费。

于是kafka版本在0.10.1.0版本对该参数进行了拆分,明确session.timeout.ms明确为coorditor监测失效时间。因为实际应用中,可以设置一个较小的值来监测是否崩溃。默认都是10秒。

Max.poll.interval.ms

如上所述,上面代表coorditor监测consumer失效时间代表session.timeout.ms。则最大拉取poll间隔时间也需要单独表示,在一个典型的使用场景中,consumer可能需要花费很长时间,假设用户业务是需要把消息落地到数据库中,而这个业务需要执行两分钟,那么这个参数至少需要设置成2分钟以上。

所以实际业务场景中,设置较小的session.timeout.ms 和实际业务场景设置max.poll.interval.ms则可以实现快速发现崩溃,保证不必要的balance。

Auto.offset.reset

指定了无位移信息或者位移越界时kafka的对应策略(consumer要消费的消息位移不在当前消息日志的合理区间范围)。无位移信息 或者 位移越界只需要满足两个条件当中的任何一个该参数才有效果。举个例子,假设你首次运行一个consumer group并且指定从头消息。显然该consumer会从头消费所有数据,因为此刻没有位移信息,如果你提交位移后,重启group,这时候参数就不会生效,此刻会发现group并不会从头消费,而是从提交的位移处开始。

目前改参数可能取值:

Earlies:指定从最早的位移开始消费,注意这里最早的位移不一定是0。

Latest: 指定从最新的位移开始消费。

None:指定如果未发生位移或者位移越界,则抛出异常。(至今未遇到过改参数设置成none的)

Enable.auto.commit

该参数指定consumer是否自动提交位移,若设置为true,则consumer在后台自动提交位移。否则用户手动提交位移。对于有较强“精确处理一次”语义需求用户来说,最好将参数设置成false,用户自行处理位移提交。

Fetch.max.bytes

指定了consumer单次获取数据的最大字节,若实际应用场景很大,则必须要设置很大的参数,否则无法消费。

Max.poll.records

该参数控制单次poll返回的最大消息数。比较极端的做法是设置该参数为1,那么每次poll只会返回一条消息。如果用户发现consumer端在poll速度太慢,可以适当增加该参数的值,如果用户消息处理逻辑很轻量,默认500条消息不能满足实际处理速度。

Heartbeat.interval.ms

表面看是心跳间隔时间,既然有上面的session.timeout.ms用于设置超时,为何还要引入新的参数呢。这里要搞清楚consumer group的其他成员要开启新的rebalance,当coordinator决定开启新一轮rebalance时,他会决定以rebalance_in_progress 异常的形式“塞进”consumer心跳请求的response中,这样其他成员拿到response后才知道它需要重新加入group。

比较推荐的做法就是设置一个比较低的值,让group其他的consumer成员更快感知到新一轮的rebalance开启了。注意这个值必须小于sesstion.timeout.ms,这很容易理解,毕竟consumer在这段时间内都不发送心跳,会认为已经dead。

Connection.max.idle.ms

这又是一个忽略的参数,经常用户抱怨再生产周期下观测到平均请求处理时间在飙升,这很有可能因为kafka定期关闭socket连接导致下次consumer处理请求需要重新创建broker的socke连接。当前默认是9分钟,如果用户在实际环境不在乎socket资源开销,比较推荐设置为-1,即不要关闭这些空闲时间。

0 人点赞