导读
Apache Pulsar 在 2.8.0 正式支持了事务相关的功能,Pulsar 这里提供的事务区别于 RocketMQ 中 2PC 那种事务的实现方式,没有本地事务回查的机制,更类似于 Kafka 的事务实现机制。Apache Pulsar 中的事务主要用来保证类似 Pulsar Functions 这种流计算场景中 Exactly Once 语义的实现,这也符合 Apache Pulsar 本身 Event Streaming 的定位,即保证端到端(End-to-End)的事务实现的语义。
作者简介
冉小龙-腾讯云中间件团队研发工程师
Apache Pulsar committer
RoP 作者及 Maintainer
Apache BookKeeper contributor
Apache Pulsar Go client 作者
Apache Pulsar Go Functions作者
Stremnative/pulsarctl 作者
基本概念
为了更好的理解和实现事务相关的逻辑,Apache Pulsar 在这里抽象了如下几个核心概念:
1. Transaction Coordinator
Transaction Coordinator(简称:TC) 本质上是一个 ManagerLedger 对象,是事务的管理器,所有有关事务处理的请求都会发送到 TC 中来协调处理 commit 和 abort 事务等请求。一个 TC 会对应一个 Partitioned Topic,这个 Topic 用来确认当前 TC 对应的 Owner Broker,这里可以通过 Pulsar 自身的 Lookup 机制定位到这个 Topic 对应的 Owner Broker。这里使用 Partitioned Topic 也可以充分利用 Partitioned 自身的扩展能力来扩展 TC 的处理性能。Pulsar 中定义了一个特殊的 Topic 来标记这个 Topic:transaction_coordinator_assign。
TC 的生命周期涵盖整个事务的处理过程,其一可以确保事务的正确运行,防止事务进入错误的状态,其二,可以确保在 client 端设置的 Transaction Timeout 过期后,能够处理接下来的操作。有关 Transaction Timeout 的详细解释以及作用我们后面还会继续提到.
2. Transection Buffer
Pulsar 的事务消息其实早在 2.6.0 的版本之时就已经有计划在实现了,当时所有真实的事务消息是存储在 TB 中的,当 Producer 生产一条事务消息时,会进入 TB 中来,此时这条消息对 Consumer 是不可见的,只有当事务处理完成之后,这条对应的消息才会投递到 Consumer 监听的真实的 Topic 中。但是在 2.8.0 的实现中,真实的事务消息并没有存储到这里,而是投递到了真实的 Topic 中,那么这里就有一个问题,放到真实的 Topic 中,Consumer 一直在监听这个 Topic,它是如何保证消息不会立即被 Consumer 消费到呢?所以 Consumer 这里必然要有相应的处理,即它会根据事务的状态进行过滤,只有当事务完成的时候 Consumer 才会消费指定的消息。这样做的好处就在于避免了消息写放大的问题。之前一条事务消息会首先投递到 TB 中,TB 消费完成之后再投递到真实的 Topic,所以一条消息事实上是被写了两次。现在的实现依赖 Consumer 的状态过滤机制可以巧妙的规避写放大的问题。
3. Transaction Log
一个 TransactionLog 对应的是一个 managerLedger,所有事务的元数据信息会存储到 Transaction Log 中,这样做的好处在于,当 TC 出错宕机之后,可以从 Transaction Log 中恢复出来事务的元数据处理信息。一个 Transaction Log 主要的操作就是 append 和 reply 这两个动作,append 用来将事务的操作添加到 Transaction Log 中,reply 主要从指定的 Position 位置恢复对应的元数据信息。这里我们需要对 Transaction Log 中存储的数据做一下澄清,Transaction Log 中存储的主要是一些元数据的信息,即事务的状态信息,真实的事务消息我们存储在其它的地方。事务的状态有如下几种:
- OPEN
- COMMITTING
- ABORTING
- COMMITTED
- ABORTED
- ERROR
Transaction Log 主要用来存储事务对应的是上述的哪种状态,即事务执行到了什么地方,接下来需要做什么操作以及在事务回滚时会根据这些相应的状态信息来确认事务最终执行的状态。
4. Transaction ID
Transaction ID(简称:TxnID) 是用来唯一标识事务的一个字段,由两部分组成:64 位的 mostSigBits 和 64 位的 leastSigBits,总共128 位。TxnID 是由 TC 生成的,这里在 TC 端生成的好处就在于可以确保 TxnID 的全局唯一性,这个是事务执行的基础。mostSigBits 是 TC 的分区 id,leastSigBits 是最新的日志 id,leastSigBits 是事务的序号,该序号每次 1,可以从日志中进行恢复,不会重复,两部分都是long类型。
5. Pending Acknowledge State
对于一个事务消息而言,它可能由很多的普通消息构成,对于每一条消息其都有自己的 Ack 状态,但是对于事务消息而言,只有等构成这个事务的所有的普通消息完成之后,它才可以被正确的 commit,所以需要有一个集合来存储一个事务消息中待 Ack 的消息的集合。它主要提供如下保证,其一在 Transaction Timeout 结束之前,这个事务还没有完成,那么就需要将 Pending acknoeledge state 中未完成的消息进行回滚,即执行 abort 操作,回滚到事务执行前的状态。其二在 Transaction Timeout 之内,即事务还在执行过程中,那么其它的事务是不可以来操作这个事务集合中的消息的。既然涉及到事务状态的操作,那么必然会涉及到宕机之后状态恢复的问题,Pending acknowledge state 的消息存储是依赖 Cursor Log 来实现的,这样新的 Broker 节点就可以从 Cursor Log 中恢复出来宕机之前消息的确认状态,确保事务消息中确认的状态信息不会丢失。
在这里需要说明的是,Pulsar 之前的 Ack 是没有返回值的,也就是 Client 是不知道某一条消息是否被正确的 Ack 掉了,Ack 的状态是由 Broker 来维护管理的。为了能让 Client 获取到 Ack 的状态,在事务实现时,这里为 Ack 添加了返回值来确保 Client 侧能够知道 Ack 的具体状态信息。
事物流程
有了上面这些概念的解释,我们对 Pulsar 事务中各个主要的组件已经有了一个了解,接下来看看一个完整的事务流程
1. 寻找TC
由于 TXnID 是用来唯一标识一个事务的,且它是由 TC 生成的,所以我们在开始事务执行的时候,首先就需要依赖我们上述提到的 Pulsar 的 TC Topic:transaction_coordinator_assign 来找到对应的 Owner Broker,然后在该 Broker 上创建对应的 TC 来为这次事务执行分配一个唯一的事务 ID。
2. 打开一个事物
在寻找到这次事务归属于哪一个 TC 协调并生成对应的 TxnID 之后,TC 会将这个动作同步到 Transaction Log 中,将这一个事务 OPEN 的状态信息记录到 Transaction Log 中,一旦事务的状态信息被持久化之后,就可以确保这个记录不会被丢失。在完成持久化的动作之后,TC 会将这个事务的 ID 返回给 Client 端使用。
其实这里我们可以看到,对于一个事务而言,它执行的每一个状态变更信息都会首先请求到 TC,然后由 TC 通知 Transaction Log 进行持久化的动作,来确保事务的执行状态不会丢失。在上面我们也提到,TC 和 Transaction Log 本质上都是一个 ManagerLedger 对象,即相当于 BookKeeper 的 client 角色,将这些日志信息持久化到了 Bookie 中。
3. 发送事物消息
正如第二步所说,在事务操作的过程中,TC 作为一个协调器,需要感知一个事务执行过程中所有的状态操作。所以在事务消息发送之前也是一样的,需要先将这个操作告诉 TC,然后再由 TC 将这个动作通知到 Transaction Log 持久化起来。等一切准备就绪之后,Producer 开始将消息发送到真实的 Partitioned Topic 中,这里根据是否开启 batch 分为两种情况来讨论:如果没有开启 batch 的话,这个和正常的消息发送流程是一致的。假设开启了 batch 相关的功能,那么消息中会附加这个消息 TxnID, 在 broker 接收到这条消息之后,会检查这个 batch 中是否包含当前这个事务的消息,如果包含这个事务的消息,那么 Broker 会将这个 batch 写到分区的事务缓存中来做进一步的处理。
4. 确认事物消息
同理,在确认事务消息之前,client 会首先发送确认消息的请求到 TC 中,然后由 TC 来协调在事务的执行过程中究竟是需要做 commit 还是 abort 的操作。在准备工作就绪之后,与正常创建一个订阅的逻辑一样,client 会将订阅的请求发送到 Broker 中。正如我们在 《Pending acknowledge state》 章节中提到的,对于 Ack 的请求,client 需要感知这个动作是否成功,所以在原来 Ack 的请求中添加了一个 TxnID. 拿到 TxnID 之后 Broker 就可以检查这个 Ack 请求是否属于这次事务。之后就到了 Pending acknowledge state 组件的工作范畴了,它会决定这个 Ack 请求可以被哪些 consumer 来确认。
5. 完成事物
当上述所有的操作流程完成之后,TC 就会知道当前的这个事务状态是处于可以 commit 还是 abort 的状态,此时 TC 会将 COMMITTING 或者 ABORTING 的状态写到 Transaction Log 中持久化起来。只有将状态记录到 Transaction Log 中之后,这个事务才会开始执行真正的 commit 或者 abort 动作。当所有的事物消息完成之后(commit 或者 abort),TC 会将这次事务的状态标记为 COMMITTED 或者 ABORTED 然后记录到 Transaction Log 中,至此,一个事务的完整的处理流程就结束了。
对比 Kafka 事务
Pulsar 的事务处理流程与 Kafka 的事务处理思路大致上保持一致,大家都有一个 TC 以及对应的一个用于持久化 TC 所有操作的 Topic 来记录所有事务状态变更的请求。同样的在事务开始阶段也都有一个专门的 Partitioned Topic 来去 Lookup TC 对应的 Owner Broker 的位置在哪里。不同的是,第一:Kafka 中对于未确认的消息是维护在 Broker 端的,但是 Pulsar 的是维护在 Client 端的,通过 Transaction Timeout 来决定这个事务是否执行成功,所以有了 Transaction Timeout 的存在之后,就可以确保 client 和 broker 侧事务处理的一致性。第二:由于 Kafka 本身没有单条消息的 Ack,所以 Kafka 的事务处理只能是顺序执行的,当一个事务请求被阻塞之后,会阻塞后续所有的事务请求,但是 Pulsar 是可以对消息进行单条 Ack 的,所以在这里每一个事务的 Ack 动作是独立的,不会出现事务阻塞的情况。
写在最后
目前 TDMD 已经基于 Apache Pulsar 应用在多种业务场景下,腾讯云TDMQ、计平、数平等多个团队也在一起共建Pulsar,对Pulsar感兴趣的小伙伴,欢迎关注腾讯云中间件我们下一期的分享。
免费体验馆
消息队列CKafka
分布式、高吞吐量、高可扩展性的消息服务,具备数据压缩、同时支持离线和实时数据处理等优点。
扫码即可免费体验
免费体验路径:云产品体验->基础->消息队列CKafka
消息队列TDMQ
一款基于 Apache 顶级开源项目 Pulsar 自研的金融级分布式消息中间件。其计算与存储分离的架构设计,使得它具备极好的云原生和 Serverless 特性,用户按量使用,无需关心底层资源。
扫码点击“立即使用”,即可免费体验
微服务平台TSF
稳定、高性能的技术中台。一个围绕着应用和微服务的 PaaS 平台,提供应用全生命周期管理、数据化运营、立体化监控和服务治理等功能。TSF 拥抱 Spring Cloud 、Service Mesh 微服务框架,帮助企业客户解决传统集中式架构转型的困难,打造大规模高可用的分布式系统架构,实现业务、产品的快速落地。
扫码点击“免费体验”,即可免费体验
微服务引擎TSE
高效、稳定的注册中心托管,助力您快速实现微服务架构转型。
扫码点击“立即申请”,即可免费体验
弹性微服务TEM
面向微服务应用的 Serverless PaaS 平台,实现资源 Serverless 化与微服务架构的完美结合,提供一整套开箱即用的微服务解决方案。弹性微服务帮助用户创建和管理云资源,并提供秒级弹性伸缩,用户可按需使用、按量付费,极大程度上帮用户节约运维和资源成本。让用户充分聚焦企业核心业务本身,助力业务成功。
扫码点击“立即申请”,即可免费体验
往期
推荐
《玩转Kafka Raft模式-入门宝典》
《Tencent Kona JDK11无暂停内存管理ZGC生产实践》
《腾讯云中间件月报(2021年第六期)》
扫描下方二维码关注本公众号,
了解更多微服务、消息队列的相关信息!
解锁超多鹅厂周边!
戳原文,了解更多腾讯微服务平台相关信息