kafka拦截器实现队列插队效果

2022-04-11 18:34:00 浏览数 (1)

前言

突然出现一个任务需要对kafka处理的数据进行插队操作(内心小崩溃。。。),研究了一下,还是可以使用拦截器进行实现这样的效果的。

拦截器(Interceptor)

是早在Kafka 0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。

ProducerInterceptor

先看代码

代码语言:javascript复制
@Slf4j
@Service
public class MyProducerInterceptor implements ProducerInterceptor {

    //在发送broke之前的一个操作,可以对数据进行加工处理或者进行topic pationtion重新指向     
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return null;
    }
    /**
    * KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法
    * 
    *
    **/
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    //close()方法主要用于在关闭拦截器时执行一些资源的清理工作。
    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

onSend

就是在发送之前对数据的处理

onAcknowledgement

接收kafka服务端接受到消息响应的处理

代码语言:javascript复制
    服务端应答的模式可以通过acks进行设置: 1 0 -1
1 : 在生产者发送消息之后,从节点保存完数据,就会进行响应,如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。

0 : acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。

-1: acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况

close

方法主要用于在关闭拦截器时执行一些资源的清理工作。

configure

可以获取到生产者的配置

ConsumerInterceptor

代码语言:javascript复制
    AtomicInteger atomicInteger=new AtomicInteger(0);
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        
        return new ConsumerRecords( );
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((k,v)->{
            System.out.println(k "---" v);
        });
    }

    private String getJsonTrackingMessage(ConsumerRecord<String, String> record) {
        return record.value();
    }

    //拦截器关闭做一些操作
    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

onConsume

在consumer poll 返回之前的一个操作,一般我们如果没有设置(MAX_POLL_RECORDS_CONFIG,这个配置在0.9之后才有用) 那么默认poll返回的数据是500条,注意这个500也许来自多个分区,因为一个消费者可能被分配到多个分区。我们可能对poll到的数据进行重新修改或者过滤,然后返回一个新的ConsumerRecords,注意poll之后的commit是对服务端提交消费数据的偏移量,如果修改或者新增数据,需要注意如果修改了offset数据是否会造成重复消费的问题

onCommit

在commit成功之后,这个方法我们可以获取到消费数据的具体偏移量

onClose

和之前的生产者一样作用

实现方案

实现一个消费者拦截器,并且重新构造返回的消费数据,如果是新加入的消费数据,就不进行消费,如下面onConsume测试代码

代码语言:javascript复制
        int mark = atomicInteger.incrementAndGet();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
                = new HashMap<>();
        ConsumerRecord consumerRecord=
                new ConsumerRecord("xpp_test",0,0,null,1000);
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> tpRecords =
                    records.records(tp);
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            newTpRecords.add(consumerRecord);
            newTpRecords.addAll(tpRecords);
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);

0 人点赞