三种语义
背景
流处理即事件处理,简单的说是连续处理无限的数据/事件序列。下面用有向图来描述。
流处理(streaming process),有时也被称为事件处理(event processing),可以被简洁地描述为对于一个无限的数据或事件序列的连续处理。一个流,或事件,处理应用可以或多或少地由一个有向图,通常是一个有向无环图(DAG),来表达。在这样一个图中,每条边表示一个数据或事件流,而每个顶点表示使用应用定义好的逻辑来处理来自相邻边的数据或事件的算子。其中有两种特殊的顶点,通常被称作sources与sinks。Sources消费外部数据/事件并将其注入到应用当中,而sinks通常收集由应用产生的结果。图1描述了一个流处理应用的例子。
at-most-once 最多一次
类比UDP协议,不关心消息是否成功,只发送一次,“尽力而为”。
at-least-once 至少一次
数据/事件被保证会被应用中的所有算子至少处理一遍。消息第一次投递在算子2处理出现失败,会对数据/事件会被重放或重传; 二次重试处理超时,再次进行数据重放,结果是第二次和第三次重放最终的结果都是成功的。
exactly-once 精确一次
无论发生任何故障,都会确保数据/事件只被算子处理一次。实现exactly-once有两种典型的机制:
分布式快照/状态检查点(checkpointing)
机制里,流处理的每个应用算子都会周期性的checkpoint。如果发生故障,流处理中的应用算子会回滚到到最近一次全局一致处。在回滚过程中,所有的处理都会停止。流程会从最近一致处开始。
基于at-least-once去重
这种机制会为每个算子维护一份事务日志,来记录哪些数据/事件处理过了。
事务
事务是将多个操作当做一个操作,保证这个操作的原子性。事务成功则所有子操作全部成功,失败则所有子操作全部失败。 事务四特性(ACID):
- Atomicity(原子性):一个事物是一个不可分割的整体,要么全成功,要么全失败;
- Consistency(一致性):事务执行前后,数据状态要保持一致。商品A卖出1件,那么对应的库存也应该减1件;
- Isolation(隔离性):多个事务并发执行,互不干扰;
- Durablity(持久性):事务一旦提交,不能回滚。
本地事务
本地事务即局部事务。单机内,处理一组数据逻辑,这组操作要么全部成功,要么全部失败。比如:在同一数据库多次执行多条sql语句。本地事务不支持跨机器、跨数据库场景。
分布式事务
分布事务的解决方案有多种,2PC、TCC、本地消息表(异步确保)、Seata 2PC(改进)、MQ事务消息。本次重点说说MQ事务消息。
事务消息
流处理模式中,只有消息B被成功生产消息A才会被标识为已消费。
Kafka
Kafka的基本原理之前有文章介绍过,这里不再赘述原理
幂等实现
幂等即对接口多次调用的结果均一致。消息在处理失败时会进行重试,产生重复的消息。为实现幂等,Kafka引入了sequenceNumber(序列号)和producerID(PID)两个概念,其中PID对用户来说完全透明。每个生产者初始化时都会被分配一个PID。消息发送到每个分区都有一个唯一对应的从0开始自增的序列号,每发送一条消息就会将<PID,topic partition>对应值 1。Broker的内部维护着对应的序列号,收到的消息只有序列号比Broker的序列号大1时,这条消息才会被接收。如果写入消息的序列号<Broker中的序列号,那么消息是重复消息,会被丢弃;如果消息系列号>Broker序列号 1,那么说明消息可能乱序,会抛出错误。
因为sequenceNumber是针对<PID,topic partition>维度的,所以只能保证单个生产者会话中的单分区的幂等。
事务实现
事务实现建立在幂等之上。且事务支持跨分区,使用场景分为两种:
- 生产者发送多条消息封装在一个事务中,多条消息要么全部发送成功、要么全部发送失败;
- read-process-write模式,将消息写入和消息消费封装在一个事务中,即将消息的生产、消费、提交消费位移当做原子操作。在流处理场景下,上游产生消息写入kafka,经过处理后被其他服务成功消费,并更新消费进度。
事务特性和保证方式
Kafka通过事务可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。
生产者角度
- transactionID唯一对应一个PID,transactionID由用户填写,PID由系统生成
- 为了保证有相同transactionID的新生产者能替换掉旧生产者,在创建消费者时,系统分配PID的同时也会分配producerEpoch。同一transactionID新增一个生产者,则对应的PID的producerEpoch 1 同一个transactionID最多只能对应一个有效的生产者,即producerEpoch值最大。当某个生产者实例宕机,新的生产者实例可以保证任何未完成的旧事务要么成功commit,要么被终止abort。这样新的实例可以从正常的工作状态开始。
消费者角度
- Kafka并不能保证已提交的事务中所有消息都能够被消费(如消费者可以访问任意offset的消息,可能存在消息遗漏)
- 消费端参数IsolationLevel,支持两种事务隔离级别。ReadUncommitted:消费端可以收到未提交事务的消息;ReadCommitted:只能收到成功commit的消息。kafka会缓存这些消息直到CommitTxn或者AbortTxn。
终止TransactionID
Transaction Coordinator会周期性遍历PID和transactionID映射关系,如果transactionID没有正在进行中的事务,并且上一个事务的结束时间与现在时间差过大就会将这个删除。
代码示例
代码语言:javascript复制package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
var (
version = ""
)
// NewTxnProducer 新增事务Producer
func NewTxnProducer(brokers []string) (*sarama.AsyncProducer, error) {
version, err := sarama.ParseKafkaVersion(version)
if err != nil {
fmt.Printf("Error parsing Kafka version: %v", err)
return nil, err
}
config := sarama.NewConfig()
config.Version = version
// 幂等
config.Producer.Idempotent = true
config.Producer.Return.Errors = false
// 等待所有副本提交成功
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择要发送消息的分区,默认为散列
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
// 每次重试之间等待集群稳定的时间
config.Producer.Transaction.Retry.Backoff = 10
// 事务中用于识别生产者的实例
config.Producer.Transaction.ID = "txn_producer"
// 一个连接在发送阻塞之前,允许有多少个未完成的请求(默认为5)。
config.Net.MaxOpenRequests = 1
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
fmt.Printf("NewAsyncProducer err: %v", err)
return nil, err
}
return &producer, nil
}
// SendMessage 发送消息
func SendMessage(producer sarama.AsyncProducer, msg string) error {
// 开始事务
err := producer.BeginTxn()
if err != nil {
fmt.Printf("BeginTxn err: %v", err)
return err
}
// 发送消息
producerMessage := &sarama.ProducerMessage{
Topic: "topic",
Key: nil,
Value: sarama.StringEncoder(msg),
}
producer.Input() <- producerMessage
// 提交事务
err = producer.CommitTxn()
if err != nil {
fmt.Printf("CommitTxn err: %v,msg: %s", err, msg)
if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 {
fmt.Printf("need to recreate producer")
return err
}
if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 {
err = producer.AbortTxn()
if err != nil { // retry
fmt.Printf("Producer: unable to abort transaction: % v", err)
return err
}
}
// 再次尝试提交
err = producer.CommitTxn()
if err != nil {
fmt.Printf("CommitTxn err: %v,msg: %s", err, msg)
return err
}
}
return nil
}
Pulsar
Pulsar基础之前的文章有介绍过,这里不再赘述原理
Pulsar事务支持端到端的流处理,即保证数据写入后不丢失、数据不会被重复处理。使用事务需要Pulsar 2.8.0 或更高版本,目前事务API仅支持JAVA客户端。
基本概念
- Transaction coordinator 事务管理器,作用域在整个事务处理周期。如处理超时事务将其置为失效。
- Transaction Log,事务日志,用来存放事务处理相关的元数据。如果事务管理器中途宕机,可以通过事务日志中的数据恢复。
- Transaction buffer,事务缓存,存储未提交的事务消息。一个事务在未提交之前用户是不可见的,事务提交后会将消息发送到对应的消息分区。如果消息过期或者手动abort,消息会从缓存中删除。
- Transaction ID ,事务ID,长度是128位,高16位是对应的事务管理器序列号,后面位均为标识事务的自增位。有了事务ID能够轻松的定义事务事务问题。
- Pending acknowledge state,挂起确认状态。在消息被确定提交之前,其他的事务无法更改这条消息的状态。
事务实现
- 开启事务,申请事务ID:客户端通过coordinator获取事务ID,服务器会将事务ID进行记录。
- 发送事务消息:提前把将要发送到分区的消息记录在Transaction Log中,确保消息不被丢失、利于管理。再将消息按照普通流程写入对应Broker(Produce)并持久化到Data Log。
- 新增订阅:新增一个Consumer来订阅Producer,订阅信息通过coordinator记录在Transaction Log,并将订阅结果告知Producer。客户端把新增订阅告知Broker(ACK),并持久化在Pending ack log中。
- 结束事务:客户端通过coordinator发起结束事务的请求,并在Transaction Log中进行记录。coordinator分别告知Broker(ACK)、Broker(Produce)要提交事务,两个Broker会将当前的数据保存在Log中。
coordinator事务操作的proxy,任何事务相关的请求都要经过coordinator。事务的每步操作都会有Log记录并且有对应的状态。这样可以保证执行事务过程中的任何步骤、任何模块宕机消息都不会丢失,详细的日志也为重启提供了完整的数据。
RocketMQ
RocketMQ基础之前的文章有介绍过,这里不再赘述原理
事务实现
- 客户端Producer发送事务消息
- 收到服务器返回的确认消息
- 执行本地事务逻辑
- 客户端Producer向服务器申请提交事务请求
- 服务器请求客户端Producer回查本地事务状态,用来和服务器的事务状态进行对比
- 客户端Producer获取本地事务状态
- 服务端将本地事务状态和msg中的状态进行对比,对比成功继续commit,对比不成功rollback;
- 对比成功则commit,消息对消费者可见
- 对比失败则rollback,消息对消费者不可见 执行完步骤3的事务就属于半事务,半事务的消息不能被消费者消费。 在步骤5中,有事客户端因为宕机等其他原因无法响应服务器发起的事务状态校验的请求,这时服务器有定期的机制来轮询这些事务,并作出相应的处理。
代码示例
代码语言:javascript复制package rocketmq
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
rocketmq "github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// DemoListener 测试demo使用
type DemoListener struct {
localTrans *sync.Map
transactionIndex int32
}
// NewDemoListener 创建事务检查对象
func NewDemoListener() *DemoListener {
return &DemoListener{
localTrans: new(sync.Map),
}
}
// CheckLocalTransaction 检查本地事务
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Printf("%v msg transactionID : %vn", time.Now(), msg.TransactionId)
v, existed := dl.localTrans.Load(msg.TransactionId)
if !existed {
fmt.Printf("unknow msg: %v, return Commit", msg)
return primitive.CommitMessageState
}
// 校验本地事务状态
state := v.(primitive.LocalTransactionState)
switch state {
case 1:
fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %vn", msg)
return primitive.CommitMessageState
case 2:
fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %vn", msg)
return primitive.RollbackMessageState
case 3:
fmt.Printf("checkLocalTransaction unknow: %vn", msg)
return primitive.UnknowState
default:
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %vn", msg)
return primitive.CommitMessageState
}
}
// ExecuteLocalTransaction 执行本地事务逻辑
// msg: 事务消息
// return: 本地事务状态
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
fmt.Printf("nextIndex: %v for transactionID: %vn", nextIndex, msg.TransactionId)
status := nextIndex % 3
dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status 1))
fmt.Printf("dl")
return primitive.UnknowState
}
// NewRocketmqTxnProducer 新增事务Producer
func NewRocketmqTxnProducer() (rocketmq.TransactionProducer, error) {
producer, err := rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(1),
)
if err != nil {
fmt.Printf("NewTransactionProducer err: %v", err)
return nil, err
}
return producer, nil
}
// SendRocketmqMessage 发送消息
func SendRocketmqMessage(producer rocketmq.TransactionProducer, topic, msg string) error {
result, err := producer.SendMessageInTransaction(
context.Background(), primitive.NewMessage(topic, []byte(msg)))
if err != nil {
fmt.Printf("SendMessageInTransaction err: %v,result: %s", err, result.String())
return err
}
err = producer.Shutdown()
if err != nil {
fmt.Printf("Shutdown err: %v", err)
return err
}
return nil
}