Golang任务队列machinery使用与源码剖析(一)

2019-09-12 15:33:22 浏览数 (2)

导语

异步任务,是每一位开发者都遇到过的技术名词,在任何一个稍微复杂的后台系统中,异步任务总是无法避免的,而任务队列由于其松耦合、易扩展的特性,成为了实现异步任务的可靠保证。

背景

当用户的一次请求事件发生,可能是某种数据的重复数查询,抑或是某批人群的覆盖率统计,展现到用户的是几行数字,但在透视到后端逻辑中,简单的这可能是一次mysql的联表查询或者elasticsearch的聚合,但更多情况下,是附带了一系列复杂的数据交互或者耗时的逻辑计算。当后端这种发生多次数据交互任务的情况一旦存在,为了实现每一次任务的可靠执行以及前端响应速度,任务队列的存在意义就凸显了。

场景与功能

任务队列有着广泛的适应场景:

  • 大批量的计算任务。如大量数据插入,通过拆分并分批插入任务队列,从而实现串行链式任务处理或者实现分组并行任务处理,提高系统鲁棒性,提高系统并发度;
  • 数据预处理。定期的从后端存储将数据同步到到缓存系统,从而在查询请求发生时,直接去缓存系统中查询,提高查询请求的响应速度;
  • 错误重试功能。为了提高系统的可用性,当函数处理出现错误时,我们希望可以给予其重试的机会,增强系统的可用性。

适用于任务队列的场景还有很多,同样,不同语言也有着自己著名的任务队列系统,众所周知的如python下的celery,PHP中laraval框架的Queues,都是使用度十分广泛的任务队列系统。

我们项目的技术栈为golang,因此,在我们go为基础的微服务框架中,需要存在一个类型于celery或者laraval中的任务队列系统,在经过了一系列筛选后,我们采用了machinery作为我们的任务队列系统。machinery,一个第三方开源的基于分布式消息分发的异步任务队列,有着以下这些特性:

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB

当前machinery在v1 stable版本,可以通过go get github.com/RichardKnop/machinery/v1获取。

架构设计

任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,我们来看下machinery的简单设计结构图例:

其中:

  • Server:业务模块,生成具体任务,可根据业务逻辑中,按交互进行拆分;
  • Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS;
  • Worker:工作进程,负责消费者功能,处理具体的任务;
  • Backend:后端存储,用于存储任务执行状态的数据;

在本篇文章中,我们将对上述几个模块进行详细讲解。

Broker

machinery的broker支持多种存储介质:Redis,AMQP和SQS,本篇文章中,我们将以redis来详细介绍,其他类型的存储介质,在实现细节上由于介质的API支持不一可能略有不同,但machinery具体暴露接口类似,有兴趣的读者可以详细再阅读相关源码。

machinery的Broker实现了以下这几种接口,我们将重点介绍起着关键作用的接口:

代码语言:javascript复制
GetConfig() *config.Config
SetRegisteredTaskNames(names []string)
IsTaskRegistered(name string) bool
StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
StopConsuming()
Publish(task *tasks.Signature) error
GetPendingTasks(queue string) ([]*tasks.Signature, error)

Broker启动和停止

当我们使用machinery时,在启动服务之后,StartConsuming()函数将以阻塞轮询的方式去Broker中获取任务并消费处理。而当服务停止之后,StopConsuming()函数将会等待一系列go程结束,以实现gracefully stop。

详细来看StartConsumin()函数,具体源码如下(不相关的代码细节已经省略)。

代码语言:javascript复制
func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error) {
	...
  // 获取任务go程
	go func() {
                ...
		for {
			select {
			case <-b.stopReceivingChan:
				return
			case <-timer.C:
				if concurrencyAvailable() {
					task, err := b.nextTask(b.cnf.DefaultQueue)
					if err != nil {
						timer.Reset(timerDuration)
						continue
					}
					deliveries <- task
				}
         //并发控制逻辑
				if concurrencyAvailable() {
					timer.Reset(0)//设置timer为0,立即继续消费任务
				} else {
					timer.Reset(timerDuration)//重置timer,等待duration后再尝试消费
				}
			}
		}
	}()

  // 获取延时任务go程
	go func() {
                ...
		for {
			select {
			case <-b.stopDelayedChan:
				return
			default:
				task, err := b.nextDelayedTask(redisDelayedTasksKey)
				if err != nil {
					continue
				}

				signature := new(tasks.Signature)
				decoder := json.NewDecoder(bytes.NewReader(task))
				decoder.UseNumber()
				if err := decoder.Decode(signature); err != nil {
					log.ERROR.Print(NewErrCouldNotUnmarshaTaskSignature(task, err))
				}

				if err := b.Publish(signature); err != nil {
					log.ERROR.Print(err)
				}
			}
		}
	}()
  //执行任务消费
	if err := b.consume(deliveries, pool, concurrency, taskProcessor); err != nil {
		return b.retry, err
	}
        ...
}

