前言
写了一个每三秒执行一次,有需要处理的数据则处理,没有则退出的函数。用celery周期性任务执行。遇到了一个问题就是,有数据处理时。3秒后,下一个进程会关掉此进程,加了一个锁。保证同一时间只有一个进程执行,其他进程直接退出。
实现代码
代码语言:javascript复制from celery import shared_task, current_task
@shared_task
def sync_start_process(sname):
"""
异步执行任务
"""
lock_name = f"lock_sync_start_process_{sname}"
# 尝试获取锁定
with current_task.app.Lock(lock_name, timeout=0):
# 如果成功获取到锁定,则说明之前没有相同任务在执行,可以继续执行任务逻辑
# 执行任务逻辑...
process_start(sname)
# 如果无法获取到锁定(即有相同任务正在执行),则会抛出 Reject 异常,任务会被自动丢弃
raise Reject("Task is already in progress")
遇到问题
一开始调试成功了,后来遇到启动报错。死活找不到原因。
chatgpt提示4.0.0版本以上,Lock对象被移除了,更新代码。使用redis锁。
代码语言:javascript复制#操作docker
import redis
from celery import shared_task, current_task
from celery.exceptions import Reject
from .utils import process_start
#日志
from loguru import logger
@shared_task
def sync_start_process(sname):
"""
异步执行任务
"""
lock_name = f"lock_sync_start_process_{sname}"
#创建redis链接
r = redis.Redis(host='redis', port=6379, db=3, password='redis@2022')
# 尝试获取锁定
lock_acquired = r.set(lock_name, 'locked', nx=True, ex=1)
if lock_acquired:
try:
# 如果成功获取到锁定,则说明之前没有相同任务在执行,可以继续执行任务逻辑
# 执行任务逻辑...
process_start(sname)
finally:
#释放锁
r.delete(lock_name)
else:
# 如果无法获取到锁定(即有相同任务正在执行),则会抛出 Reject 异常,任务会被自动丢弃
raise Reject("Task is already in progress")