Kafka消息分区&producer拦截器&无消息丢失(八)

2022-12-14 17:48:07 浏览数 (1)

上篇文章说了,acks,1代表什么都不管,即使配置了回调也不会起作用,0代表不会等待replic副本里的不会持久化,只要broker leader持久化成功则返回给producer。-1代表all,则表示全部持久化成功才返回成功给producer,Retries,batch.size:kafka,linger.ms,buffer.memory,compression.type等参数。

producer参数---Kafka从入门到精通(七)

一、消息分区机制

producer发送过程有个很重要的步骤,就是确定发送的消息在哪个topic分区中。Producer提供了分区策略和对应的分区器(partitioner)供用户使用。新版本的会把相同key的消息发送到partition上,如果没有指定key,则会通过轮询分配均匀在topic所在分区,而对于旧版本的无法分配均匀。

自定义分区机制:

对于有key的消息,java版本的producer会通过自己的算法计算key的哈希值,然后在总分区取模分配到目标分区。但有的时候用户想实现自己的分区策略,而这又是默认partitioner无法实现的,那么此刻就可以用producer提供的自定义分区策略。

代码语言:javascript复制
 
/**
 * @author keying
 */
public class AuditPartitioner implements Partitioner {
 
    private Random random;
 
    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        int auditPartition = partitionInfoList.size() - 1;
        return key == null || key.isEmpty() ||
                !key.contains("audit") ? random.nextInt(partitionInfoList.size() - 1) : auditPartition;
        //return 0;
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
        random = new Random();
    }
}

若自定义分区机制,则需要做两件事:

1、先定义一个类,实现org.apache.kafka.clients.producer.Partitioner接口,主要重写partition方法。

2、在构造kafkaProducer的时候propertites设置partitioner参数。

Partition方法里主要接受参数有topic,key和value,还有集群元数据信息,一起来确定目标分区,而close方法则是用于关闭partitioner的,主要是为了关闭那些创建partitioner时初始化的系统资源等。

举个例子如何实现自定义的partitioner呢,假设我们有个类似审计功能,审计功能发送kafka的时候可以给他分配字符串“audit”,我们想让这类消息发到topic最后一个分区上,便于后续统一处理,而对于相同topic下的其他消息则采用随机发送的策略发送到其他分区上。

所以,用户可以根据key来指定一些策略,还可以根据value信息做一些定制化分区策略。

二、消息序列化

网络中发送数据都是以字节的方式,kafka也不例外,它可以是字符串,一个整数,一个数组或者其他任意对象类型。序列化器(serializer)负责在producer发送将消息转换成字节数组,而与之相反,解序列化器(deserializer)则用于将consumer接受到的字节数组转换成相应的对象。

Kafka1.0.0默认提供十几种序列化器,常见的serializer用的是StringSerializer,然后其他的还有LongSerializer,IntegerSerializer等。如果是复杂的类型,比如Avro则需要自定义序列化。

三、Producer拦截器

Producer拦截器相当于一个新的功能,他可以在producer发送消息之后以及回调之前有机会对消息做些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor

按序作用于同一条消息从而形成一个拦截器,intercetpor的实现接口是producerInterceptor,其定义方法如下:

onSend(producerRecord):该方法封装进kafkaProducer.send方法中,即他运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法对消息做任何处理,但最好不要修改消息的所属topic和分区,否则影响分区计算。

onAcknowledgement(recordMetadata,Exception):该消息会在被应答之前或者消息发送失败时候调用,并且通常在producer回调触发之前调用。OnAcknoewledgement运行在producer的I/O线程中,因此不要在该方法放入很重的逻辑,否则会拖慢producer的消息发送效率。

Close:关闭interceptor,主要做一些资源清理工作。

如前所述,interceptor可能运行在多个线程中,因此具体实现时候需要用户自行确认保护线程安全。若指定多个interceptor,则producer将按照指定顺序调用他们,同时把每个interceptor中捕获的异常记录到错误日志中而不是向上传递。

代码语言:javascript复制
 
/**
 * @author keying
 * @date 2022-08-07 17:24:21
 */