其中,

参数consumerTag在AMQP作为Broker时有意义;

参数concurrency用来实现任务并发调度的控制。

Broker任务获取

在StartComsuming()中,分别启动了两个go程来并行处理任务,因为针对延时任务和普通任务,machinery将任务存放于两个不同的rediskey中。

  • 对于普通任务,使用nextTask()函数用来从broker中获取任务,在redis作为broker时,machinery使用了LIST类型来存储任务,而nextTask()中使用了BLPOP来阻塞式的读取任务*1
  • 对于延时任务,使用nextDelayTask()函数从redis中的ZSET中,根据score来优先获取最近的任务(score为ETA的对应的unixnano值)。

具体来看nextTask()函数和nextDelayTask()函数,如下列出:

代码语言:javascript复制
// BLPOP出LIST中的数据
func (b *RedisBroker) nextTask(queue string) (result []byte, err error) {
	conn := b.open()
	defer conn.Close()

	items, err := redis.ByteSlices(conn.Do("BLPOP", queue, 1))
	if err != nil {
		return []byte{}, err
	}

	if len(items) != 2 {
		return []byte{}, redis.ErrNil
	}

	result = items[1]

	return result, nil
}
代码语言:javascript复制
// 结合WATCH,从ZSET中获取score最小的
func (b *RedisBroker) nextDelayedTask(key string) (result []byte, err error) {
   ...
   for {
      time.Sleep(time.Duration(pollPeriod) * time.Millisecond)
      if _, err = conn.Do("WATCH", key); err != nil {
         return
      }

      now := time.Now().UTC().UnixNano()

      items, err = redis.ByteSlices(conn.Do(
         "ZRANGEBYSCORE",
         key,
         0,
         now,
         "LIMIT",
         0,
         1,
      ))
      if err != nil {
         return
      }
      if len(items) != 1 {
         err = redis.ErrNil
         return
      }

      conn.Send("MULTI")
      conn.Send("ZREM", key, items[0])
      reply, err = conn.Do("EXEC")
      if err != nil {
         return
      }

      if reply != nil {
         result = items[0]
         break
      }
   }

   return
}

*1 特别需要注意的是,由于云服务的盛行,当下的云服务基本上都涵盖了redis服务,且提供了主备方案和集群方案等,但是不论时云服务或者时公司内部的redis服务,对BLPOP的支持可能会受限,这时候我们需要更改nextTask()函数中的BLPOP为LPOP来适应:

代码语言:javascript复制
/*
* modified at 20180717
* use LPOP instead of BLPOP, cause L5 redis does not support BLPOP
**/
// nextTask pops next available task from the default queue
func (b *Broker) nextTask(queue string) (result []byte, err error) {
   conn := b.open()
   defer conn.Close()

   item, err := redis.Bytes(conn.Do("LPOP", queue))
   if err != nil {
      return []byte{}, err
   }
   result = item

   return result, nil
}

Broker任务查看

在redis作为Broker时,machinery还提供了一个额外的接口实现(其他接口Broker存储介质未对该接口进行实现)GetPendingTasks()。顾名思义,GetPendingTasks()可以用来查看当前任务队列中处理pending状态,在等待被处理的任务的详细信息。

GetPendingTasks()函数,更多的可以理解为,是作者提供的“接口糖”,方便离线的对任务队列中的任务进行查看,当然,machinery中使用的几种第三方队列作为Broker,基本上都是支持这类数据的单独查看的。

代码语言:javascript复制
func (b *RedisBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
	...
	dataBytes, err := conn.Do("LRANGE", queue, 0, 10)
	if err != nil {
		return nil, err
	}
	results, err := redis.ByteSlices(dataBytes, err)
	if err != nil {
		return nil, err
	}

	taskSignatures := make([]*tasks.Signature, len(results))
	for i, result := range results {
		signature := new(tasks.Signature)
		decoder := json.NewDecoder(bytes.NewReader(result))
		decoder.UseNumber()
		if err := decoder.Decode(signature); err != nil {
			return nil, err
		}
		taskSignatures[i] = signature
	}
	return taskSignatures, nil
}

Broker任务发布

