[源码解析] 并行分布式任务队列 Celery 之 负载均衡

2021-05-17 12:20:28 浏览数 (1)

[源码解析] 并行分布式任务队列 Celery 之 负载均衡

目录

  • [源码解析] 并行分布式任务队列 Celery 之 负载均衡
    • 0x00 摘要
    • 0x01 负载均衡
      • 1.1 哪几个 queue
        • 1.1.1 _brpop_start 选择下次读取的queue
        • 1.1.2 round_robin_cycle 设置下次读取的 queue
      • 1.2 哪一个worker
      • 1.3 哪一个进程
        • 1.3.1 策略
        • 1.3.2 公平调度
        • 1.3.3 公平调度 in Celery
    • 0x02 Autoscaler
      • 2.1 调用时机
      • 2.2 具体实现
        • 2.2.1 bgThread
        • 2.2.2 定义
    • 0xEE 个人信息
    • 0xFF 参考

0x00 摘要

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文介绍 Celery 的负载均衡机制。

Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。

0x01 负载均衡

Celery 的负载均衡其实可以分为三个层次,而且是与 Kombu 高度耦合(本文 broker 以 Redis 为例)。

  • 在 worker 决定 与 哪几个 queue 交互,有一个负载均衡(对于 queues );
  • 在 worker 决定与 broker 交互,使用 brpop 获取消息时候有一个负载均衡(决定哪一个 worker 来处理任务);
  • 在 worker 获得 broker 消息之后,内部 具体 调用 task 时候,worker 内部进行多进程分配时候,有一个负载均衡(决定 worker 内部哪几个进程)。

注意,这个顺序是从 worker 读取任务处理任务的角度 出发,而不是从系统架构角度出发。

因为从系统架构角度说,应该是 which worker ----> which queue in the worker ----> which subprocess in the worker 这个角度。

我们下面按照 "worker 读取任务处理任务角度" 的顺序进行分析。

1.1 哪几个 queue

Kombu 事实上是使用 redis 的 BRPOP 功能来完成对具体 queue 中消息的读取。

  • Kombu 是循环调用,每次调用会制定读取哪些内部queues的消息;
  • queue 这个逻辑概念,其实就是对应了 redis 中的一个 物理key,从 queue 读取,就代表 BRPOP 需要指定 监听的 key。
  • Kombu 是在每一次监听时候,根据这些 queues 得到 其在 redis 之中对应的物理keys,即都指定监听哪些 redis keys;
  • brpop是个多key命令,当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。这样就得到了这些 逻辑queue 对应的消息。

因为 task 可能会 用到多个 queue,所以具体从哪几个queue 读取?这时候就用到了策略

1.1.1 _brpop_start 选择下次读取的queue

Kombu 在每次监听时候,调用 _brpop_start 完成监听。其作用就是 选择下一次读取的queues

_brpop_start 如下:

代码语言:javascript复制
def _brpop_start(self, timeout=1):
    # 得到一些内部queues
    queues = self._queue_cycle.consume(len(self.active_queues))
    if not queues:
        return
    # 得到queue对应的keys  
    keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
            for queue in queues]   [timeout or 0]
    self._in_poll = self.client.connection
    self.client.connection.send_command('BRPOP', *keys) # 利用这些keys,从redis内部获取key的消息

此时变量如下:

代码语言:javascript复制
self.active_queues = {set: 1} {'celery'}

len(self.active_queues) = {int} 1
  
self._queue_cycle = {round_robin_cycle} <kombu.utils.scheduling.round_robin_cycle object at 0x0000015A7EE9DE88>
  
self = {Channel} <kombu.transport.redis.Channel object at 0x0000015A7EE31048>

所以_brpop_start 就是从 self._queue_cycle 获得几个需要读取的queue。

具体如下图:

代码语言:javascript复制
                                                               
                                                  Kombu       |          Redis
                                                              |
                                                              |
 --------------------------------------------                 |
|  Worker                                    |                |
|                                            |                |           queue 1 key
|    -----------                             |                |
|   | queue 1   |                            |   BRPOP(keys)  |
|   | queue 2   |                    keys    |                |
|   | ......    |  -------- ---------------------------------------->     queue 2 key
|   | queue n   |          ^                 |                |
|    -----------           | keys            |                |
|                          |                 |                |
|                          |                 |                |           queue 3 key
|             ------------- ------------     |                |
|            |         Keys list        |    |                |
|            |                          |    |                |
|             --------------------------     |                |
 --------------------------------------------                 |
                                                              |
                                                              |
                                                              |
                                                              |
                                                              |
                                                               
1.1.2 round_robin_cycle 设置下次读取的 queue

