基于opencv实现模块化图像处理管道

2022-02-09 12:31:40 浏览数 (1)

在这篇文章中,我们将学习如何为图像处理实现一个简单的模块化管道,我们将使用 OpenCV 进行图像处理和操作,并使用 Python 生成器进行管道步骤。

图像处理管道是一组按预定义顺序执行的任务,用于将图像转换为所需的结果或提取一些有趣的特征。

任务示例可以是:

  • 图像转换,如平移、旋转、调整大小、翻转和裁剪,
  • 图像的增强,
  • 提取感兴趣区域(ROI),
  • 计算特征描述符,
  • 图像或对象分类,
  • 物体检测,
  • 用于机器学习的图像注释,

最终结果可能是一个新图像,或者只是一个包含一些图像信息的JSON文件。

假设我们在一个目录中有大量图像,并且想要检测其中的人脸并将每个人脸写入单独的文件。此外,我们希望有一些 JSON 摘要文件,它告诉我们在何处找到人脸以及在哪个文件中找到人脸。我们的人脸检测流程如下所示:

人脸检测流程

这是一个非常简单的例子,可以用以下代码总结:

代码语言:javascript复制
import cv2
import os
import json
import numpy as np

def parse_args():
    import argparse

    # Parse command line arguments
    ap = argparse.ArgumentParser(description="Image processing pipeline")
    ap.add_argument("-i", "--input", required=True,
                    help="path to input image files")
    ap.add_argument("-o", "--output", default="output",
                    help="path to output directory")
    ap.add_argument("-os", "--out-summary", default=None,
                    help="output JSON summary file name")
    ap.add_argument("-c", "--classifier", default="models/haarcascade/haarcascade_frontalface_default.xml",
                    help="path to where the face cascade resides")

    return vars(ap.parse_args())

def list_images(path, valid_exts=None):
    image_files = []
    # Loop over the input directory structure
    for (root_dir, dir_names, filenames) in os.walk(path):
        for filename in sorted(filenames):
            # Determine the file extension of the current file
            ext = filename[filename.rfind("."):].lower()
            if valid_exts and ext.endswith(valid_exts):
                # Construct the path to the file and yield it
                file = os.path.join(root_dir, filename)
                image_files.append(file)

    return image_files

def main(args):
    os.makedirs(args["output"], exist_ok=True)

    # load the face detector
    detector = cv2.CascadeClassifier(args["classifier"])

    # list images from input directory
    input_image_files = list_images(args["input"], (".jpg", ".png"))

    # Storage for JSON summary
    summary = {}

    # Loop over the image paths
    for image_file in input_image_files:
        # Load the image and convert it to grayscale
        image = cv2.imread(image_file)
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Detect faces
        face_rects = detector.detectMultiScale(gray, scaleFactor=1.05, minNeighbors=5,
                                               minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE)
        summary[image_file] = {}
        # Loop over all detected faces
        for i, (x, y, w, h) in enumerate(face_rects):
            face = image[y:y w, x:x h]

            # Prepare output directory for faces
            output = os.path.join(*(image_file.split(os.path.sep)[1:]))
            output = os.path.join(args["output"], output)
            os.makedirs(output, exist_ok=True)

            # Save faces
            face_file = os.path.join(output, f"{i:05d}.jpg")
            cv2.imwrite(face_file, face)

            # Store summary data
            summary[image_file][face_file] = np.array([x, y, w, h], dtype=int).tolist()

        # Display summary
        print(f"[INFO] {image_file}: face detections {len(face_rects)}")

    # Save summary data
    if args["out_summary"]:
        summary_file = os.path.join(args["output"], args["out_summary"])
        print(f"[INFO] Saving summary to {summary_file}...")
        with open(summary_file, 'w') as json_file:
            json_file.write(json.dumps(summary))

if __name__ == '__main__':
    args = parse_args()
    main(args)

用于人脸检测和提取的简单图像处理脚本

