[源码解析] 并行分布式任务队列 Celery 之 负载均衡
目录
- [源码解析] 并行分布式任务队列 Celery 之 负载均衡
- 0x00 摘要
- 0x01 负载均衡
- 1.1 哪几个 queue
- 1.1.1 _brpop_start 选择下次读取的queue
- 1.1.2 round_robin_cycle 设置下次读取的 queue
- 1.2 哪一个worker
- 1.3 哪一个进程
- 1.3.1 策略
- 1.3.2 公平调度
- 1.3.3 公平调度 in Celery
- 1.1 哪几个 queue
- 0x02 Autoscaler
- 2.1 调用时机
- 2.2 具体实现
- 2.2.1 bgThread
- 2.2.2 定义
- 0xEE 个人信息
- 0xFF 参考
0x00 摘要
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文介绍 Celery 的负载均衡机制。
Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。
0x01 负载均衡
Celery 的负载均衡其实可以分为三个层次,而且是与 Kombu 高度耦合(本文 broker 以 Redis 为例)。
- 在 worker 决定 与 哪几个 queue 交互,有一个负载均衡(对于 queues );
- 在 worker 决定与 broker 交互,使用 brpop 获取消息时候有一个负载均衡(决定哪一个 worker 来处理任务);
- 在 worker 获得 broker 消息之后,内部 具体 调用 task 时候,worker 内部进行多进程分配时候,有一个负载均衡(决定 worker 内部哪几个进程)。
注意,这个顺序是从 worker 读取任务处理任务的角度 出发,而不是从系统架构角度出发。
因为从系统架构角度说,应该是 which worker ----> which queue in the worker ----> which subprocess in the worker
这个角度。
我们下面按照 "worker 读取任务处理任务角度" 的顺序进行分析。
1.1 哪几个 queue
Kombu 事实上是使用 redis 的 BRPOP 功能来完成对具体 queue 中消息的读取。
- Kombu 是循环调用,每次调用会制定读取哪些内部queues的消息;
- queue 这个逻辑概念,其实就是对应了 redis 中的一个 物理key,从 queue 读取,就代表 BRPOP 需要指定 监听的 key。
- Kombu 是在每一次监听时候,根据这些 queues 得到 其在 redis 之中对应的物理keys,即都指定监听哪些 redis keys;
- brpop是个多key命令,当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。这样就得到了这些 逻辑queue 对应的消息。
因为 task 可能会 用到多个 queue,所以具体从哪几个queue 读取?这时候就用到了策略。
1.1.1 _brpop_start 选择下次读取的queue
Kombu 在每次监听时候,调用 _brpop_start 完成监听。其作用就是 选择下一次读取的queues。
_brpop_start 如下:
代码语言:javascript复制def _brpop_start(self, timeout=1):
# 得到一些内部queues
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
# 得到queue对应的keys
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys) # 利用这些keys,从redis内部获取key的消息
此时变量如下:
代码语言:javascript复制self.active_queues = {set: 1} {'celery'}
len(self.active_queues) = {int} 1
self._queue_cycle = {round_robin_cycle} <kombu.utils.scheduling.round_robin_cycle object at 0x0000015A7EE9DE88>
self = {Channel} <kombu.transport.redis.Channel object at 0x0000015A7EE31048>
所以_brpop_start
就是从 self._queue_cycle 获得几个需要读取的queue。
具体如下图:
代码语言:javascript复制
Kombu | Redis
|
|
-------------------------------------------- |
| Worker | |
| | | queue 1 key
| ----------- | |
| | queue 1 | | BRPOP(keys) |
| | queue 2 | keys | |
| | ...... | -------- ----------------------------------------> queue 2 key
| | queue n | ^ | |
| ----------- | keys | |
| | | |
| | | | queue 3 key
| ------------- ------------ | |
| | Keys list | | |
| | | | |
| -------------------------- | |
-------------------------------------------- |
|
|
|
|
|
1.1.2 round_robin_cycle 设置下次读取的 queue
从上面代码中,我们可以知道 consume 就是返回 round_robin_cycle 中前几个 queue,即 return self.items[:n]。
而 self.items 的维护,是通过 rotate 完成的,就是把 最近用的 那个 queue 放到队列最后,这样给其他 queue 机会,就是 round robin 的概念了。
代码语言:javascript复制class round_robin_cycle:
"""Iterator that cycles between items in round-robin."""
def __init__(self, it=None):
self.items = it if it is not None else []
def update(self, it):
"""Update items from iterable."""
self.items[:] = it
def consume(self, n):
"""Consume n items."""
return self.items[:n]
def rotate(self, last_used):
"""Move most recently used item to end of list."""
items = self.items
try:
items.append(items.pop(items.index(last_used)))
except ValueError:
pass
return last_used
比如在如下代码中,当读取到消息之后,就会调用 self._queue_cycle.rotate(dest)
进行调整。
def _brpop_read(self, **options):
try:
try:
dest__item = self.client.parse_response(self.client.connection,
'BRPOP',
**options)
except self.connection_errors:
# if there's a ConnectionError, disconnect so the next
# iteration will reconnect automatically.
self.client.connection.disconnect()
raise
if dest__item:
dest, item = dest__item
dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
self._queue_cycle.rotate(dest) # 这里进行调整
self.connection._deliver(loads(bytes_to_str(item)), dest)
return True
else:
raise Empty()
finally:
self._in_poll = None
具体如下图:
代码语言:javascript复制
Kombu | Redis
|
|
-------------------------------------------- |
| Worker | |
| | | queue 1 key
| ----------- | |
| | queue 1 | | BRPOP(keys) |
| | queue 2 | keys | |
| | ...... | -------- ----------------------------------------> queue 2 key
| | queue n | ^ | |
| ----------- | keys | |
| | | |
| | | queue 3 key
| round_robin_cycle | |
| | |
| | | |
| | | |
| ------------- ------------ | |
| | Keys list | | |
| -------------------------- | |
-------------------------------------------- |
|
1.2 哪一个worker
如果多个 worker 同时去使用 brpop 获取 broker 消息,那么具体哪一个能够读取到消息,其实这就是有一个 竞争机制,因为redis 的单进程处理,所以只能有一个 worker 才能读到。
这本身就是一个负载均衡。这个和 spring quartz 的负载均衡实现非常类似。
- spring quartz 是 多个节点读取 同一个数据库记录决定谁能开始下一次处理,哪一个得到了数据库锁 就是哪个。
- Kombu 是通过 多个 worker 读取 redis "同一个或者一组key" 的 实际结果 来决定 "哪一个 worker 能开始下一次处理"。
具体如下图:
代码语言:javascript复制
Kombu | Redis
|
|
-------------------------------------- |
| Worker 1 | |
| | |
| ----------- | |
| | queue 1 | | BRPOP(keys) |
| | queue 2 | keys | |
| | ...... | -------- ----------------------------- |
| | queue n | ^ | | |
| ----------- | keys | | |
| | | | |
| | | |
| round_robin_cycle | | | --> queue 1 key
| ^ | | | |
| | | | | |
| | | | | Single Thread |
| ------------ --------- | ---------------------> queue 2 key
| | keys list | | | | |
| ---------------------- | | | |
-------------------------------------- | | |
| | --> queue 3 key
| |
-------------------------------------- | |
| Worker 2 | BRPOP(keys) | |
| | --------------- |
| | | |
-------------------------------------- | |
| |
-------------------------------------- BRPOP(keys) | |
| Worker 3 | | |
| | -------------- |
| |
| |
--------------------------------------
1.3 哪一个进程
进程池中,使用了策略来决定具体使用哪一个进程来处理任务。
1.3.1 策略
先讲解 strategy。在 AsynPool 启动有如下,配置了策略:
代码语言:javascript复制class AsynPool(_pool.Pool):
"""AsyncIO Pool (no threads)."""
def __init__(self, processes=None, synack=False,
sched_strategy=None, proc_alive_timeout=None,
*args, **kwargs):
self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
sched_strategy)
于是我们看看 strategy 定义如下,基本由名字可以知道其策略意义:
代码语言:javascript复制SCHED_STRATEGY_FCFS = 1 # 先来先服务
SCHED_STRATEGY_FAIR = 4 # 公平
SCHED_STRATEGIES = {
None: SCHED_STRATEGY_FAIR,
'default': SCHED_STRATEGY_FAIR,
'fast': SCHED_STRATEGY_FCFS,
'fcfs': SCHED_STRATEGY_FCFS,
'fair': SCHED_STRATEGY_FAIR,
}
1.3.2 公平调度
我们讲讲公平调度的概念。
不同系统对于公平调度的理解大同小异,我们举几个例子看看。
- Linux 中,调度器必须在各个进程之间尽可能公平地共享CPU时间,而同时又要考虑不同的任务优先级。一般原理是:按所需分配的计算能力,向系统中每个进程提供最大的公正性,或者从另外一个角度上说, 试图确保没有进程被亏待。
- Hadoop 中,公平调度是一种赋予作业(job)资源的方法,它的目的是让所有的作业随着时间的推移,都能平均的获取等同的共享资源。当单独一个作业在运行时,它将使用整个集群。当有其它作业被提交上来时,系统会将任务(task)空闲时间片(slot)赋给这些新的作业,以使得每一个作业都大概获取到等量的CPU时间。
- Yarn 之中,Fair Share指的都是Yarn根据每个队列的权重、最大,最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。
1.3.3 公平调度 in Celery
在 asynpool之中,有设置,看看"是否为 fair 调度":
代码语言:javascript复制is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
基于 is_fair_strategy 这个变量,Celery 的公平调度有几处体现。
在开始 poll 时候,如果是 fair,则需要 存在 idle worker 才调度,这样就给了 idler worker 一个调度机会。
代码语言:javascript复制def on_poll_start():
# Determine which io descriptors are not busy
inactive = diff(active_writes)
# Determine hub_add vs hub_remove strategy conditional
if is_fair_strategy:
# outbound buffer present and idle workers exist
add_cond = outbound and len(busy_workers) < len(all_inqueues)
else: # default is add when data exists in outbound buffer
add_cond = outbound
if add_cond: # calling hub_add vs hub_remove
iterate_file_descriptors_safely(
inactive, all_inqueues, hub_add,
None, WRITE | ERR, consolidate=True)
else:
iterate_file_descriptors_safely(
inactive, all_inqueues, hub_remove)
在具体发布 写操作 时候,也会看看是否 worker 已经正在忙于执行某一个 task,如果正在执行,就不调度,这样就给了其他 不忙worker 一个调度的机会。
代码语言:javascript复制 def schedule_writes(ready_fds, total_write_count=None):
if not total_write_count:
total_write_count = [0]
# Schedule write operation to ready file descriptor.
# The file descriptor is writable, but that does not
# mean the process is currently reading from the socket.
# The socket is buffered so writable simply means that
# the buffer can accept at least 1 byte of data.
# This means we have to cycle between the ready fds.
# the first version used shuffle, but this version
# using `total_writes % ready_fds` is about 30% faster
# with many processes, and also leans more towards fairness
# in write stats when used with many processes
# [XXX On macOS, this may vary depending
# on event loop implementation (i.e, select/poll vs epoll), so
# have to test further]
num_ready = len(ready_fds)
for _ in range(num_ready):
ready_fd = ready_fds[total_write_count[0] % num_ready]
total_write_count[0] = 1
if ready_fd in active_writes:
# already writing to this fd
continue
if is_fair_strategy and ready_fd in busy_workers: # 是否调度
# worker is already busy with another task
continue
if ready_fd not in all_inqueues:
hub_remove(ready_fd)
continue
具体逻辑如下:
代码语言:javascript复制
Kombu | Redis
|
BRPOP(keys) |
------------------------------------ |
| Worker 1 | --------------- |
| | | |
------------------------------------ | | queue 1 key
| | ->
| | |
------------------------------------ BRPOP(keys) | | Single thread |
| Worker 2 | --------------------------------------> queue 2 key
| | | | (which worker) |
------------------------------------ | | |
| | |
------------------------------------ | | -> queue 3 key
| Worker 3 | | |
| | | |
| ----------- | | |
| | queue 1 | | BRPOP(keys) | |
| | queue 2 | keys | | |
| | ...... | -------- ------------------------- |
| | queue n | ^ | |
| ----------- | keys | |
| | | |
| | |
| round_robin_cycle (which queues) |
| ^ | |
| | | |
| | | |
| ---- ---- | |
| |keys list| | |
| | --------- | |
------------------------------------ |
| |
| fair_strategy(which subprocess) |
| |
------- ---------- ---------------- |
| | | |
v v v |
----- -------- ------ ------- ----- -------- |
| subprocess 1 | | subprocess 2 | | subprocess 3 |
-------------- -------------- --------------
0x02 Autoscaler
Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。
2.1 调用时机
在 WorkerComponent 中可以看到,为 AutoScaler 注册了两个调用途径:
- 注册在 consumer 消息响应方法中,这样消费时候如果有需要,就会调整;
- 利用 Hub 的 call_repeatedly 方法注册了周期任务,即周期看看是否需要调整。
这样就会最大程度的加大调用频率。
代码语言:javascript复制class WorkerComponent(bootsteps.StartStopStep):
"""Bootstep that starts the autoscaler thread/timer in the worker."""
def create(self, w):
scaler = w.autoscaler = self.instantiate(
w.autoscaler_cls,
w.pool, w.max_concurrency, w.min_concurrency,
worker=w, mutex=DummyLock() if w.use_eventloop else None,
)
return scaler if not w.use_eventloop else None
def register_with_event_loop(self, w, hub):
w.consumer.on_task_message.add(w.autoscaler.maybe_scale) # 消费时候如果有需要,就会调整
hub.call_repeatedly( # 周期看看是否需要调整
w.autoscaler.keepalive, w.autoscaler.maybe_scale,
)
2.2 具体实现
2.2.1 bgThread
Autoscaler 是Background thread,这样 AutoScaler就可以在后台运行:
代码语言:javascript复制class bgThread(threading.Thread):
"""Background service thread."""
def run(self):
body = self.body
shutdown_set = self._is_shutdown.is_set
try:
while not shutdown_set():
body()
finally:
self._set_stopped()
2.2.2 定义
Autoscaler 的定义如下,可以看到其逻辑就是定期判断是否需要调整:
- 如果当前并发已经到了最大,则下调;
- 如果到了最小并发,则上调;
- 则具体上调下调的,都是通过具体线程池函数做到的,这就是要根据具体操作系统来进行分析,此处略过。
class Autoscaler(bgThread):
"""Background thread to autoscale pool workers."""
def __init__(self, pool, max_concurrency,
min_concurrency=0, worker=None,
keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
super().__init__()
self.pool = pool
self.mutex = mutex or threading.Lock()
self.max_concurrency = max_concurrency
self.min_concurrency = min_concurrency
self.keepalive = keepalive
self._last_scale_up = None
self.worker = worker
def body(self):
with self.mutex:
self.maybe_scale()
sleep(1.0)
def _maybe_scale(self, req=None):
procs = self.processes
cur = min(self.qty, self.max_concurrency)
if cur > procs:
self.scale_up(cur - procs)
return True
cur = max(self.qty, self.min_concurrency)
if cur < procs:
self.scale_down(procs - cur)
return True
def maybe_scale(self, req=None):
if self._maybe_scale(req):
self.pool.maintain_pool()
def update(self, max=None, min=None):
with self.mutex:
if max is not None:
if max < self.processes:
self._shrink(self.processes - max)
self._update_consumer_prefetch_count(max)
self.max_concurrency = max
if min is not None:
if min > self.processes:
self._grow(min - self.processes)
self.min_concurrency = min
return self.max_concurrency, self.min_concurrency
def scale_up(self, n):
self._last_scale_up = monotonic()
return self._grow(n)
def scale_down(self, n):
if self._last_scale_up and (
monotonic() - self._last_scale_up > self.keepalive):
return self._shrink(n)
def _grow(self, n):
self.pool.grow(n)
def _shrink(self, n):
self.pool.shrink(n)
def _update_consumer_prefetch_count(self, new_max):
diff = new_max - self.max_concurrency
if diff:
self.worker.consumer._update_prefetch_count(
diff
)
@property
def qty(self):
return len(state.reserved_requests)
@property
def processes(self):
return self.pool.num_processes