从上面代码中,我们可以知道 consume 就是返回 round_robin_cycle 中前几个 queue,即 return self.items[:n]。

而 self.items 的维护,是通过 rotate 完成的,就是把 最近用的 那个 queue 放到队列最后,这样给其他 queue 机会,就是 round robin 的概念了。

代码语言:javascript复制
class round_robin_cycle:
    """Iterator that cycles between items in round-robin."""

    def __init__(self, it=None):
        self.items = it if it is not None else []

    def update(self, it):
        """Update items from iterable."""
        self.items[:] = it

    def consume(self, n):
        """Consume n items."""
        return self.items[:n]

    def rotate(self, last_used):
        """Move most recently used item to end of list."""
        items = self.items
        try:
            items.append(items.pop(items.index(last_used)))
        except ValueError:
            pass
        return last_used

比如在如下代码中,当读取到消息之后,就会调用 self._queue_cycle.rotate(dest) 进行调整。

代码语言:javascript复制
    def _brpop_read(self, **options):
        try:
            try:
                dest__item = self.client.parse_response(self.client.connection,
                                                        'BRPOP',
                                                        **options)
            except self.connection_errors:
                # if there's a ConnectionError, disconnect so the next
                # iteration will reconnect automatically.
                self.client.connection.disconnect()
                raise
            if dest__item:
                dest, item = dest__item
                dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                self._queue_cycle.rotate(dest) # 这里进行调整
                self.connection._deliver(loads(bytes_to_str(item)), dest)
                return True
            else:
                raise Empty()
        finally:
            self._in_poll = None

具体如下图:

代码语言:javascript复制
                                                               
                                                  Kombu       |          Redis
                                                              |
                                                              |
 --------------------------------------------                 |
|  Worker                                    |                |
|                                            |                |           queue 1 key
|    -----------                             |                |
|   | queue 1   |                            |   BRPOP(keys)  |
|   | queue 2   |                    keys    |                |
|   | ......    |  -------- ---------------------------------------->     queue 2 key
|   | queue n   |          ^                 |                |
|    -----------           | keys            |                |
|                          |                 |                |
|                                            |                |           queue 3 key
|                 round_robin_cycle          |                |
|                                            |                |
|                          |                 |                |
|                          |                 |                |
|             ------------- ------------     |                |
|            |         Keys list        |    |                |
|             --------------------------     |                |
 --------------------------------------------                 |
                                                              |
                                                               

1.2 哪一个worker

如果多个 worker 同时去使用 brpop 获取 broker 消息,那么具体哪一个能够读取到消息,其实这就是有一个 竞争机制因为redis 的单进程处理,所以只能有一个 worker 才能读到

这本身就是一个负载均衡。这个和 spring quartz 的负载均衡实现非常类似。

  • spring quartz 是 多个节点读取 同一个数据库记录决定谁能开始下一次处理,哪一个得到了数据库锁 就是哪个。
  • Kombu 是通过 多个 worker 读取 redis "同一个或者一组key" 的 实际结果 来决定 "哪一个 worker 能开始下一次处理"。

具体如下图:

代码语言:javascript复制
                                                             
                                            Kombu           |    Redis
                                                            |
                                                            |
 --------------------------------------                     |
|  Worker 1                            |                    |
|                                      |                    |
|    -----------                       |                    |
|   | queue 1   |                      |   BRPOP(keys)      |
|   | queue 2   |              keys    |                    |
|   | ......    |  -------- -----------------------------   |
|   | queue n   |          ^           |                 |  |
|    -----------           |  keys     |                 |  |
|                          |           |                 |  |
|                                      |                 |  |
|                  round_robin_cycle   |                 |  |                --> queue 1 key
|                          ^           |                 |  |               |
|                          |           |                 |  |               |
|                          |           |                 |  | Single Thread |
|              ------------ ---------  |                  ---------------------> queue 2 key
|             |     keys list        | |                 |  |               |
|              ----------------------  |                 |  |               |
 --------------------------------------                  |  |               |
                                                         |  |                --> queue 3 key
                                                         |  |
 --------------------------------------                  |  |
| Worker 2                             |   BRPOP(keys)   |  |
|                                      |  ---------------   |
|                                      |                 |  |
 --------------------------------------                  |  |
                                                         |  |
 --------------------------------------    BRPOP(keys)   |  |
| Worker 3                             |                 |  |
|                                      |   --------------   |
|                                      |                     
|                                      |
 -------------------------------------- 

1.3 哪一个进程

进程池中,使用了策略来决定具体使用哪一个进程来处理任务。

1.3.1 策略

