两阶段提交的成立要基于以下假设:
- 该分布式系统中,存在一个节点作为协调者,其他节点作为参与者,且节点之间可以进行网络通信。
- 所有节点都采用预写式日志,且日志被写入后即被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。
- 所有节点不会永久性损坏,即使损坏后也可以恢复。
kafka事务
kafka实现了Exactly Once(精确一次)语义,主要是基于生产者端幂等以及kafka服务端事务保障。
生产者幂等
生产者幂等的实现主要是通过序列号(Sequence Number)标识分区消息顺序:
- Kafka的生产者幂等性是一种特性,它确保生产者在发送消息时,无论消息是否成功传递,都不会导致重复消息的发送。
- 幂等性是通过分配唯一的序列号(Sequence Number)给每条消息来实现的。这个序列号通常是递增的,每次发送新消息时会增加。
- 当生产者发送一条消息时,Kafka会根据消息的主题、分区和序列号来识别该消息,如果消息已经被成功接收并记录,那么即使生产者尝试再次发送具有相同序列号的消息,Kafka也只会视它为一条消息,不会重复添加。
序列号(Sequence Number)的作用:
- 序列号是为了确保消息的唯一性和有序性。它有助于Kafka在消息传递过程中跟踪消息,防止消息丢失或被重复传递。
- 序列号还用于保持消息的顺序。在Kafka中,每个分区都有一个顺序的消息日志,序列号帮助确保消息按照正确的顺序添加到分区中。
事务原理
kafka引入了Transaction Coordinator(类似Seata AT模式中的TC组件)用于协调管理事务。
伪代码如下:
代码语言:javascript复制// 创建 Producer 实例,并且指定 transaction id
KafkaProducer producer = createKafkaProducer(
“bootstrap.servers”, “localhost:9092”,
“transactional.id”, “my-transactional-id”);
// 初始化事务,这里会向 TC 服务申请 producer id
producer.initTransactions();
// 创建 Consumer 实例,并且订阅 topic
KafkaConsumer consumer = createKafkaConsumer(
“bootstrap.servers”, “localhost:9092”,
“group.id”, “my-group-id”,
"isolation.level", "read_committed");
consumer.subscribe(singleton(“inputTopic”));
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
// 开始新的事务
producer.beginTransaction();
for (ConsumerRecord record : records) {
// 发送消息到分区
producer.send(producerRecord(“outputTopic_1”, record));
producer.send(producerRecord(“outputTopic_2”, record));
}
// 提交 offset
producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");
// 提交事务
producer.commitTransaction();
}
第一阶段 TC 服务收到事务提交请求后,会先将提交信息先持久化到事务 topic 。持久化成功后,服务端就立即发送成功响应给 Producer。然后找到该事务涉及到的所有分区,为每个分区生成提交请求,存到队列里等待发送。此时事务消息状态为事务提交.
第二阶段 后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到 TC服务。当 TC 服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务 topic。至此,一个完整的事务流程就完成了。
区别于一般的二阶段提交,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。
kafka的处理逻辑则为:如果 TC 服务在发送响应给 Producer 后,还没来及向分区发送请求就挂掉了。因为每次事务的信息都会持久化,所以 TC 服务挂掉重新启动后,会先从 事务 topic 加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。
这里的事务消息就是事务日志。
参考
Kafka 事务实现原理
Exactly Once语义与事务机制原理
Flink 事务
Flink将两阶段提交协议中的通用逻辑抽象为了一个类——TwoPhaseCommitSinkFunction。
我们在实现端到端exactly-once的应用程序时,只需实现这个类的4个方法即可:
- beginTransaction:开始事务时,会在目标文件系统上的临时目录中创建一个临时文件,之后将处理数据写入该文件。
- preCommit:在预提交时,我们会刷新文件,关闭它并不再写入数据。我们还将为下一个Checkpoint的写操作启动一个新事务。
- commit:在提交事务时,我们自动将预提交的文件移动到实际的目标目录。
- abort:中止时,将临时文件删除。
第一阶段
Checkpoint的开始表示两阶段提交协议的"pre-commit"阶段,当触发Checkpoint时,Flink JobManager会向数据流注入一个barrier(它将数据流中的记录划分为进入当前Checkpoint的部分和进入下一个Checkpoint的部分)。Barrier会随着数据流在operator之间传递,对于每一个operator,都会触发它的状态后端来保存其状态数据。
预提交阶段在Checkpoint成功完成之后结束。在第一个阶段结束时,数据会被写入到外部存储。
第二阶段
当所有的实例做快照完成,并且都执行完 preCommit 时,会把快照完成的消息发送给 JobManager,JobManager(TC协调器)收到后会认为本次 Checkpoint 完成了,会向所有的实例发送 Checkpoint 完成的通知(Notify Checkpoint Completed),当 Sink 算子收到这个通知之后,就会执行 commit 方法正式提交。
这里的状态后端/外部存储对应的是事务日志。用于持久化日志信息。
Flink Checkpoint机制也是基于二阶段提交与事务日志来实现的。 可参考 <<Flink 内核原理与实现>>一书的第13章,见详细描述
参考
Flink——Flink CheckPoint之两阶段提交协议
剖析 Flink 端到端的一致性