代码中的注释也很有探索性,让我们来深入研究一下。首先,我们定义命令行参数解析器(第 6-20 行)以接受以下参数:

--input:这是包含我们图像的目录的路径(可以是子目录),这是唯一的强制性参数。 --output: 保存管道结果的输出目录。 --out-summary:如果我们想要一个 JSON 摘要,只需提供它的名称(例如 output.json)。 --classifier:用于人脸检测的预训练 Haar 级联的路径

接下来,我们定义list_images函数(第 22-34 行),它将帮助我们遍历输入目录结构以获取图像路径。对于人脸检测,我们使用称为Haar级联(第 40 行)的 Viola-Jones 算法,在深度学习和容易出现误报(在没有人脸的地方报告人脸)的时代,这是一种相当古老的算法。

来自电影“老友记”的示例图像,其中存在一些误报

主要处理循环如下:我们遍历图像文件(第 49行),逐个读取它们(第 51 行),检测人脸(第 55 行),将它们保存到准备好的目录(第 59-72 行)并保存带有人脸坐标的摘要报告(第 78-82 行)。

准备项目环境:

代码语言:javascript复制
$ git clone git://github.com/jagin/image-processing-pipeline.git
$ cd image-processing-pipeline
$ git checkout 77c19422f0d7a90f1541ff81782948e9a12d2519
$ conda env create -f environment.yml
$ conda activate pipeline

为了确保你们的代码能够正常运行,请检查你们的切换分支命令是否正确:

代码语言:javascript复制
77c19422f0d7a90f1541ff81782948e9a12d2519

让我们运行它:

代码语言:javascript复制
$ python process_images.py --input assets/images -os output.json

我们得到了一个很好的总结:

代码语言:javascript复制
[INFO] assets/images/friends/friends_01.jpg: face detections 2
[INFO] assets/images/friends/friends_02.jpg: face detections 3
[INFO] assets/images/friends/friends_03.jpg: face detections 5
[INFO] assets/images/friends/friends_04.jpg: face detections 14
[INFO] assets/images/landscapes/landscape_01.jpg: face detections 0
[INFO] assets/images/landscapes/landscape_02.jpg: face detections 0
[INFO] Saving summary to output/output.json...

每个图像的人脸图像(也有误报)存储在单独的目录中。

代码语言:javascript复制
output
├── images
│   └── friends
│       ├── friends_01.jpg
│       │   ├── 00000.jpg
│       │   └── 00001.jpg
│       ├── friends_02.jpg
│       │   ├── 00000.jpg
│       │   ├── 00001.jpg
│       │   └── 00002.jpg
│       ├── friends_03.jpg
│       │   ├── 00000.jpg
│       │   ├── 00001.jpg
│       │   ├── 00002.jpg
│       │   ├── 00003.jpg
│       │   └── 00004.jpg
│       └── friends_04.jpg
│           ├── 00000.jpg
│           ├── 00001.jpg
│           ├── 00002.jpg
│           ├── 00003.jpg
│           ├── 00004.jpg
│           ├── 00005.jpg
│           ├── 00006.jpg
│           ├── 00007.jpg
│           ├── 00008.jpg
│           ├── 00009.jpg
│           ├── 00010.jpg
│           ├── 00011.jpg
│           ├── 00012.jpg
│           └── 00013.jpg
└── output.json

摘要文件output.json将包含人脸的坐标(x、y、宽度、高度):

代码语言:javascript复制
{
  "assets/images/friends/friends_01.jpg": {
    "output/images/friends/friends_01.jpg/00000.jpg": [
      434,
      121,
      154,
      154
    ],
    "output/images/friends/friends_01.jpg/00001.jpg": [
      571,
      145,
      192,
      192
    ]
  },
  ...
}

上面的例子并不复杂,只有几个步骤,所以很容易快速创建一个简单的脚本,但很快它就会变得复杂。