先讲解 strategy。在 AsynPool 启动有如下,配置了策略:

代码语言:javascript复制
class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                   sched_strategy)

于是我们看看 strategy 定义如下,基本由名字可以知道其策略意义:

代码语言:javascript复制
SCHED_STRATEGY_FCFS = 1 # 先来先服务
SCHED_STRATEGY_FAIR = 4 # 公平

SCHED_STRATEGIES = {
    None: SCHED_STRATEGY_FAIR,
    'default': SCHED_STRATEGY_FAIR,
    'fast': SCHED_STRATEGY_FCFS,
    'fcfs': SCHED_STRATEGY_FCFS,
    'fair': SCHED_STRATEGY_FAIR,
}
1.3.2 公平调度

我们讲讲公平调度的概念。

不同系统对于公平调度的理解大同小异,我们举几个例子看看。

  • Linux 中,调度器必须在各个进程之间尽可能公平地共享CPU时间,而同时又要考虑不同的任务优先级。一般原理是:按所需分配的计算能力,向系统中每个进程提供最大的公正性,或者从另外一个角度上说, 试图确保没有进程被亏待。
  • Hadoop 中,公平调度是一种赋予作业(job)资源的方法,它的目的是让所有的作业随着时间的推移,都能平均的获取等同的共享资源。当单独一个作业在运行时,它将使用整个集群。当有其它作业被提交上来时,系统会将任务(task)空闲时间片(slot)赋给这些新的作业,以使得每一个作业都大概获取到等量的CPU时间。
  • Yarn 之中,Fair Share指的都是Yarn根据每个队列的权重、最大,最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。
1.3.3 公平调度 in Celery

在 asynpool之中,有设置,看看"是否为 fair 调度":

代码语言:javascript复制
is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR         

基于 is_fair_strategy 这个变量,Celery 的公平调度有几处体现。

在开始 poll 时候,如果是 fair,则需要 存在 idle worker 才调度,这样就给了 idler worker 一个调度机会

代码语言:javascript复制
def on_poll_start():
    # Determine which io descriptors are not busy
    inactive = diff(active_writes)

    # Determine hub_add vs hub_remove strategy conditional
    if is_fair_strategy:
        # outbound buffer present and idle workers exist
        add_cond = outbound and len(busy_workers) < len(all_inqueues)
    else:  # default is add when data exists in outbound buffer
        add_cond = outbound

    if add_cond:  # calling hub_add vs hub_remove
        iterate_file_descriptors_safely(
            inactive, all_inqueues, hub_add,
            None, WRITE | ERR, consolidate=True)
    else:
        iterate_file_descriptors_safely(
            inactive, all_inqueues, hub_remove)

在具体发布 写操作 时候,也会看看是否 worker 已经正在忙于执行某一个 task,如果正在执行,就不调度,这样就给了其他 不忙worker 一个调度的机会

代码语言:javascript复制
        def schedule_writes(ready_fds, total_write_count=None):
            if not total_write_count:
                total_write_count = [0]
            # Schedule write operation to ready file descriptor.
            # The file descriptor is writable, but that does not
            # mean the process is currently reading from the socket.
            # The socket is buffered so writable simply means that
            # the buffer can accept at least 1 byte of data.

            # This means we have to cycle between the ready fds.
            # the first version used shuffle, but this version
            # using `total_writes % ready_fds` is about 30% faster
            # with many processes, and also leans more towards fairness
            # in write stats when used with many processes
            # [XXX On macOS, this may vary depending
            # on event loop implementation (i.e, select/poll vs epoll), so
            # have to test further]
            num_ready = len(ready_fds)

            for _ in range(num_ready):
                ready_fd = ready_fds[total_write_count[0] % num_ready]
                total_write_count[0]  = 1
                if ready_fd in active_writes:
                    # already writing to this fd
                    continue 
                if is_fair_strategy and ready_fd in busy_workers: # 是否调度
                    # worker is already busy with another task
                    continue
                if ready_fd not in all_inqueues:
                    hub_remove(ready_fd)
                    continue

具体逻辑如下:

代码语言:javascript复制
                                                           
                                            Kombu         |    Redis
                                                          |
                                         BRPOP(keys)      |
 ------------------------------------                     |
|  Worker 1                          |  ---------------   |
|                                    |                 |  |
 ------------------------------------                  |  |                     queue 1 key
                                                       |  |                  ->
                                                       |  |                 |
 ------------------------------------      BRPOP(keys) |  |  Single thread  |
| Worker 2                           |  --------------------------------------> queue 2 key
|                                    |                 |  |  (which worker) |
 ------------------------------------                  |  |                 |
                                                       |  |                 |
 ------------------------------------                  |  |                  -> queue 3 key