Publish()接口是实现任务发布的函数,将在后续篇幅在对任务做介绍时再单独详细介绍。

Backend

Backend,同样是任务队列不可或缺的一部分,其作用主要是用来存储任务的执行结果的,machinery中支持Redis, Memcache, AMQP, MongoDB四种类型的存储介质来实现Backend。

machinery的Backend,根据其自身的功能特性,实现了以下这几种接口,与Broker类似,我们将重点介绍几个关键的接口(同样,以下接口是不同类型的Backend的实现的接口超集,并不是Redis作为介质时都有的):

代码语言:javascript复制
// Workflow相关接口

InitGroup(groupUUID string, taskUUIDs []string) error
GroupCompleted(groupUUID string, groupTaskCount int) (bool, error)
GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error)
TriggerChord(groupUUID string) (bool, error)

// 任务状态设置接口
SetStatePending(signature *tasks.Signature) error
SetStateReceived(signature *tasks.Signature) error
SetStateStarted(signature *tasks.Signature) error
SetStateRetry(signature *tasks.Signature) error
SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error
SetStateFailure(signature *tasks.Signature, err string) error
GetState(taskUUID string) (*tasks.TaskState, error)

// Purging stored stored tasks states and group meta data
PurgeState(taskUUID string) error
PurgeGroupMeta(groupUUID string) error

Workflow相关接口

我们可以看到,第一批接口有Group和Chord相关的字眼,这就是我们在一开始提到的machinery中Workflow机制。Workflow极大的使能了任务队列的功能,使得machinery更加得心应手。关于Workflow的知识,我们将在下面的篇幅中详细介绍,这儿仅仅简单的介绍这几个接口的功能。

InitGroup(),顾名思义,在创建一个Group任务;

GroupCompleted(),检查一个Group中所有的任务是否都执行完毕;

GroupTaskStates(),返回一个Group中,所有任务的状态

TriggerChord(),当Group中任务全部执行完毕后,触发Chrod任务

Backend任务状态

machinery中将任务的状态进行了很详细的划分,通过接口我们就可以看到,machinery支持了以下几种任务中间态:

  • Pending,任务到达Broker
  • Received,任务从Broker中读取成功
  • Started,任务开始执行
  • Retry,任务需要重试
  • Success,任务执行成功
  • Failure,任务执行失败

下面简单列出源码中设置状态接口的使用:

代码语言:javascript复制
// SetStatePending updates task state to PENDING
func (b *RedisBackend) SetStatePending(signature *tasks.Signature) error {
   taskState := tasks.NewPendingTaskState(signature)
   return b.updateState(taskState)
}

// SetStateReceived updates task state to RECEIVED
func (b *RedisBackend) SetStateReceived(signature *tasks.Signature) error {
   taskState := tasks.NewReceivedTaskState(signature)
   return b.updateState(taskState)
}
...

Worker

Worker负责了任务队列的执行单元,是任务队列中处理任务的关键元素,也是因此,Worker的接口很少,很直接:

代码语言:javascript复制
Launch() 
LaunchAsync(errorsChan chan<- error)
Quit()
Process(signature *tasks.Signature)

Worker启动和停止

Worker启动是通过Launch()启动了一个进程,去订阅默认的任务队列,并且处理收到的任务。LaunchAsync()是Launch()的非阻塞版本,而通过Launch()中的代码,我们发现,其实就是调用了LaunchAsync()。

在LaunchAsync()中,通过开启一个go程,实现了非阻塞式的调用了Broker的StartConsuming()函数。

代码语言:javascript复制
func (worker *Worker) Launch() error {
   errorsChan := make(chan error)
   worker.LaunchAsync(errorsChan)
   return <-errorsChan
}

// Launch()的非阻塞调用
func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
   ...
   // broker消费者go程,同时负责与broker的断开重连等
   go func() {
      for {
         retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker)

         if retry {
            if worker.errorHandler != nil {
               worker.errorHandler(err)
            } else {
               log.WARNING.Printf("Broker failed with error: %s", err)
            }
         } else {
            errorsChan <- err // stop the goroutine
            return
         }
      }
   }()
   ...
   }
}

Worker停止是通过Quit()函数来实现,其调用了Broker的StopConsuming()函数,以实现gracefully stop。

代码语言:javascript复制
// Quit tears down the running worker process
func (worker *Worker) Quit() {
   worker.server.GetBroker().StopConsuming()
}

Worker处理

Worker中的Process()函数,将会处理在Broker中的待处理任务,并且负责了任务回调的触发功能。Process()函数的任务流程主要是:

任务检测->任务获取->任务预处理->Tracing处理->任务执行