在其中一个项目中,我正在研究步态识别,管道包含以下步骤:

  • 捕捉视频
  • 检测人员
  • 估计人的姿势
  • 跟踪姿势
  • 创建蒙版
  • 缓冲区掩码序列
  • 编码步态
  • 识别步态嵌入
  • 显示结果

还有更多用于数据注释、指标生成等。

当我们的管道不断增长,但是不只是有我们在处理它时,问题就会开始出现。还有其他队友在做不同的步骤,管道的某些部分可以在其他管道中重复使用(例如读取图像、捕获视频等)。

我们需要管道是模块化的!我们还需要一种巧妙的方式在管道的步骤之间传递数据。在寻找解决方案时,我偶然发现了一个很好的代码片段,它允许我们使用 Python 生成器创建类似Unix 的管道。

代码语言:javascript复制
#! /usr/bin/env python

class Pipeline(object):
    def __init__(self):
        self.source = None
    
    def __iter__(self):
        return self.generator()
    
    def generator(self):
        while True:
            value = self.source.next()
            if self.filter(value):
                yield self.map(value)
    
    def __or__(self, other):
        other.source = self.generator()
        return other
    
    def filter(self, value):
        return True
    
    def map(self, value):
        return value


class AllNumbers(Pipeline):
    def generator(self):
        value = 0
        while True:
            yield value
            value  = 1


class Evens(Pipeline):
    def filter(self, value):
        return value % 2 == 0


class MultipleOf(Pipeline):
    def __init__(self, factor=1):
        self.factor = factor
        super(MultipleOf, self).__init__()
        
    def filter(self, value):
        return value % self.factor == 0


class Printer(Pipeline):
    def map(self, value):
        print value
        return value


class First(Pipeline):
    def __init__(self, total=10):
        self.total = total
        self.count = 0
        super(First, self).__init__()
    
    def map(self, value):
        self.count  = 1
        if self.count > self.total:
            raise StopIteration
        return value


def main():
    all_numbers = AllNumbers()
    evens = MultipleOf(2)
    multiple_of_3 = MultipleOf(3)
    printer = Printer()
    first_10 = First(10)
    pipeline = all_numbers | evens | multiple_of_3 | first_10 | printer
    
    for i in pipeline:
        pass


if __name__ == '__main__':
    main()

下面这个简单的例子创建了一个Pipeline,来打印前 10 个既是 3 的倍数又是偶数的数字。

代码语言:javascript复制
$ python example_pipeline.py
0
6
12
18
24
30
36
42
48
54

最重要和最有趣的部分是Pipeline生成器类本身:

代码语言:javascript复制
class Pipeline(object):
    def __init__(self):
        self.source = None

    def __iter__(self):
        return self.generator()

    def generator(self):
        while self.has_next():
            data = next(self.source) if self.source else {}
            if self.filter(data):
                yield self.map(data)

    def __or__(self, other):
        other.source = self.generator()
        return other

    def filter(self, data):
        return True

    def map(self, data):
        return data

    def has_next(self):
        return True

生成器函数允许我们声明一个行为类似于迭代器的函数,即它可以在 for 循环中使用。换句话说,生成器是一个函数,它返回一个我们可以迭代的对象(迭代器)(一次一个值)。

Pipeline是一个抽象类,它包含generator函数(第8-12行),默认情况下,agenerator函数通过filter函数(第18-19行)和map函数传递数据(来自上一个生成器)。 filter函数允许我们过滤通过管道的数据(如Even上面代码片段中的类)。map函数使我们能够像在第一类中一样操作(映射)管道数据或更新步骤的状态。 通过覆盖或运算符,可以创建类似 Unix 的管道:

load_images | detect_faces | save_faces | display_summary

管道的第一步必须生成我们的输入数据,因此我们必须覆盖generator函数。在我们的例子中,输入数据是要处理的图像列表,让我们将加载图像部分解耦到名为LoadImages的管道步骤中:

代码语言:javascript复制
import cv2

from pipeline.pipeline import Pipeline
import pipeline.utils as utils

