导语
异步任务,是每一位开发者都遇到过的技术名词,在任何一个稍微复杂的后台系统中,异步任务总是无法避免的,而任务队列由于其松耦合、易扩展的特性,成为了实现异步任务的可靠保证。
背景
当用户的一次请求事件发生,可能是某种数据的重复数查询,抑或是某批人群的覆盖率统计,展现到用户的是几行数字,但在透视到后端逻辑中,简单的这可能是一次mysql的联表查询或者elasticsearch的聚合,但更多情况下,是附带了一系列复杂的数据交互或者耗时的逻辑计算。当后端这种发生多次数据交互任务的情况一旦存在,为了实现每一次任务的可靠执行以及前端响应速度,任务队列的存在意义就凸显了。
场景与功能
任务队列有着广泛的适应场景:
- 大批量的计算任务。如大量数据插入,通过拆分并分批插入任务队列,从而实现串行链式任务处理或者实现分组并行任务处理,提高系统鲁棒性,提高系统并发度;
- 数据预处理。定期的从后端存储将数据同步到到缓存系统,从而在查询请求发生时,直接去缓存系统中查询,提高查询请求的响应速度;
- 错误重试功能。为了提高系统的可用性,当函数处理出现错误时,我们希望可以给予其重试的机会,增强系统的可用性。
适用于任务队列的场景还有很多,同样,不同语言也有着自己著名的任务队列系统,众所周知的如python下的celery,PHP中laraval框架的Queues,都是使用度十分广泛的任务队列系统。
我们项目的技术栈为golang,因此,在我们go为基础的微服务框架中,需要存在一个类型于celery或者laraval中的任务队列系统,在经过了一系列筛选后,我们采用了machinery作为我们的任务队列系统。machinery,一个第三方开源的基于分布式消息分发的异步任务队列,有着以下这些特性:
- 任务重试机制
- 延迟任务支持
- 任务回调机制
- 任务结果记录
- 支持Workflow模式:Chain,Group,Chord
- 多Brokers支持:Redis, AMQP, AWS SQS
- 多Backends支持:Redis, Memcache, AMQP, MongoDB
当前machinery在v1 stable版本,可以通过go get github.com/RichardKnop/machinery/v1获取。
架构设计
任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,我们来看下machinery的简单设计结构图例:
其中:
- Server:业务模块,生成具体任务,可根据业务逻辑中,按交互进行拆分;
- Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS;
- Worker:工作进程,负责消费者功能,处理具体的任务;
- Backend:后端存储,用于存储任务执行状态的数据;
在本篇文章中,我们将对上述几个模块进行详细讲解。
Broker
machinery的broker支持多种存储介质:Redis,AMQP和SQS,本篇文章中,我们将以redis来详细介绍,其他类型的存储介质,在实现细节上由于介质的API支持不一可能略有不同,但machinery具体暴露接口类似,有兴趣的读者可以详细再阅读相关源码。
machinery的Broker实现了以下这几种接口,我们将重点介绍起着关键作用的接口:
代码语言:javascript复制GetConfig() *config.Config
SetRegisteredTaskNames(names []string)
IsTaskRegistered(name string) bool
StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
StopConsuming()
Publish(task *tasks.Signature) error
GetPendingTasks(queue string) ([]*tasks.Signature, error)
Broker启动和停止
当我们使用machinery时,在启动服务之后,StartConsuming()函数将以阻塞轮询的方式去Broker中获取任务并消费处理。而当服务停止之后,StopConsuming()函数将会等待一系列go程结束,以实现gracefully stop。
详细来看StartConsumin()函数,具体源码如下(不相关的代码细节已经省略)。
代码语言:javascript复制func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error) {
...
// 获取任务go程
go func() {
...
for {
select {
case <-b.stopReceivingChan:
return
case <-timer.C:
if concurrencyAvailable() {
task, err := b.nextTask(b.cnf.DefaultQueue)
if err != nil {
timer.Reset(timerDuration)
continue
}
deliveries <- task
}
//并发控制逻辑
if concurrencyAvailable() {
timer.Reset(0)//设置timer为0,立即继续消费任务
} else {
timer.Reset(timerDuration)//重置timer,等待duration后再尝试消费
}
}
}
}()
// 获取延时任务go程
go func() {
...
for {
select {
case <-b.stopDelayedChan:
return
default:
task, err := b.nextDelayedTask(redisDelayedTasksKey)
if err != nil {
continue
}
signature := new(tasks.Signature)
decoder := json.NewDecoder(bytes.NewReader(task))
decoder.UseNumber()
if err := decoder.Decode(signature); err != nil {
log.ERROR.Print(NewErrCouldNotUnmarshaTaskSignature(task, err))
}
if err := b.Publish(signature); err != nil {
log.ERROR.Print(err)
}
}
}
}()
//执行任务消费
if err := b.consume(deliveries, pool, concurrency, taskProcessor); err != nil {
return b.retry, err
}
...
}
其中,
参数consumerTag在AMQP作为Broker时有意义;
参数concurrency用来实现任务并发调度的控制。
Broker任务获取
在StartComsuming()中,分别启动了两个go程来并行处理任务,因为针对延时任务和普通任务,machinery将任务存放于两个不同的rediskey中。
- 对于普通任务,使用nextTask()函数用来从broker中获取任务,在redis作为broker时,machinery使用了LIST类型来存储任务,而nextTask()中使用了BLPOP来阻塞式的读取任务*1。
- 对于延时任务,使用nextDelayTask()函数从redis中的ZSET中,根据score来优先获取最近的任务(score为ETA的对应的unixnano值)。
具体来看nextTask()函数和nextDelayTask()函数,如下列出:
代码语言:javascript复制// BLPOP出LIST中的数据
func (b *RedisBroker) nextTask(queue string) (result []byte, err error) {
conn := b.open()
defer conn.Close()
items, err := redis.ByteSlices(conn.Do("BLPOP", queue, 1))
if err != nil {
return []byte{}, err
}
if len(items) != 2 {
return []byte{}, redis.ErrNil
}
result = items[1]
return result, nil
}
代码语言:javascript复制// 结合WATCH,从ZSET中获取score最小的
func (b *RedisBroker) nextDelayedTask(key string) (result []byte, err error) {
...
for {
time.Sleep(time.Duration(pollPeriod) * time.Millisecond)
if _, err = conn.Do("WATCH", key); err != nil {
return
}
now := time.Now().UTC().UnixNano()
items, err = redis.ByteSlices(conn.Do(
"ZRANGEBYSCORE",
key,
0,
now,
"LIMIT",
0,
1,
))
if err != nil {
return
}
if len(items) != 1 {
err = redis.ErrNil
return
}
conn.Send("MULTI")
conn.Send("ZREM", key, items[0])
reply, err = conn.Do("EXEC")
if err != nil {
return
}
if reply != nil {
result = items[0]
break
}
}
return
}
*1 特别需要注意的是,由于云服务的盛行,当下的云服务基本上都涵盖了redis服务,且提供了主备方案和集群方案等,但是不论时云服务或者时公司内部的redis服务,对BLPOP的支持可能会受限,这时候我们需要更改nextTask()函数中的BLPOP为LPOP来适应:
代码语言:javascript复制/*
* modified at 20180717
* use LPOP instead of BLPOP, cause L5 redis does not support BLPOP
**/
// nextTask pops next available task from the default queue
func (b *Broker) nextTask(queue string) (result []byte, err error) {
conn := b.open()
defer conn.Close()
item, err := redis.Bytes(conn.Do("LPOP", queue))
if err != nil {
return []byte{}, err
}
result = item
return result, nil
}
Broker任务查看
在redis作为Broker时,machinery还提供了一个额外的接口实现(其他接口Broker存储介质未对该接口进行实现)GetPendingTasks()。顾名思义,GetPendingTasks()可以用来查看当前任务队列中处理pending状态,在等待被处理的任务的详细信息。
GetPendingTasks()函数,更多的可以理解为,是作者提供的“接口糖”,方便离线的对任务队列中的任务进行查看,当然,machinery中使用的几种第三方队列作为Broker,基本上都是支持这类数据的单独查看的。
代码语言:javascript复制func (b *RedisBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
...
dataBytes, err := conn.Do("LRANGE", queue, 0, 10)
if err != nil {
return nil, err
}
results, err := redis.ByteSlices(dataBytes, err)
if err != nil {
return nil, err
}
taskSignatures := make([]*tasks.Signature, len(results))
for i, result := range results {
signature := new(tasks.Signature)
decoder := json.NewDecoder(bytes.NewReader(result))
decoder.UseNumber()
if err := decoder.Decode(signature); err != nil {
return nil, err
}
taskSignatures[i] = signature
}
return taskSignatures, nil
}
Broker任务发布
Publish()接口是实现任务发布的函数,将在后续篇幅在对任务做介绍时再单独详细介绍。
Backend
Backend,同样是任务队列不可或缺的一部分,其作用主要是用来存储任务的执行结果的,machinery中支持Redis, Memcache, AMQP, MongoDB四种类型的存储介质来实现Backend。
machinery的Backend,根据其自身的功能特性,实现了以下这几种接口,与Broker类似,我们将重点介绍几个关键的接口(同样,以下接口是不同类型的Backend的实现的接口超集,并不是Redis作为介质时都有的):
代码语言:javascript复制// Workflow相关接口
InitGroup(groupUUID string, taskUUIDs []string) error
GroupCompleted(groupUUID string, groupTaskCount int) (bool, error)
GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error)
TriggerChord(groupUUID string) (bool, error)
// 任务状态设置接口
SetStatePending(signature *tasks.Signature) error
SetStateReceived(signature *tasks.Signature) error
SetStateStarted(signature *tasks.Signature) error
SetStateRetry(signature *tasks.Signature) error
SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error
SetStateFailure(signature *tasks.Signature, err string) error
GetState(taskUUID string) (*tasks.TaskState, error)
// Purging stored stored tasks states and group meta data
PurgeState(taskUUID string) error
PurgeGroupMeta(groupUUID string) error
Workflow相关接口
我们可以看到,第一批接口有Group和Chord相关的字眼,这就是我们在一开始提到的machinery中Workflow机制。Workflow极大的使能了任务队列的功能,使得machinery更加得心应手。关于Workflow的知识,我们将在下面的篇幅中详细介绍,这儿仅仅简单的介绍这几个接口的功能。
InitGroup(),顾名思义,在创建一个Group任务;
GroupCompleted(),检查一个Group中所有的任务是否都执行完毕;
GroupTaskStates(),返回一个Group中,所有任务的状态
TriggerChord(),当Group中任务全部执行完毕后,触发Chrod任务
Backend任务状态
machinery中将任务的状态进行了很详细的划分,通过接口我们就可以看到,machinery支持了以下几种任务中间态:
- Pending,任务到达Broker
- Received,任务从Broker中读取成功
- Started,任务开始执行
- Retry,任务需要重试
- Success,任务执行成功
- Failure,任务执行失败
下面简单列出源码中设置状态接口的使用:
代码语言:javascript复制// SetStatePending updates task state to PENDING
func (b *RedisBackend) SetStatePending(signature *tasks.Signature) error {
taskState := tasks.NewPendingTaskState(signature)
return b.updateState(taskState)
}
// SetStateReceived updates task state to RECEIVED
func (b *RedisBackend) SetStateReceived(signature *tasks.Signature) error {
taskState := tasks.NewReceivedTaskState(signature)
return b.updateState(taskState)
}
...
Worker
Worker负责了任务队列的执行单元,是任务队列中处理任务的关键元素,也是因此,Worker的接口很少,很直接:
代码语言:javascript复制Launch()
LaunchAsync(errorsChan chan<- error)
Quit()
Process(signature *tasks.Signature)
Worker启动和停止
Worker启动是通过Launch()启动了一个进程,去订阅默认的任务队列,并且处理收到的任务。LaunchAsync()是Launch()的非阻塞版本,而通过Launch()中的代码,我们发现,其实就是调用了LaunchAsync()。
在LaunchAsync()中,通过开启一个go程,实现了非阻塞式的调用了Broker的StartConsuming()函数。
代码语言:javascript复制func (worker *Worker) Launch() error {
errorsChan := make(chan error)
worker.LaunchAsync(errorsChan)
return <-errorsChan
}
// Launch()的非阻塞调用
func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
...
// broker消费者go程,同时负责与broker的断开重连等
go func() {
for {
retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker)
if retry {
if worker.errorHandler != nil {
worker.errorHandler(err)
} else {
log.WARNING.Printf("Broker failed with error: %s", err)
}
} else {
errorsChan <- err // stop the goroutine
return
}
}
}()
...
}
}
Worker停止是通过Quit()函数来实现,其调用了Broker的StopConsuming()函数,以实现gracefully stop。
代码语言:javascript复制// Quit tears down the running worker process
func (worker *Worker) Quit() {
worker.server.GetBroker().StopConsuming()
}
Worker处理
Worker中的Process()函数,将会处理在Broker中的待处理任务,并且负责了任务回调的触发功能。Process()函数的任务流程主要是:
任务检测->任务获取->任务预处理->Tracing处理->任务执行
代码语言:javascript复制func (worker *Worker) Process(signature *tasks.Signature) error {
...
//根据任务名,获取注册任务
taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
if err != nil {
return nil
}
// 更新任务状态 Received
if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
return fmt.Errorf("Set state received error: %s", err)
}
// 任务预处理,预防任务出错,导致后面影响worker的运行
task, err := tasks.New(taskFunc, signature.Args)
if err != nil {
worker.taskFailed(signature, err)
return err
}
// tracing处理
taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)
// 更新任务状态 Started
if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
return fmt.Errorf("Set state started error: %s", err)
}
// 任务执行
results, err := task.Call()
if err != nil {
// If a tasks.ErrRetryTaskLater was returned from the task,
// retry the task after specified duration
retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
if ok {
return worker.retryTaskIn(signature, retriableErr.RetryIn())
}
// Otherwise, execute default retry logic based on signature.RetryCount
// and signature.RetryTimeout values
if signature.RetryCount > 0 {
return worker.taskRetry(signature)
}
return worker.taskFailed(signature, err)
}
return worker.taskSucceeded(signature, results)
}
machinery中,主要是通过反射实现了任务执行,具体的执行方式,在获取了函数之后与普通的反射无异,详细的介绍在后续篇幅介绍。关于任务执行之后的处理,有可能三种处理:
- 任务执行成功
taskSucceeded(),是在一个任务被成功执行后调用,主要负责更新任务状态、触发回调函数或者chord任务中的回调函数(前提是该task是chrod的分组任务中的最后一个任务),关于chord任务,在后面关于Workflow模式中将会详细介绍。
代码语言:javascript复制func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {
// 更新任务状态
if err := worker.server.GetBackend().SetStateSuccess(signature, taskResults); err != nil {
return fmt.Errorf("Set state success error: %s", err)
}
...
// 回调任务
for _, successTask := range signature.OnSuccess {
// 当immutable为false时,传递参数
if signature.Immutable == false {
// Pass results of the task to success callbacks
for _, taskResult := range taskResults {
successTask.Args = append(successTask.Args, tasks.Arg{
Type: taskResult.Type,
Value: taskResult.Value,
})
}
}
worker.server.SendTask(successTask)
}
...
// 触发chord任务的回掉函数
shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID)
if err != nil {
return fmt.Errorf("Trigger chord error: %s", err)
}
...
// 针对group任务的返回值做参数传递
for _, taskState := range taskStates {
if !taskState.IsSuccess() {
return nil
}
if signature.ChordCallback.Immutable == false {
for _, taskResult := range taskState.Results {
signature.ChordCallback.Args = append(signature.ChordCallback.Args, tasks.Arg{
Type: taskResult.Type,
Value: taskResult.Value,
})
}
}
}
// 发送chord任务
_, err = worker.server.SendTask(signature.ChordCallback)
if err != nil {
return err
}
return nil
}
- 任务执行失败
taskFailed(),是在一个任务执行失败(完全失败,即重试也失败)后调用。需要负责更新任务状态,并触发OnError回调函数。
代码语言:javascript复制func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error {
// 任务状态更新 Failure
if err := worker.server.GetBackend().SetStateFailure(signature, taskErr.Error()); err != nil {
return fmt.Errorf("Set state failure error: %s", err)
}
...
// Trigger error callbacks
for _, errorTask := range signature.OnError {
// Pass error as a first argument to error callbacks
args := append([]tasks.Arg{{
Type: "string",
Value: taskErr.Error(),
}}, errorTask.Args...)
errorTask.Args = args
worker.server.SendTask(errorTask)
}
return nil
}
- 任务重试
关于任务重试,machinery中提供了两种方式来实现。
第一种,machinery中通过设置任务的RetryCount和RetryTimeout参数来实现。
第二种,通过返回一个ErrRetryTaskLater类型的值来制定。
由于任务重试,需要依赖于对machinery中任务数据结构的了解,我们将在之后详细介绍。
本篇文章主要介绍了任务队列的背景与说明,同时介绍了machinery的设计结构,并详细的介绍了machinery中的每个具体模块的功能与源码实现。更多关于machinery的源码实现和功能介绍,将在下一篇继续介绍。