1 分类
Kafka拦截器共两种:
- Producer端
- Consumer端
本篇主要讲述Kafka Producer端拦截器,对消息进行拦截或修改,也可用于Producer的Callback回调之前进行预处理。
2 使用
Kafka Producer端拦截器,主要实现ProducerInterceptor接口,此接口包含4个方法:
2.1 onSend
这是在序列化键和值并分配分区之前从 KafkaProducer.send(ProducerRecord) 和 KafkaProducer.send(ProducerRecord, Callback) 方法调用的(如果未在 ProducerRecord 中指定分区)。
允许此方法修改记录,在这种情况下,将返回新记录。修改键/值的含义是分区分配(如果未在 ProducerRecord 中指定)将基于修改后的键/值完成,而不是来自客户端的键/值。因此,在 onSend() 中完成的键和值转换需要保持一致:相同的键和值应该变异为相同的(修改后的)键和值。否则,日志压缩将无法按预期工作。
同样,由侦听器实现来确保在 ProducerRecord 中返回正确的主题/分区。大多数情况下,它应该与“record”的主题/分区相同。
此方法引发的任何异常都将被调用方捕获并记录下来,但不会进一步传播。
由于生产者可以运行多个拦截器,因此将按照 指定的顺序 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG调用特定拦截器的 onSend() 回调。列表中的第一个侦听器获取从客户端传递的记录,下一个侦听器将传递前一个侦听器返回的记录,依此类推。由于允许侦听器修改记录,因此侦听器可能会获得已被其他侦听器修改的记录。但是,不建议构建依赖于前一个侦听器输出的可变侦听器管道,因为侦听器可能无法修改记录并引发异常,这可能会导致潜在的副作用。如果列表中的某个拦截器抛出来自 onSend() 的异常,则捕获并记录该异常,并使用列表中最后一个成功拦截器或客户端返回的记录调用下一个拦截器。
参数:
record – 来自客户端的记录或侦听器链中前一个拦截器返回的记录。
返回:
要发送到主题/分区的生产者记录
代码语言:java复制ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
Pro将消息序列化和分配分区之前,会调用拦截器的该方法对消息进行相应操作。一般最好不要修改消息ProducerRecord的topic、key及partition等信息,如要修改,也需确保对其有准确判断,否则会与预想偏差。如修改key不仅影响分区的计算,也影响Broker端日志压缩(Log Compaction)。
2.2 onAcknowledgement
当已确认发送到服务器的记录时,或者当发送记录在发送到服务器之前发送失败时,将调用此方法。
此方法通常在调用用户回调之前调用,在其他情况下,当引发异常时 KafkaProducer.send() 调用。
调用方将忽略此方法引发的任何异常。
此方法通常在后台 I/O 线程中执行,因此实现速度应该相当快。否则,从其他线程发送消息可能会延迟。
参数:
metadata – 已发送记录的元数据(即分区和偏移量)。如果发生错误,元数据将仅包含有效的主题和分区。如果 ProducerRecord 中未给出分区,并且在分配分区之前发生错误,则分区将设置为 RecordMetadata.NO_PARTITION。如果客户端将 null 记录 KafkaProducer.send(ProducerRecord)传递给 ,则元数据可能为 null。 exception – 在处理此记录期间引发的异常。如果未发生错误,则为 Null
代码语言:java复制void onAcknowledgement(RecordMetadata metadata, Exception exception)
消息被应答(Acknowledgement)之前或消息发送失败时调用,优先于用户设定的Callback之前执行。该方法运行在Producer的IO线程,所以实现逻辑越简单越好,否则影响消息发送速率。
2.3 close
代码语言:java复制void close()
关闭当前的拦截器,此方法主要用于执行一些资源的清理工作。
2.4 configure
代码语言:java复制configure(Map<String, ?> configs)
用来初始化此类的方法,这个是ProducerInterceptor接口的父接口Configurable中的方法。
一般情况下只需要关注并实现onSend或onAcknowledgement方法即可。
3 示例
onSend
代码语言:java复制@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
// 统计发送的消息个数
onSendCount ;
if (throwExceptionOnSend)
throw new KafkaException("Injected exception in AppendProducerInterceptor.onSend");
return new ProducerRecord<>(
record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
}
onAcknowledgement
代码语言:java复制@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计发送消息的成功次数
onAckCount ;
if (exception != null) {
onErrorAckCount ;
// the length check is just to call topic() method and let it throw an exception
// if RecordMetadata.TopicPartition is null
if (metadata != null && metadata.topic().length() >= 0) {
onErrorAckWithTopicSetCount ;
if (metadata.partition() >= 0)
onErrorAckWithTopicPartitionSetCount ;
}
}
if (throwExceptionOnAck)
throw new KafkaException("Injected exception in AppendProducerInterceptor.onAcknowledgement");
}