从kafka与Flink的事务原理来看二阶段提交与事务日志的结合使用

2023-10-19 19:51:20 浏览数 (2)

两阶段提交的成立要基于以下假设:

  • 该分布式系统中,存在一个节点作为协调者,其他节点作为参与者,且节点之间可以进行网络通信。
  • 所有节点都采用预写式日志,且日志被写入后即被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。
  • 所有节点不会永久性损坏,即使损坏后也可以恢复。

kafka事务

kafka实现了Exactly Once(精确一次)语义,主要是基于生产者端幂等以及kafka服务端事务保障。

生产者幂等

生产者幂等的实现主要是通过序列号(Sequence Number)标识分区消息顺序:

  1. Kafka的生产者幂等性是一种特性,它确保生产者在发送消息时,无论消息是否成功传递,都不会导致重复消息的发送。
  2. 幂等性是通过分配唯一的序列号(Sequence Number)给每条消息来实现的。这个序列号通常是递增的,每次发送新消息时会增加。
  3. 当生产者发送一条消息时,Kafka会根据消息的主题、分区和序列号来识别该消息,如果消息已经被成功接收并记录,那么即使生产者尝试再次发送具有相同序列号的消息,Kafka也只会视它为一条消息,不会重复添加。

序列号(Sequence Number)的作用:

  • 序列号是为了确保消息的唯一性和有序性。它有助于Kafka在消息传递过程中跟踪消息,防止消息丢失或被重复传递。
  • 序列号还用于保持消息的顺序。在Kafka中,每个分区都有一个顺序的消息日志,序列号帮助确保消息按照正确的顺序添加到分区中。
事务原理

kafka引入了Transaction Coordinator(类似Seata AT模式中的TC组件)用于协调管理事务。

伪代码如下:

代码语言:javascript复制
// 创建 Producer 实例,并且指定 transaction id
KafkaProducer producer = createKafkaProducer(
  “bootstrap.servers”, “localhost:9092”,
  “transactional.id”, “my-transactional-id”);

// 初始化事务,这里会向 TC 服务申请 producer id
producer.initTransactions();

// 创建 Consumer 实例,并且订阅 topic
KafkaConsumer consumer = createKafkaConsumer(
  “bootstrap.servers”, “localhost:9092”,
  “group.id”, “my-group-id”,
  "isolation.level", "read_committed");
consumer.subscribe(singleton(“inputTopic”));

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  // 开始新的事务
  producer.beginTransaction();
  for (ConsumerRecord record : records) {
    // 发送消息到分区
    producer.send(producerRecord(“outputTopic_1”, record));
    producer.send(producerRecord(“outputTopic_2”, record));
  }
  // 提交 offset
  producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");  
  // 提交事务
  producer.commitTransaction();
}

第一阶段 TC 服务收到事务提交请求后,会先将提交信息先持久化到事务 topic 。持久化成功后,服务端就立即发送成功响应给 Producer。然后找到该事务涉及到的所有分区,为每个分区生成提交请求,存到队列里等待发送。此时事务消息状态为事务提交.

第二阶段 后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到 TC服务。当 TC 服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务 topic。至此,一个完整的事务流程就完成了。

区别于一般的二阶段提交,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。

kafka的处理逻辑则为:如果 TC 服务在发送响应给 Producer 后,还没来及向分区发送请求就挂掉了。因为每次事务的信息都会持久化,所以 TC 服务挂掉重新启动后,会先从 事务 topic 加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。

这里的事务消息就是事务日志。

参考

Kafka 事务实现原理

Exactly Once语义与事务机制原理

Flink 事务

Flink将两阶段提交协议中的通用逻辑抽象为了一个类——TwoPhaseCommitSinkFunction。

我们在实现端到端exactly-once的应用程序时,只需实现这个类的4个方法即可:

  • beginTransaction:开始事务时,会在目标文件系统上的临时目录中创建一个临时文件,之后将处理数据写入该文件。
  • preCommit:在预提交时,我们会刷新文件,关闭它并不再写入数据。我们还将为下一个Checkpoint的写操作启动一个新事务。
  • commit:在提交事务时,我们自动将预提交的文件移动到实际的目标目录。
  • abort:中止时,将临时文件删除。

第一阶段

Checkpoint的开始表示两阶段提交协议的"pre-commit"阶段,当触发Checkpoint时,Flink JobManager会向数据流注入一个barrier(它将数据流中的记录划分为进入当前Checkpoint的部分和进入下一个Checkpoint的部分)。Barrier会随着数据流在operator之间传递,对于每一个operator,都会触发它的状态后端来保存其状态数据。

预提交阶段在Checkpoint成功完成之后结束。在第一个阶段结束时,数据会被写入到外部存储。

第二阶段

当所有的实例做快照完成,并且都执行完 preCommit 时,会把快照完成的消息发送给 JobManager,JobManager(TC协调器)收到后会认为本次 Checkpoint 完成了,会向所有的实例发送 Checkpoint 完成的通知(Notify Checkpoint Completed),当 Sink 算子收到这个通知之后,就会执行 commit 方法正式提交。

这里的状态后端/外部存储对应的是事务日志。用于持久化日志信息。

Flink Checkpoint机制也是基于二阶段提交与事务日志来实现的。 可参考 <<Flink 内核原理与实现>>一书的第13章,见详细描述

参考

Flink——Flink CheckPoint之两阶段提交协议

剖析 Flink 端到端的一致性

0 人点赞