用例
在这个现代世界中,我认为我们大多数人都熟悉使用计算机视觉应用程序的新行业,特别是闭路电视监控摄像头和视频分析,它们在计算机视觉技术中发挥着重要作用。
例如,当我们分析闭路电视摄像机时,作为第一步,我们应该使用 OpenCV 读取 RTSP URL,然后我们应该将其存储在云中的某个位置以进一步分析。
但问题是,当我们将帧一个接一个地上传到云端时,上传需要一些时间,不是吗?
为了让小伙伴们对此有一个清晰的认识,我们用 Google bucket 做了一个实验,估计一帧需要1.05秒才能上传 Google bucket 。因此,我们必须等待 1 秒才能获得响应,然后我们需要上传行中的下一帧。
解决方案是,我们可以使用 Celery 以异步方式上传帧。当我们以异步方式上传帧时,我们无法获得序列帧,作为一种手段,我们应该使用 Celery 中的组和链概念。
Celery 是什么?
Celery是 Python 世界中最受欢迎的任务管理器背景之一。“Celery”与多种消息代理兼容,如 RabbitMQ 或 Redis。它们可以同时充当生产者和消费者。此外,“Celery”是基于分布式消息传递的异步任务队列/作业队列。除此之外,它还专注于实时操作并支持调度。
明确定义后,让我们看看下面如何使用python代码配置celery。
第 1 步:导入所有必需的 celery 包
代码语言:javascript复制from celery import Celery
from celery.result import AsyncResult
from celery.result import allow_join_result
from celery.decorators import periodic_task
第 2 步:我们应该在 celery 中配置代理和后端。我已经使用Redis作为后端,所以在小伙伴们的系统中安装Redis,并确保它运行成功;
代码语言:javascript复制app = Celery(‘tasks’, backend=’redis://guest@127.0.0.1:6379', broker=’redis://guest@127.0.0.1:6379')
第 3 步:为了以异步方式调用函数,我们应该在函数上添加“@app.taks annotation”。
下面是 celery 编码示例,用于将帧上传到 Google bucket 中。
代码语言:javascript复制@app.task(bind=True, max_retries=30)
def upload_frames_gcs(self, file_name):
try:
url = upload_file_to_gcs(file_name)
return url
except Exception as e:
raise self.retry(exc=e)
第 4 步:以下是最重要的步骤:
我们将无法直接调用该函数并以异步方式上传帧,因为上传后我们无法获得序列帧,因此我们应该使用celery中的链和组概念将帧上传到 bucket 中。使用这种技术,我们可以并行上传多达 5 或 10 帧,也可以获得帧的序列顺序。但是,在开始编码之前,让我们先看看“Celery 中的链和组是什么”。
Celery 中的链
链是一种原语,可以让我们将更多任务链接到一个单一的签名中,因此它被称为“一个接一个,基本上形成一个回调链”。
Celery 中的组
组原语是一个签名,它采用应该并行应用的任务列表。
下面是一个示例编码,用来解释如何使用 celery 中的组和链技术将帧上传到Google bucket 中。
代码语言:javascript复制jobs = group(upload_frames_gcs.s(file_name, ts) for ts, file_name in file_name_dic.items())
result = jobs.apply_async()
可以理解的是,我们在一个组方法中调用 upload_frames_gcs 函数 ,然后我们可以看到“s ”在celery中传递一个名为“Chains concept”的参数,这允许我们链接签名,结果是“一个接一个地调用,本质上形成一个回调链”。最后,我们可以在一个任务中得到一组结果。
第 5 步:如果我们想在 celery 中上传后获取框架 URL,简单地说,在结果变量中就可以获取该组函数的任务 id,我们可以通过任务 id 来获取结果。
但是,请注意检查任务的状态,一旦任务完成,我们就可以获取框架 URL。
代码语言:javascript复制def taskid_status(task_id_array):
for task in task_id_array:
if task.successful():
task_id_array.remove(task)
with allow_join_result():
frames_array = []
for results in task.join():
frame_dic = {}
frame_dic['frame_url'] = results[0]
frames_array.append(frame_dic)
return task_id_array, frames_array
在frames_array 变量中,我们可以获得带有时间戳的所有帧。
已经用多个不同的测试用例测试了性能:
- 5 帧需要 0.85 秒才能上传谷歌存储。
- 10 帧需要 0.77 到 0.82 秒上传谷歌存储。
- 15 帧需要 0.9 到 1.0 秒上传谷歌存储。
- 30 帧需要 0.7 到 0.8 秒上传谷歌存储。
显然,增加要上传到存储空间中的帧数没有太大区别,因为多处理用于在celery 中执行任务的并发执行。