Kafka client版本0.10
ProducerInterceptor
List-1
代码语言:javascript复制public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
- onSend方法,在消息发送到Broker之前会调用
- onAcknowledgement,是Broker端返回确认消息后调用
ConsumerInterceptor
List-2
代码语言:javascript复制public interface ConsumerInterceptor<K, V> extends Configurable {
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
- onConsume方法是从Broker端取到消息,但是poll方法返回前调用
- onCommit是提交offset后调用
使用场景:我们可以在Producer端统一拦截,加上处理时间,再在consumer端统一拦截统计端到端的处理时间,这也是一种监控方式。