kafka事务机制
- kafka的事务机制,是kafka实现端到端有且仅有一次语义(end-to-end EOS)的基础;事务涉及到 transactional producer 和transactional consumer, 两者配合使用,才能实现端到端有且仅有一次的语义(end-to-end EOS),producer和consumer是解耦的,也可以使用非transactional的consumer来消费transactional producer生产的消息,但此时就丢失了事务ACID的支持;
- 通过事务机制,kafka可以实现对多个topic的多个partition的原子性的写入,即处于同一个事务内的所有消息,不管最终需要落地到哪个topic的哪个partition, 最终结果都是要么全部写成功,要么全部写失败(Atomic multi-partition writes);kafka的事务机制,在底层依赖于幂等生产者,幂等生产者是kafka事务的必要不充分条件;
- 事实上,开启kafka事务时,kafka会自动开启幂等生产者;
kafka事务支持的设计原理
Transaction Coordinator和Transaction Log:
- transaction coordinator是kafka broker内部的一个模块,transaction coordinator负责对分区写操作进行控制,而transaction log是kakfa的一个内部topic, 所以kafka可以通过内部的复制协议和选举机制(replication protocol and leader election processes),来确保transaction coordinator的可用性和transaction state的持久性;transaction log topic内部存储的只是事务的最新状态和其相关元数据信息,kafka producer生产的原始消息,仍然是只存储在kafka producer指定的topic中;
- Procedure就是和Transaction Coordinator交互获得TransactionID对应的任务状态。Transaction Coordinator还负责将事务写入kafka内部的一个topic,这样即使整个服务重启,由于事务状态得到保存,正在进行的事务状态可以得到恢复,从而继续进行;
kafka事务机制下读写流程
- kafka生产者通过initTransactions API将 transactional.id注册到 transactional coordinator:此时,此时 coordinator会关闭所有有相同transactional.id 且处于pending状态的事务,同时也会递增epoch来屏蔽僵尸生产者 (zombie producers). 该操作对每个 producer session只执行一次(producer.initTransaction());
- kafka生产者通过beginTransaction API开启事务,并通过send API发送消息到目标topic:此时消息对应的 partition会首先被注册到transactional coordinator,然后producer按照正常流程发送消息到目标topic,且在发送消息时内部会通过校验屏蔽掉僵尸生产者(zombie producers are fenced out.(producer.beginTransaction();producer.send()*N;);
- kafka生产者通过commitTransaction API提交事务或通过abortTransaction API回滚事务:此时会向 transactional coordinator提交请求,开始两阶段提交协议 (producer.commitTransaction();producer.abortTransaction(););
- 在两阶段提交协议的第一阶段,transactional coordinator 更新内存中的事务状态为 “prepare_commit”,并将该状态持久化到transaction log中;
- 在两阶段提交协议的第二阶段, coordinator首先写transaction marker标记到目标topic的目标partition,这里的transaction marker,就是我们上文说的控制消息,控制消息共有两种类型:commit和abort,分别用来表征事务已经成功提交或已经被成功终止;
- 在两阶段提交协议的第二阶段,coordinator在向目标topic的目标partition写完控制消息后,会更新事务状态为“commited” 或“abort”, 并将该状态持久化到transaction log中;
- kafka消费者消费消息时可以指定具体的读隔离级别,当指定使用read_committed隔离级别时,在内部会使用存储在目标topic-partition中的事务控制消息,来过滤掉没有提交的消息,包括回滚的消息和尚未提交的消息;kafka消费者消费消息时也可以指定使用read_uncommitted隔离级别,此时目标topic-partition中的所有消息都会被返回,不会进行过滤;
kafka事务在应用程序的使用
配置修改
- producer 配置项更改:
- enable.idempotence = true
- acks = “all”
- retries > 1 (preferably MAX_INT)
- transactional.id = ‘some unique id’
- consumer 配置项更改:
- 根据需要配置 isolation.level为 “read_committed”, 或 “read_uncommitted”;
程序层
代码语言:javascript复制/**
This specifies that the KafkaConsumer should only read non-transactional messages,
or committed transactional messages from its input topics.
*/
KafkaConsumer consumer = createKafkaConsumer(
“bootstrap.servers”, “localhost:9092”,
“group.id”, “my-consumerGroup-id”,
"isolation.level", "read_committed");
consumer.subscribe(Collections.singleton(“inputTopic”));
/**
Consume some records, start a transaction, process the consumed records,
write the processed records to the output topic, send the consumed offsets to
the offsets topic, and finally commit the transaction. With the guarantees mentioned
above, we know that the offsets and the output records will be committed as an atomic
unit.
*/
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
for (ConsumerRecord record : records)
producer.send(new ProducerRecord(“outputTopic”, record));
//This method should be used when you need to batch consumed and produced messages
//together, typically in a consume-transform-produce pattern.
producer.sendOffsetsToTransaction(currentOffsets(consumer), my-consumerGroup-id);
producer.commitTransaction();
}
kafka全局一致的transactional.id维护
transactional.id在kafka的事务机制中扮演了关键的角色,kafka正是基于该参数来过滤掉僵尸生产者的 (fencing out zombies);生产者事务引入了一个全局唯一的TransactionId,将Procedure获得的PID和TransactionID绑定,这样Producer重启后就可以获得当前正在进行事务的PID;
那么如何在跨session的众多producer中 (向同一个kafka集群中生产消息的producer有多个,这些producer还有可能会重启),选用一个全局一致的transactional.id,以互不影响呢?
大体的思路有两种:
- 一是通过一个统一的外部存储,来记录生产者使用的transactional.id和该生产者涉及到的topic-partition之间的映射关系;
- 二是通过某些静态编码机制来生成一个全局唯一的transactional.id;
使用transactional API,用户需要配置transactional.id,但不需要配置ProducerId,Kafka内部会自动生成并维护一个全局唯一的ProducerIds;