public class OneInterceptor implements ProducerInterceptor<String, String> {
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord record) {
        return new ProducerRecord(record.topic(), record.partition(), 
                record.timestamp(), System.currentTimeMillis()   ","   record.value().toString());
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
 
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
}
 
 
 
 
/**
 * @author keying
 * @date 2022-08-07 17:27:40
 */
public class TwoInterceptor implements ProducerInterceptor<String, String> {
 
    private int errorCounter = 0;
    private int successCounter = 0;
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return null;
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter  ;
        } else {
            errorCounter  ;
        }
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("成功:" successCounter);
        System.out.println("失败:" errorCounter);
    }
}

上面例子是实现一个简单的双inteceptor组成的拦截器,第一个拦截器会在消息发送前将时间戳加入到value,第二个拦截器则会统计成功和失败的次数。

四、无消息丢失配置

Producer采用的是异步发送消息机制,kafkaProducer.send方法仅仅把消息放入缓冲区,由一个专属的I/O线程负责提取缓冲区的消息并封装到batch中,然后发送出去。显然,整个过程存在数据丢失的窗口,若I/O线程在发送之前崩溃,则数据会丢失。

另一个问题则是消息会乱序,比如客户端依次发送两条消息到不同的分区:

Producer.send(records1);和producer.send(records2);

若此刻某些原因,网络出现瞬时抖动,导致records1发送失败,同时kafka又配置了重试机制,max.in.flight.requests.per.connection大于1(默认是5),这样会造成消息乱序,而实际场景很多情况需要包装按顺序消费。

所以这两个问题,kafka该如何规避呢?首先消息丢失很容易想到kafka的同步发送,但这样性能会很差,并不在实际场景中推荐使用。如何配置保证消息不会丢失呢?

Block.on.buffer.full = true

Acks=all 或者 -1

Retries=Integer.MAX_VALUE

Max.in.flight.request.per.connection=1

使用回调机制的send发送消息

CallBack逻辑中显式立即关闭producer,使用close(0)

Unclean.leader.election.enable=false

Replication.factor=3

Min.insync.replicas = 2

Replication.factor>min.insync.replicas

Enable.auto.commit=false

Producer端配置:

Block.on.buffer.full = true,实际上这个参数在kafka0.9.0版本已经被标记为deprecated的,并且使用max.block.ms替代,但还是推荐用户显示的设置它为true,使得内存缓冲区被填满时producer处于阻塞状态,并且停止接受新消息而不是抛出异常。否则producer生产速度过快会耗尽缓冲区,新版本0.10.0.0不用管这个参数,直接设置max.block.ms参数。

Acks = all很好理解,就是所有leader broker和副本replict里的follower都收到消息,才回复producer消息成功发送。

Retries=Integer.MAX_VALUE:这里设置无限大有点极端,想表达的是无线重试,但放心这里不会重试那些无法恢复的错误,只会重试那些可恢复的异常,所以可以放心的设置比较大的值,保证消息不会丢失。

max.in.flight.request.per.connection=1:设置为1防止消息在topic下乱序,这个设置的效果限制了producer在单个broker上连续发送的未响应请求数量。因此如果设置成1,则producer在某个broker发送响应之前将无法再给broker发送producer请求。

使用带回调的send,普通的send官方解释是fire and forget,只管把消息发出去,不管后续,如果发送失败,不会收到任何通知,这里肯定要带回调的send发送。

CallbackBack逻辑中显式处理立刻关闭producer:在calllback失败处逻辑立即使用kafkaProcuer.close(0),这样做的目的就是为了防止消息乱序问题。若不使用close关闭,默认情况下producer会被允许将未完成的消息发送出去,这样可能造成消息乱序。

Broker端配置:

Unclean.leader.election.eable = false:关闭unclean leader选举,即不允许非isr中的副本被选举成leader,从而避免broker端因为日志水位截断造成数据丢失。

Replication.factor>=3 :设置成3主要参考业界的三备份原则,强调多个副本才好。

Min.insync.replias>1:用于控制某条消息至少被写入ISR中多个副本才算成功,大于1代表提升持久性,只有在acks设置成-1或者all的时候才生效。

确保 replication.factors>min.insync.replicas :若两者相等,则只要有一个副本挂掉,则分区无法正常使用,虽然持久性很高,但可用性被降低,建议 replication.factory = min.insync.replicas 1。

0 人点赞