书接上文:
Kafka核心知识点-技术探秘第一章
继续聊一聊Kafka相关的核心概念
Kafka如何保证消息的幂等性
所谓的消息幂等性就是如何保证消息只消费一次不重复消费。这需要从Kafka的多个角度去回答该问题一是要包含Kafka自身的机制,还需要考虑客户端自己的重复处理。
Consumer导致消息重复消费
Kafka的Consumer(消费者)offset还没来得及提交导致重复消费。所以,消费者可以手动提交offet来控制消息的消费情况。通过手动提交offset,消费者可以跟踪自己已经消费的消息,确保不会重复消费。
另外,消费者如何保证不重复消费消息的关键在于消费者做控制,因为MQ有可能无法保证不重复发送消息,所以在消费者端也应该控制:即使MQ重复发送了消息,消费者拿到消息之后,也要判断是否已经消费过该条消息。所以根据实际业务场景,有以下几种实现方式:
- 如果MQ(Kafka)拿到的数据是要存储到DB中,那么可以根据数据库创建唯一约束,这样的话,同样的数据从Kafak(MQ)发送过来之后,当插入到DB的时候,会违反唯一约束而不会插入成功。(再者也可以先查询一次,判断是否在DB中已经存在,从而决定是否让消息丢弃)
- 让Consumer(生产者)发送消息时,每条消息加一个全局唯一的ID,然后消费时,将该ID保存到Redis中。消费时先去Redis里面查一下有没有,没有再去消费。(原理和上面那条差不多)
- 如果拿到的数据直接放到Redis的set中的话,那就不用考虑了,因为其Set本身就是去重的
同时,在Kafka中每个消费者都必须加入至少一个消费者组(Consumer Group),同一个消费者组内的消费者可以共享消费者的负载。因此,如果一个消息被消费者组内的其中一个消费者消费了,那么其它消费者就不用在接收到这个消息了。
另外,客户端还可以自己做一些**幂等机制**,防止消息的重复消费。
Producer(生产者重复发送消息导致消息重复消费)
在Kafka中内部可以为每条消息生成一个全局唯一、与业务无关的消息ID,当MQ接收到消息时,会先根据ID判断消息是否重复发送,Kafka再决定是否接收该消息。
Kafka的Exactly-once来避免消息重复消费
Kafka内部提供了Exactly-once消费语义。简单理解其实就是引入事务,消费者使用事务来保证消息的消费和offset提交是原子的,而生产者可以使用事务来保证消息的生产和offset提交是原子的。Exactly-once消费语义则解决了重复问题。但是需要更复杂的设置和配置
Kafka的三种消息传递语义
在Kafka中,有三种比较常见的消息传递语义:
- at-least-once:至少一次
- at-most-once:至多一次
- exactly-once:仅一次
at-least-once
如何配置: 1. 设置enable.auto.commit 为false,禁用自动提交offset 2. 消息处理完之后手动调用
consumer.commitSync()
提交offset
这种语义有可能会对数据重复处理,因为该消费语义要保证消费者至少消费一次。在at-least-once语义中,在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次
这种语义比较适用于实时数据处理或消费者不能容忍数据丢失的场景,比如金融交易或者电信指令。
at-most-noce(Kafka的默认实现)
如何配置: 1. 设置
enable.auto.commit
为true 2.auto.commit.interval.ms
设置一个较低的时间范围
由于上面的配置,此时的Kafka会有一个独立线程负责按照只当间隔提交offset。
消费者的offset已经提交,但是消息还在处理冲(没有处理完),这个时候程序挂了,导致数据没有被成功处理,再重启的时候会从上次提交的offset出处理,导致上次没有被成功处理的消息丢失了。
exactly-once
如何配置: 1. 将
enable.auto.commit
设置为false,禁用自动提交 2. 使用consumer.seek(topicParttion, offset)
来指定offset 3. 在处理消息的时候,要同时保存住每个消息的offset
这种语义可以保证数据只被消费处理一次。同时保证消息的顺序性。是以原子事务的方式保存offset和处理的消息结果。数据真正处理成功的时候才会保存offset信息。
在Kafka 0.11 版本之前,实现exactly-once语义需要一些特殊的配置和设置。但是在Kafka 0.11 版本之后,Kafka提供了原生的exactly-once支持,使得实现exactly-once语义变得更加简单和可靠
Kafka如何保证消息的顺序性
我们都知道Kafka的消息是存储在指定的Topic中的某个Partition中的,并且一个Topic是可以有多个Partition的,同一个Partition的消息是有序的,但是如果是不同的Partition或者不同的Topic的消息那就是无需的了。
为什么需要保证Kafka消息的顺序性呢?
假设需要做一个MySQL binlog的同步系统,在MySQL中有一个针对某条数据增删改的操作,对应出来的增删改三条binlog,接着这三条binlog需要发送到Kafka(MQ)中,消费者消费出来一次执行,此时就需要保证消息的一致性,否则数据就会出现问题。
为什么同一个Partition消息是有序的呢?
因为当生产者向某个Partition发送消息时,消息会被追加到该Partition中的日志文件中(log),并且会被分配一个唯一的offset,文件的读写是有顺序的。而消费者在该Partition消费消息时,会从该Partition的最早offset开始逐个读取消息 ,从而保证了消息的顺序性。
如何保证数据写入一个partition中去:
那么想要实现消息的顺序性消费,可以从一下角度参考:
- 因为Kafka中的Partition是可以保证消息的顺序性,如果消息只写入到一个Partition中,那么消息一定是有顺序性的。为了只写入一个Partition可以只在Topic中只创建一个Partition
- 如果确实需要存在多个Partition。发送消息的时候可以指定一个Partition。这样即使一个Topic中存在多个Partition,我们可以把需要保证顺序性的消息都发送到一个Partition中,这样就可以保证顺序消费了比如:生产者可以在生产写入数据的时候可以指定一个Key,比如指定某个订单id作为Key,这个订单相关的操作就会被分发到一个Partition中去。
什么情况下kafka会出现消息顺序不一致?
消费者内部利用了多个线程并发处理,则可能会出现顺序不一致的问题。
如图所示:
那么应该如何解决消费者端多线程并发处理消息导致消息顺序不一致的情况呢?
大致的思路可以按照hash
算法进行hash分发。因为相同的订单Key的数据会分发到一个内存queue
里面去。
如图:
Kafka将消息分发到同一个Partition中的具体实现方式
在Kafka中,当我们向其发送消息的时候,如果Key为null,那么Kafka会采用默认的Round-robin
策论,也就是轮转。具体实现类是:DefaultPartitioner
。所以如果想要指定发送消息到某个Partition中,可以参考下面的方式:
- 指定Partition发送消息的时候执行Partition,具体 可以在ProducerRecord中指定Partition
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 指定要发送消息的主题
String topic = "Paidaxing_TOPIC";
// 要发送的消息内容
String message = "Hello Paidaxing!";
// 要发送消息的分区
int partition = 0;
// 创建包含分区信息的ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, null, message);
// 发送消息
producer.send(record);
// 关闭Kafka生产者
producer.close();
}
}
- 指定Key在没有指定Partition(null)时,如果有Key,Kafka会根据Key做Hash计算出一个Partition编号来,如果Key相同,那么也是可以分到一个Partition中的。
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 指定要发送消息的主题
String topic = "paidaxing_topic";
// 要发送的消息内容
String message = "Hello Paidaxing!";
// 要发送消息的key
String key = "Paidaxing_KEY";
// 创建ProducerRecord,指定主题、键和消息内容
ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, key, message);
// 发送消息
producer.send(record);
// 关闭Kafka生产者
producer.close();
}
}
- 自定义Partition
除了指定Partition和Key以外,还可以自定义实现自己的Partitioner(分区器)来指定消息发送到指定的Partition(分区)
创建一个自定义类并实现Partitioner接口,重写partition()
方法
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
// 这里处理和获取分区器的配置参数
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null || !(key instanceof String)) {
throw new InvalidRecordException("键不能为空而且必须是字符串类型");
}`
// 根据自定义的逻辑,确定消息应该发送到哪个分区
String keyValue = (String) key;
int partition = Math.abs(keyValue.hashCode()) % numPartitions;
// 返回分区编号
return partition;
}
@Override
public void close() {
// 在这里进行一些清理操作
}
}
如上述代码所示,在partition()
方法中,利用了一简单的实现逻辑,根据键的Hash值将消息发送到相应的分区。为了在Kafka生产者中国使用自定义的Partitioner(分区器),需要在生产者的配置中指定Partitioner类(分区器类)
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerExample {
public static void main(String[] args) {
// 设置Kafka生产者的配置属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 指定自定义分区器类
props.put("partitioner.class", "com.paidaxing.CustomPartitioner");
// 创建Kafka生产者
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
// 指定要发送消息的主题
String topic = "Paidaxing_TOPIC";
// 要发送的消息内容
String message = "Hello Paidaxing!";
// 要发送消息的key
String key = "Paidaxing_KEY";
// 创建ProducerRecord,指定主题、键和消息内容
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
// 发送消息
producer.send(record);
// 关闭Kafka生产者
producer.close();
}
}
Kafka如何解决消息积压问题
消息积压问题大多数由于消费者故障导致,或者消费能力不足导致
解决思路如下:
- 如果Consumer有问题,先修复Consumer的问题,确保其能恢复正常消费速度。然后将现有Consumer都停掉
- 临时建立好原先10倍或者20倍queue的数量。(Kafka也新建一个Topic,Partition是原来的10倍)
- 写一个临时分发数据的Consumer程序,这个程序部署上去消费积压好的数据,消费之后不做耗时处理,直接均匀轮询写入已经建立好10倍/20倍数量的queue上
- 接着临时征用原来10倍的机器来部署Consumer,每一批Consumer来消费一个临时queue的数据
- 等快速消费完积压完的数据之后,得恢复原来的部署架构,重新用原来的Consumer机器来消费
Kafka可以保证消息100%不丢失吗?(为什么Kafka不能100%保证消息不丢失)
https://www.yuque.com/paidaxing-wskgg/axlvdy/zcf9sg3s3wzevd1v#XcWoQ
上面有提到过Kafka提供的Producer和Consumer之间的消息传递保证语义有三种。
- at-least-once:至少一次
- at-most-noce:至多一次
- exactly-once:每条消息保证精确传递一次。不多也不会少
目前,Kafka默认提供的交付可靠性保障时第二种,即at-least-once。但是其实Kafka如果仅靠自身是没办法保证消息时100%可靠的。
原因可以从一下角度考虑:
Producer(生产者)
Kafka时允许生产者以异步方式发送消息,这意味着Producer在发送消息后不会等待确认。淡然我们可以注册一个回调等待消息的成功回调。
但是,如果Producer在发送消息之后,Kafka的集群发生故障或崩溃,而消息尚未被完全写入Kafka的日志中,那么这些消息可能会丢失。虽然后续可能会有重试,但是如果重试也失败了呢?如果这个过程中刚好生产者也崩溃呢?那就可能会导致没人知道这条消息失败了。就会导致消息不再重试了。
Consumer(消费者)
消费者来说比较简单,只要保证再消息成功被消费时,再去提交offset,这样就不会导致消息丢失了。
Broker(集群)
- Kafka使用日志来做消息的持久化,日志文件事存储在磁盘上的,但是如果Broker在消息尚未完全写入日志之前就崩溃,那么消息就有可能丢失了。
- 并且,操作系统在写磁盘之前,会先把数据写入到
Page Cache
中,然后再由操作系统自己决定什么时候同步到磁盘当中,而在这个过程中,如果还没来得及同步到磁盘,就直接宕机了。那么这个消息也是丢失了。 - 不过,也可以通过配置
log.flush.interval.message=1
,来实现类似于同步刷盘的功能,但是这样又回到了之前的情况,就是还没来得及持久化就宕机了。 - 即使Kafka中引入了副本机制来提高消息的可靠性,但是如果发生同步延迟,还没来得及同步,主副本就挂掉了,那么消息还是可能发生丢失。
这几种情况是从Broker角度来分析,Broker自身是没办法保证消息不丢失的,但是如果配合Producer,在配置request.required.acks = -1
这种ACK策略,可以确保消息持久化成功之后,才会ACK给Producer,那么,如果我们的Producer再一定时间内,没有收到ACK是可以重新发送消息。
但是这种重新发送,就又回到了我们前面介绍Producer的时候的问题。生产者也有可能会挂掉,重新发送也有可能没有发送依据,导致消息最终丢失
归根到底,如果只靠Kafka自己,其实是没有办法保证极端情况下的消息100%不丢失的。
但是我们可以引入一些机制来解决保证这个问题,比如:引入分布式事务,或者引入本地消息表,保证Kafka Broker没有保存消息成功返回时,可以重新投递消息,这样才可以。
后续有机会讲讲分布式事务相关概念
好了,本章节到此告一段落。希望对你有所帮助,祝学习顺利。
我正在参与2024腾讯技术创作特训营第五期有奖征文,快来和我瓜分大奖!