代码语言:javascript复制
func (worker *Worker) Process(signature *tasks.Signature) error {
   ...
   //根据任务名,获取注册任务
   taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
   if err != nil {
      return nil
   }

   // 更新任务状态 Received
   if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
      return fmt.Errorf("Set state received error: %s", err)
   }

   // 任务预处理,预防任务出错,导致后面影响worker的运行
   task, err := tasks.New(taskFunc, signature.Args)
   if err != nil {
      worker.taskFailed(signature, err)
      return err
   }

   // tracing处理   
   taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
   tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
   task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

   // 更新任务状态 Started
   if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
      return fmt.Errorf("Set state started error: %s", err)
   }

   // 任务执行
   results, err := task.Call()
   if err != nil {
      // If a tasks.ErrRetryTaskLater was returned from the task,
      // retry the task after specified duration
      retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
      if ok {
         return worker.retryTaskIn(signature, retriableErr.RetryIn())
      }

      // Otherwise, execute default retry logic based on signature.RetryCount
      // and signature.RetryTimeout values
      if signature.RetryCount > 0 {
         return worker.taskRetry(signature)
      }

      return worker.taskFailed(signature, err)
   }

   return worker.taskSucceeded(signature, results)
}

machinery中,主要是通过反射实现了任务执行,具体的执行方式,在获取了函数之后与普通的反射无异,详细的介绍在后续篇幅介绍。关于任务执行之后的处理,有可能三种处理:

  • 任务执行成功

taskSucceeded(),是在一个任务被成功执行后调用,主要负责更新任务状态、触发回调函数或者chord任务中的回调函数(前提是该task是chrod的分组任务中的最后一个任务),关于chord任务,在后面关于Workflow模式中将会详细介绍。

代码语言:javascript复制
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {
	// 更新任务状态
	if err := worker.server.GetBackend().SetStateSuccess(signature, taskResults); err != nil {
		return fmt.Errorf("Set state success error: %s", err)
	}
        ...
	// 回调任务
	for _, successTask := range signature.OnSuccess {
        // 当immutable为false时,传递参数
		if signature.Immutable == false {
			// Pass results of the task to success callbacks
			for _, taskResult := range taskResults {
				successTask.Args = append(successTask.Args, tasks.Arg{
					Type:  taskResult.Type,
					Value: taskResult.Value,
				})
			}
		}
		worker.server.SendTask(successTask)
	}
        ...
	// 触发chord任务的回掉函数
	shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID)
	if err != nil {
		return fmt.Errorf("Trigger chord error: %s", err)
	}
        ...
	// 针对group任务的返回值做参数传递
	for _, taskState := range taskStates {
		if !taskState.IsSuccess() {
			return nil
		}

		if signature.ChordCallback.Immutable == false {
			for _, taskResult := range taskState.Results {
				signature.ChordCallback.Args = append(signature.ChordCallback.Args, tasks.Arg{
					Type:  taskResult.Type,
					Value: taskResult.Value,
				})
			}
		}
	}
	// 发送chord任务
	_, err = worker.server.SendTask(signature.ChordCallback)
	if err != nil {
		return err
	}

	return nil
}
  • 任务执行失败

taskFailed(),是在一个任务执行失败(完全失败,即重试也失败)后调用。需要负责更新任务状态,并触发OnError回调函数。

代码语言:javascript复制
func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error {
   // 任务状态更新 Failure
   if err := worker.server.GetBackend().SetStateFailure(signature, taskErr.Error()); err != nil {
      return fmt.Errorf("Set state failure error: %s", err)
   }

   ...
   // Trigger error callbacks
   for _, errorTask := range signature.OnError {
      // Pass error as a first argument to error callbacks
      args := append([]tasks.Arg{{
         Type:  "string",
         Value: taskErr.Error(),
      }}, errorTask.Args...)
      errorTask.Args = args
      worker.server.SendTask(errorTask)
   }

   return nil
}
  • 任务重试

关于任务重试,machinery中提供了两种方式来实现。

第一种,machinery中通过设置任务的RetryCount和RetryTimeout参数来实现。

第二种,通过返回一个ErrRetryTaskLater类型的值来制定。

由于任务重试,需要依赖于对machinery中任务数据结构的了解,我们将在之后详细介绍。

本篇文章主要介绍了任务队列的背景与说明,同时介绍了machinery的设计结构,并详细的介绍了machinery中的每个具体模块的功能与源码实现。更多关于machinery的源码实现和功能介绍,将在下一篇继续介绍。

0 人点赞