class LoadImages(Pipeline):
    def __init__(self, src, valid_exts=(".jpg", ".png")):
        self.src = src
        self.valid_exts = valid_exts

        super(LoadImages, self).__init__()

    def generator(self):
        source = utils.list_images(self.src, self.valid_exts)
        while self.has_next():
            image_file = next(source)
            image = cv2.imread(image_file)

            data = {
                "image_file": image_file,
                "image": image
            }

            if self.filter(data):
                yield self.map(data)

管道生成器步骤加载图像

我们可以看到,generator会为在src目录中找到的每个图像文件生成以下字典结构:

data = { "image_file": image_file, "image": image }

对于每个文件,我们都获得了图像文件的路径和文件的二进制文件。

使用面向对象的编程,我们可以扩展LoadIamges类,并在需要过滤掉文件名或路径中包含选定单词的图像文件时重写filter函数。

下一步是检测人脸:

代码语言:javascript复制
import cv2

from pipeline.pipeline import Pipeline

class CascadeDetectFaces(Pipeline):
    def __init__(self, classifier):
        # load the face detector
        self.detector = cv2.CascadeClassifier(classifier)

        super(DetectFaces, self).__init__()

    def map(self, data):
        image = data["image"]

        # Detect faces
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        face_rects = self.detector.detectMultiScale(gray, scaleFactor=1.05, minNeighbors=5,
                                                    minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE)
        data["face_rects"] = face_rects

        return data

检测人脸步骤

它将使用map函数中源生成器(load_images)的数据,提取图像二进制文件(第14行),检测人脸(第17–18行),并使用人脸坐标(第20行)丰富数据,以便下一步操作。

我们可以将整个管道包含在以下main函数中:

代码语言:javascript复制
import os

from pipeline.load_images import LoadImages
from pipeline.cascade_detect_faces import CascadeDetectFaces
from pipeline.save_faces import SaveFaces
from pipeline.save_summary import SaveSummary
from pipeline.display_summary import DisplaySummary

def parse_args():
    import argparse

    # Parse command line arguments
    ap = argparse.ArgumentParser(description="Image processing pipeline")
    ap.add_argument("-i", "--input", required=True,
                    help="path to input image files")
    ap.add_argument("-o", "--output", default="output",
                    help="path to output directory")
    ap.add_argument("-os", "--out-summary", default=None,
                    help="output JSON summary file name")
    ap.add_argument("-c", "--classifier", default="models/haarcascade/haarcascade_frontalface_default.xml",
                    help="path to where the face cascade resides")

    return vars(ap.parse_args())

def main(args):
    # Create pipeline steps
    load_images = LoadImages(args["input"])
    detect_faces = CascadeDetectFaces(args["classifier"])
    save_faces = SaveFaces(args["output"])
    if args["out_summary"]:
        summary_file = os.path.join(args["output"], args["out_summary"])
        save_summary = SaveSummary(summary_file)
    display_summary = DisplaySummary()

    # Create image processing pipeline
    pipeline = load_images | detect_faces | save_faces
    if args["out_summary"]:
        pipeline |= save_summary
    pipeline |= display_summary

    # Iterate through pipeline
    for _ in pipeline:
        pass

    if args["out_summary"]:
        print(f"[INFO] Saving summary to {summary_file}...")
        save_summary.write()

if __name__ == '__main__':
    args = parse_args()
    main(args)

其中,单个步骤的逻辑是分离的,主要功能是干净整洁的,功能与文章开头的脚本相同。

处理管道的模块化使我们可以在视频处理中重用CascadeDetectFaces类:

代码语言:javascript复制
python process_video_pipeline.py -i assets/videos/faces.mp4 -ov faces.avi -p 
12%|██████████████████████████ ██████▋ | 71/577 [00:08<01:12, 6.99it/s]

具有以下示例结果:

CascadeDetectFaces并不完美,但我们可以使用cv2.dnnOpenCV 中的一些深度学习模型和模块创建另一个实现,这将更准确,更容易在管道中替换它。

0 人点赞