构造producer---Kafka从入门到精通(六)

2022-07-26 12:27:34 浏览数 (1)

上篇文章说了,kafka新版旧版的区别,producer全部异步发消息,并且提供回调机制callback,判断是否成功,通过分批次发送batching保证吞吐量,分区策略更加合理,旧版本默认是在一段时间内把消息发到固定区域,新版本采用轮询,消息更加均匀。Consumer新版为单线程执行,单个consumer线程管理多个socker,在10版本后,加入了心跳线程,这最多也就算了是双线程。偏移量 在新版本交给kafka处理,舍弃了zookeeper,这样可以依赖kafka备份机制天然实现高可用原理。

Kafka历史---Kafka从入门到精通(五)

一、构造producer

构造一个producer大致需要实现五个步骤:

1)构造一个properties,然后指定bootstrap.server,key.serializer和value.serializer这三个属性。这三个属性是必须的,service代表,localhost:9092,Key.serializer是

org.apache.kafka.common.serialization.stringSerializer。Value.serializer和key的 序列化一致。

2)使用propertites 构造kafkaProducer对象。

3)构造待发送对象,producerRecord,指定把消息发送到topic,partition,以及对应的key和value,其中partition和key不用指定。

4)调用kafkaProducer的send发送消息。

5)关闭kafkaProducer。

1、properties对象的构造

下面详细展开每一步要做的事,首先构造properties这里有三个参数是必须要指定的,他们分别如下:

1、bootstrap.servers:该参数指定host:port,用于kafka的broker服务器连接,producer使用时候会替换成实际的broker列表,如果kafka集群数量很多,那么只需要指定部分broker即可,不需要列出所有机器,因为不管指定几台broker,producer都会通过该参数发现集群中所有broker,该参数指定多台机器只为故障转移,这样即使一台broker挂了,producer重启后依然可以指定其他broker连接kafka集群。

2、Key.serializer:被发送到broker任何格式都必须是字节数组,因此消息的各个组件组件必须首先做序列化,然后才能发送到broker。该参数就是为消息key做序列化用的。这个参数指定是实现了serializer接口类全限定名称,kafka的大部分默认初始类型是primitive type,提供了现成的序列化器。StringSerializer,该类会将一个字符串转成字节数组,这个参数也揭示一个事实,这个用户可以自定义序列化器,只要实现serializer接口就可以。需要注意的时候,producer发送消息不指定key,也是需要配置这个参数的。

3、Value.serializer:org apache.kafka.common.serialization. Serializer,需要注意的是,这个必须写全称,和key.serializer类似,将消息的value部分转成字节数组。

2、KafkaProducer的构造

这时候就开始构造发消息的主入口,所有的功能都由kafkaProducer提供,只需要命令:

Producer<String,String> producer = new KafkaProducer<>(props);

创建producer的时候也可以把key和value序列化,如果序列化了,就不需要在properties的时候序列化。

3、producerRecord对象构造

下一步构造消息实例,java版本的producer使用producerRecord类来代表每条消息,创建producerRecord也很简单,指定topic和value,当然这里还可以指定pratition和key,值得注意的是,时间戳一定要谨慎使用,时间戳索引文件中索引项都是严格按照时间戳顺序,会导致该消息时间序列混乱,因此让kafka自行定义时间戳比较稳妥。

4、发送消息

Kafka producer发消息主要用send方法,虽然send只是两个简单方法签名,但是producer在底层完全实现了异步发送,并且使用java提供的future同时实现了同步发送 和 异步发送 回调(callback)两种方式。网上教程大部分是那种send,就不管,专有名称叫fire and forget,发送就忘记,这种在实际场景中不被推荐使用,因为对于发送结果producer完全不知道,所以真实的使用场景中,同步和异步发送还是最常见的方法。

异步发送

实际上所有写入操作都是默认异步,java版本的producer和send方法会返回一个java 的future对象供用户稍后获取发送结果,这就是所谓回调机制。

代码语言:javascript复制
  ProducerRecord producerRecord = new ProducerRecord("kafka-boot", "foo"   i);
                ProducerFactory factory = kafkaTemplate.getProducerFactory();
                Producer producer = factory.createProducer();
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null) {
                           
                        }else if(e instanceof RetriableException){
                           
                        }else{
                           
                        }
                    }
                });

上面方法就是回调,实现了onCompletion方法,该方法两个参数metadata和exception,可以用if判断下,他们不会同时为null,也就是说至少有一个为null,当消息发送成功时候,exception为null,当消息发送失败的时候,metadata为null。

另外,上面的callback实际是java的接口,用户可以自定义callback实现类来处理消息发送后的逻辑,只需要实现org.apache.kafka.clients.producer.Callback接口即可。

同步发送

同步发送 和异步发送是通过future来区分的,调用future.get()无线等待结果返回,即实现同步发送效果。

使用future.get()会一直等待下去,直到kafka broker将返回结果给producer,当结果从broker处返回时get方法要么返回结果,要么抛出异常,由producer自行处理。如果没有错误,get将返回对应的recordMetada实例(包含已发送消息的所有元素),包括消息发送的topic,分区以及消息对应分区的位移信息。

不管同步发送还是异步发送都会发送失败的可能,导致返回异常错误,当前kafka的错误类型包含两类:可重试异常 和 不可重试异常。

常见可重试异常如下:

LeaderNotAvailableException:分区的leader副本不可用,通常出现在leader换届选举期间,通常是瞬时的异常,重试之后可以自行恢复。

NotControllerException:controller当前不可用,(后面会重点讲解controller),通常表面controller在选举,也可以重试恢复。

NetworkException:网络瞬时故障导致的异常。

对于这种可重试的异常,如果在 producer 程序中配置了重试次数,那么只要在规定的重试次数内自行恢复了,便不会出现在 onCompletion exception 中。不过若超过了重试次数仍没有成功,则仍然会被封装进 exception 中。此时就需要 producer 程序自行处理这种异常。

那么不可重试异常哪些呢:

RecordTooLargeException :发送的消息尺寸过大,超过了规定的大小上限 显然这种异常无论如何重试都是无法成功的。

SerializationException :序列化失败异常,这也是无法恢复的

KafkaException :其他类型的异常

所有这些不可重试异常 旦被捕获都会被封装进 Future 的计算结果井返回给 producer 程序,用户需要自行处理这些异常。由于不可重试异常和可重试异常在 producer 程序端可能有不同的处理逻辑,所以需要不同的区分。

5、关闭producer

程序结束一定要close,毕竟producer是占用系统资源的(比如创建了额外线程,申请了很多内存以及创建了socket连接等),因此必须要显式的调用kafkaProducer.close方法关闭producer。

如果只是普通的无参数调用close,则会等producer 会被允许先处理完之前的发送请求后再关闭,即所谓的“优雅”关闭退出( graceful shutdown) ;同时, KafkaProducer 还提供个带超时参数的 close 方法 close(timeout 如果调用此方法, producer 会等待 timeout 时间来完成所有处理中的请求,然后强行退出。这就是说,若 timeout 超时,则 producer 会强制结束,并立即丢弃所有未发送以及未应答的发送请求,在某种程度上,仿佛 producer端的程序丢失了要发送的消息。因此在实际场景中一定要谨慎使用带超时的 close 方法。

0 人点赞