生产者消息
消息发送流程
首先生产者线程main生成消息后调用send方法,然后会经过拦截器、序列化器、分区器(Partition),分区器会对消息进行分区放入不同的本地队列,本地队列保存在计算机的内存中,每个队列32m,每16k数据形成一批消息;
sender线程专门从内存中获取数据发送到kafka集群中,这里有2个主要参数:
- batch.size:只有数据累加到batch.size之后,sender线程才会发送数据到kafka,默认16k;
- linger.ms:如果数据一直都没达到batch.size,sender可以设置等待时间linger.ms,到了设定时间后就会发送数据,单位ms,默认是0ms,表示默认情况下内存不会保留数据有数据就会被发送;
Network Client:里面保存着数据已发送等待应答的请求,每个broker最大缓存5个请求;
Selector:选择器,由于与Kafka集群进行网络通信,发消息与收ack都通过这里,当收到ACK成功消息后会清除Network Client中的请求和内存中的batch数据,若失败会重试,重试次数可设置;
异步消息生产者
批量发送,如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险;异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次;
Property | Default | Description |
---|---|---|
queue.buffering.max.ms | 5000 | 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的降低broker吞吐量,但也会造成时效性的降低; |
queue.buffering.max.messages | 10000 | 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息; |
queue.enqueue.timeout.ms | -1 | 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息; |
batch.num.messages | 200 | 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量); |
producer.send(new ProducerRecord(this.topic, value), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
exception.printStackTrace();
}
}
});
同步和异步指client(producer)是否收到leader给的ack后才发下一条(对于异步就是同一批),acks = 0, -1和1是指leader节点和follower节点数据同步的方式,可靠性机制,是保证数据能成功备份到其他节点的机制,二者是独立关系,说简单点就是(ack就代表消息发送成功失败与否,ack的配置代表是否写入磁盘);
同步消息生产者
代码语言:javascript复制 RecordMetadata recordMetadata = producer.send(new ProducerRecord(this.topic, value)).get();
调用send返回future时,需要调用get方法,此时主线程会被阻塞;recordMetadata对象,在recordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量offset、时间戳等;
生产者消息重试
发送消息会默认重试三次,每次间隔100ms;发送的消息会先进入到本地缓冲区(32mb),kakfa会跑一个线程,该线程去缓冲区中取16k的数据,发送到kafka,如果到 10 毫秒数据没取满16k,也会发送一次。异步的时候假如设置了缓存消息数量为200,但是一直没有200条数据,那么不可能会一直等下去,就会取16kb大小的数据,直接发,不够16kb也会发;
消费者消息
earliest:当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费; latest:当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据; none:当该topic下所有分区中存在未提交的offset时,抛出异常;
可靠性机制(ack属性配置)
producer可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:
- acks=0:表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。这里要注意这个回复,回复的是这条消息是否写入磁盘,设置为0,就代表只要消息发出去了,就认为消息发送成功,就会继续发;
- acks=1:至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失;
- acks=-1或all:需要等待min.insync.replicas(默认为 1 ,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个副本存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置;
同步异步与ack的联系
关于重试队列和死信队列
Kafka不支持重试机制也就不支持消息重试,也不支持死信队列,因此使用kafka做消息队列时,如果遇到了消息在业务处理时出现异常的场景时,需要额外实现消息重试的功能;
kafka消息顺序与重复
kafka消息顺序
全局顺序
全局顺序就目前的应用范围来讲,可以列举出来的也就限于binlog日志传输,如mysql binlog日志传输要求全局的顺序,不能有任何的乱序,这种的解决办法通常是最为保守的方式:
- 全局使用一个生产者;
- 全局使用一个消费者(并严格到一个消费线程);
- 全局使用一个分区(当然不同的表可以使用不同的分区或者topic实现隔离与扩展);
局部顺序
局部有序是指在某个业务功能场景下保证消息的发送和接收顺序是一致的。如:订单场景,要求订单的创建、付款、发货、收货、完成消息在同一订单下是有序发生的,即消费者在接收消息时需要保证在接收到订单发货前一定收到了订单创建和付款消息;
针对这种场景的处理思路是:针对部分消息有序(message.key相同的message要保证消费顺序)场景,可以在producer往kafka插入数据时控制,同一key分发到同一partition上面。因为每个partition是固定分配给某个消费者线程进行消费的,所以对于在同一个分区的消息来说,是严格有序的(在kafka 0.10.x以前的版本中,kafka因消费者重启或者宕机可能会导致分区的重新分配消费,可能会导致乱序的发生,0.10.x版本进行了优化,减少重新分配的可能性);
消息重试对顺序消息的影响
对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B,但是在异常情况下,在A发送失败的情况下,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了,这时对于本身顺序为AB的消息顺序变成了BA;
消息producer发送逻辑的控制
消息producer在发送消息的时候,对于同一个broker连接是存在多个未确认的消息在同时发送的,也就是存在上面场景说到的情况,虽然A和B消息是顺序的,但是由于存在未知的确认关系,有可能存在A发送失败,B发送成功,A需要重试的时候顺序关系就变成了BA,简之一句就是在发送B时A的发送状态是未知的;
针对以上的问题,严格的顺序消费还需要以下参数支持:max.in.flight.requests.per.connection(在发送阻塞前对于每个连接,正在发送但是发送状态未知的最大消息数量。如果设置大于1,那么就有可能存在有发送失败的情况下,因为重试发送导致的消息乱序问题,所以将其设置为1,保证在后一条消息发送前,前一条的消息状态已经是可知的;)
kafka消息重复
kafka生产者在发送数据的时候,通常会有同步与异步发送,异步就是缓存部分数据,达到一定条数或时间后批量发送,效率高效。那么不管同步还是异步,消息是否发送成功,Kafka通过acks这个参数来控制的:
- 0--- 就是kafka生产端发送消息之后,不管broker的副本有没有成功收到消息,在producer端都会认为是发送成功了,这种情况提供了最小的延迟,和最弱的持久性,如果在发送途中leader异常,就会造成数据丢失,但可以保证数据不重复;
- 1--- 是kafka默认的消息发送确认机制,此机制是在producer发送数据成功,并且leader接收成功并确认后就算消息发送成功,但是这种情况如果leader接收成功了,但是follwer未同步时leader异常,就会造成上位的follwer丢失数据,提供了较好的持久性和较低的延迟性,不可以保证数据不重复;
- -1--- 也可以设置成all,此机制是producer发送成功数据,并且leader接收成功,并且follwer也同步成功之后,producer才会发送下一条数据,可以保证数据不丢失,不能保证数据不重复;
通常为了兼顾效率与数据安全,将acks设置为1,只让每个分区的leader确认收到消息即可,不能副本是否同步数据完毕。那么在生产者发送数据到kafka后,如果返回成功的时候,由于网络等原因出现异常,那么生产者是收不到成功信号的,会重发,导致消息重复;消费者在成功消费后,可能还没有来得及提交偏移量,程序异常,即偏移量没有成功提交,下次启动也会重复消费;生产者重复发送数据,消费者重复消费数据,这些都导致消息重复,那么避免重复也应该在消息的生产与消费来避免;
对于生产端:
- 每个分区使用一个单独的写入器,每当你发现一个网络错误,检查该分区中的最后一条消息,以查看您的最后一次写入是否成功;
- 在消息中包含一个主键(UUID或其他),并在用户中进行反复制;
对于消费端:
- 采用exactly-once语义,消息消费结果保存与手动提交偏移量做成一个事务,比如一条sql语句既保存结果也保存偏移量,要么一起成功,要么一起失败;
- 也可以根据数据唯一字段进行重复判断;