如何在 1 秒内将 50 个 OpenCV 帧上传到云存储

2022-02-14 10:08:55 浏览数 (1)

用例

在这个现代世界中,我认为我们大多数人都熟悉使用计算机视觉应用程序的新行业,特别是闭路电视监控摄像头和视频分析,它们在计算机视觉技术中发挥着重要作用。

例如,当我们分析闭路电视摄像机时,作为第一步,我们应该使用 OpenCV 读取 RTSP URL,然后我们应该将其存储在云中的某个位置以进一步分析。

但问题是,当我们将帧一个接一个地上传到云端时,上传需要一些时间,不是吗?

为了让小伙伴们对此有一个清晰的认识,我们用 Google bucket 做了一个实验,估计一帧需要1.05秒才能上传 Google bucket 。因此,我们必须等待 1 秒才能获得响应,然后我们需要上传行中的下一帧。

解决方案是,我们可以使用 Celery 以异步方式上传帧。当我们以异步方式上传帧时,我们无法获得序列帧,作为一种手段,我们应该使用 Celery 中的组和链概念。

Celery 是什么?

Celery是 Python 世界中最受欢迎的任务管理器背景之一。“Celery”与多种消息代理兼容,如 RabbitMQ 或 Redis。它们可以同时充当生产者和消费者。此外,“Celery”是基于分布式消息传递的异步任务队列/作业队列。除此之外,它还专注于实时操作并支持调度。

明确定义后,让我们看看下面如何使用python代码配置celery。

第 1 步:导入所有必需的 celery 包

代码语言:javascript复制
from celery import Celery
from celery.result import AsyncResult
from celery.result import allow_join_result
from celery.decorators import periodic_task

第 2 步:我们应该在 celery 中配置代理和后端。我已经使用Redis作为后端,所以在小伙伴们的系统中安装Redis,并确保它运行成功;

代码语言:javascript复制
app = Celery(‘tasks’, backend=’redis://guest@127.0.0.1:6379', broker=’redis://guest@127.0.0.1:6379')

第 3 步:为了以异步方式调用函数,我们应该在函数上添加“@app.taks annotation”。

下面是 celery 编码示例,用于将帧上传到 Google bucket 中。

代码语言:javascript复制
@app.task(bind=True, max_retries=30)
def upload_frames_gcs(self, file_name):
    try:
        url = upload_file_to_gcs(file_name)
        return url
    except Exception as e:
        raise self.retry(exc=e)

第 4 步:以下是最重要的步骤:

我们将无法直接调用该函数并以异步方式上传帧,因为上传后我们无法获得序列帧,因此我们应该使用celery中的链和组概念将帧上传到 bucket 中。使用这种技术,我们可以并行上传多达 5 或 10 帧,也可以获得帧的序列顺序。但是,在开始编码之前,让我们先看看“Celery 中的链和组是什么”。

Celery 中的链

链是一种原语,可以让我们将更多任务链接到一个单一的签名中,因此它被称为“一个接一个,基本上形成一个回调链”。

Celery 中的组

组原语是一个签名,它采用应该并行应用的任务列表。

下面是一个示例编码,用来解释如何使用 celery 中的组和链技术将帧上传到Google bucket 中。

代码语言:javascript复制
jobs = group(upload_frames_gcs.s(file_name, ts) for ts, file_name in file_name_dic.items())
result = jobs.apply_async()

可以理解的是,我们在一个组方法中调用 upload_frames_gcs 函数 ,然后我们可以看到“s ”在celery中传递一个名为“Chains concept”的参数,这允许我们链接签名,结果是“一个接一个地调用,本质上形成一个回调链”。最后,我们可以在一个任务中得到一组结果。

第 5 步:如果我们想在 celery 中上传后获取框架 URL,简单地说,在结果变量中就可以获取该组函数的任务 id,我们可以通过任务 id 来获取结果。

但是,请注意检查任务的状态,一旦任务完成,我们就可以获取框架 URL。

代码语言:javascript复制
def taskid_status(task_id_array):
        for task in task_id_array:
            if task.successful():
                task_id_array.remove(task)
                with allow_join_result():
                    frames_array = []
                    for results in task.join(): 
                        frame_dic = {}
                        frame_dic['frame_url'] = results[0]
                        frames_array.append(frame_dic)
       return task_id_array, frames_array

在frames_array 变量中,我们可以获得带有时间戳的所有帧。

已经用多个不同的测试用例测试了性能:

  1. 5 帧需要 0.85 秒才能上传谷歌存储。
  2. 10 帧需要 0.77 到 0.82 秒上传谷歌存储。
  3. 15 帧需要 0.9 到 1.0 秒上传谷歌存储。
  4. 30 帧需要 0.7 到 0.8 秒上传谷歌存储。

显然,增加要上传到存储空间中的帧数没有太大区别,因为多处理用于在celery 中执行任务的并发执行。

0 人点赞