| Worker 3                           |                 |  |
|                                    |                 |  |
|      -----------                   |                 |  |
|     | queue 1   |                  |     BRPOP(keys) |  |
|     | queue 2   |          keys    |                 |  |
|     | ......    |  -------- -------------------------   |
|     | queue n   |          ^       |                    |
|      -----------           | keys  |                    |
|                            |       |                    |
|                                    |                    |
|                 round_robin_cycle (which queues)        |
|                            ^       |                    |
|                            |       |                    |
|                            |       |                    |
|                        ---- ----   |                    |
|                       |keys list|  |                    |
|             |          ---------   |                    |
 ------------------------------------                     |
              |                                           |
              |  fair_strategy(which subprocess)          |
              |                                           |
       ------- ---------- ----------------                |
      |                  |                |               |
      v                  v                v               |
 ----- --------    ------ -------    ----- --------       |
| subprocess 1 |  | subprocess 2 |  | subprocess 3 |       
 --------------    --------------    -------------- 

0x02 Autoscaler

Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。

2.1 调用时机

在 WorkerComponent 中可以看到,为 AutoScaler 注册了两个调用途径:

  • 注册在 consumer 消息响应方法中,这样消费时候如果有需要,就会调整;
  • 利用 Hub 的 call_repeatedly 方法注册了周期任务,即周期看看是否需要调整。

这样就会最大程度的加大调用频率。

代码语言:javascript复制
class WorkerComponent(bootsteps.StartStopStep):
    """Bootstep that starts the autoscaler thread/timer in the worker."""

    def create(self, w):
        scaler = w.autoscaler = self.instantiate(
            w.autoscaler_cls,
            w.pool, w.max_concurrency, w.min_concurrency,
            worker=w, mutex=DummyLock() if w.use_eventloop else None,
        )
        return scaler if not w.use_eventloop else None

    def register_with_event_loop(self, w, hub):
        w.consumer.on_task_message.add(w.autoscaler.maybe_scale) # 消费时候如果有需要,就会调整
        
        hub.call_repeatedly( # 周期看看是否需要调整
            w.autoscaler.keepalive, w.autoscaler.maybe_scale,
        )

2.2 具体实现

2.2.1 bgThread

Autoscaler 是Background thread,这样 AutoScaler就可以在后台运行:

代码语言:javascript复制
class bgThread(threading.Thread):
    """Background service thread."""

    def run(self):
        body = self.body
        shutdown_set = self._is_shutdown.is_set
        try:
            while not shutdown_set():
            	body()
        finally:
            self._set_stopped()
2.2.2 定义

Autoscaler 的定义如下,可以看到其逻辑就是定期判断是否需要调整:

  • 如果当前并发已经到了最大,则下调;
  • 如果到了最小并发,则上调;
  • 则具体上调下调的,都是通过具体线程池函数做到的,这就是要根据具体操作系统来进行分析,此处略过。
代码语言:javascript复制
class Autoscaler(bgThread):
    """Background thread to autoscale pool workers."""

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super().__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_scale_up = None
        self.worker = worker

    def body(self):
        with self.mutex:
            self.maybe_scale()
        sleep(1.0)

    def _maybe_scale(self, req=None):
        procs = self.processes
        cur = min(self.qty, self.max_concurrency)
        if cur > procs:
            self.scale_up(cur - procs)
            return True
        cur = max(self.qty, self.min_concurrency)
        if cur < procs:
            self.scale_down(procs - cur)
            return True

    def maybe_scale(self, req=None):
        if self._maybe_scale(req):
            self.pool.maintain_pool()

    def update(self, max=None, min=None):
        with self.mutex:
            if max is not None:
                if max < self.processes:
                    self._shrink(self.processes - max)
                self._update_consumer_prefetch_count(max)
                self.max_concurrency = max
            if min is not None:
                if min > self.processes:
                    self._grow(min - self.processes)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency

    def scale_up(self, n):
        self._last_scale_up = monotonic()
        return self._grow(n)

    def scale_down(self, n):
        if self._last_scale_up and (
                monotonic() - self._last_scale_up > self.keepalive):
            return self._shrink(n)

    def _grow(self, n):
        self.pool.grow(n)

    def _shrink(self, n):
		self.pool.shrink(n)

    def _update_consumer_prefetch_count(self, new_max):
        diff = new_max - self.max_concurrency
        if diff:
            self.worker.consumer._update_prefetch_count(
                diff
            )

    @property
    def qty(self):
        return len(state.reserved_requests)

    @property
    def processes(self):
        return self.pool.num_processes

0 人点赞