在这篇文章中,我们将学习如何为图像处理实现一个简单的模块化管道,我们将使用 OpenCV 进行图像处理和操作,并使用 Python 生成器进行管道步骤。
图像处理管道是一组按预定义顺序执行的任务,用于将图像转换为所需的结果或提取一些有趣的特征。
任务示例可以是:
- 图像转换,如平移、旋转、调整大小、翻转和裁剪,
- 图像的增强,
- 提取感兴趣区域(ROI),
- 计算特征描述符,
- 图像或对象分类,
- 物体检测,
- 用于机器学习的图像注释,
最终结果可能是一个新图像,或者只是一个包含一些图像信息的JSON文件。
假设我们在一个目录中有大量图像,并且想要检测其中的人脸并将每个人脸写入单独的文件。此外,我们希望有一些 JSON 摘要文件,它告诉我们在何处找到人脸以及在哪个文件中找到人脸。我们的人脸检测流程如下所示:
人脸检测流程
这是一个非常简单的例子,可以用以下代码总结:
代码语言:javascript复制import cv2
import os
import json
import numpy as np
def parse_args():
import argparse
# Parse command line arguments
ap = argparse.ArgumentParser(description="Image processing pipeline")
ap.add_argument("-i", "--input", required=True,
help="path to input image files")
ap.add_argument("-o", "--output", default="output",
help="path to output directory")
ap.add_argument("-os", "--out-summary", default=None,
help="output JSON summary file name")
ap.add_argument("-c", "--classifier", default="models/haarcascade/haarcascade_frontalface_default.xml",
help="path to where the face cascade resides")
return vars(ap.parse_args())
def list_images(path, valid_exts=None):
image_files = []
# Loop over the input directory structure
for (root_dir, dir_names, filenames) in os.walk(path):
for filename in sorted(filenames):
# Determine the file extension of the current file
ext = filename[filename.rfind("."):].lower()
if valid_exts and ext.endswith(valid_exts):
# Construct the path to the file and yield it
file = os.path.join(root_dir, filename)
image_files.append(file)
return image_files
def main(args):
os.makedirs(args["output"], exist_ok=True)
# load the face detector
detector = cv2.CascadeClassifier(args["classifier"])
# list images from input directory
input_image_files = list_images(args["input"], (".jpg", ".png"))
# Storage for JSON summary
summary = {}
# Loop over the image paths
for image_file in input_image_files:
# Load the image and convert it to grayscale
image = cv2.imread(image_file)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Detect faces
face_rects = detector.detectMultiScale(gray, scaleFactor=1.05, minNeighbors=5,
minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE)
summary[image_file] = {}
# Loop over all detected faces
for i, (x, y, w, h) in enumerate(face_rects):
face = image[y:y w, x:x h]
# Prepare output directory for faces
output = os.path.join(*(image_file.split(os.path.sep)[1:]))
output = os.path.join(args["output"], output)
os.makedirs(output, exist_ok=True)
# Save faces
face_file = os.path.join(output, f"{i:05d}.jpg")
cv2.imwrite(face_file, face)
# Store summary data
summary[image_file][face_file] = np.array([x, y, w, h], dtype=int).tolist()
# Display summary
print(f"[INFO] {image_file}: face detections {len(face_rects)}")
# Save summary data
if args["out_summary"]:
summary_file = os.path.join(args["output"], args["out_summary"])
print(f"[INFO] Saving summary to {summary_file}...")
with open(summary_file, 'w') as json_file:
json_file.write(json.dumps(summary))
if __name__ == '__main__':
args = parse_args()
main(args)
用于人脸检测和提取的简单图像处理脚本
代码中的注释也很有探索性,让我们来深入研究一下。首先,我们定义命令行参数解析器(第 6-20 行)以接受以下参数:
--input:这是包含我们图像的目录的路径(可以是子目录),这是唯一的强制性参数。 --output: 保存管道结果的输出目录。 --out-summary:如果我们想要一个 JSON 摘要,只需提供它的名称(例如 output.json)。 --classifier:用于人脸检测的预训练 Haar 级联的路径
接下来,我们定义list_images函数(第 22-34 行),它将帮助我们遍历输入目录结构以获取图像路径。对于人脸检测,我们使用称为Haar级联(第 40 行)的 Viola-Jones 算法,在深度学习和容易出现误报(在没有人脸的地方报告人脸)的时代,这是一种相当古老的算法。
来自电影“老友记”的示例图像,其中存在一些误报
主要处理循环如下:我们遍历图像文件(第 49行),逐个读取它们(第 51 行),检测人脸(第 55 行),将它们保存到准备好的目录(第 59-72 行)并保存带有人脸坐标的摘要报告(第 78-82 行)。
准备项目环境:
代码语言:javascript复制$ git clone git://github.com/jagin/image-processing-pipeline.git
$ cd image-processing-pipeline
$ git checkout 77c19422f0d7a90f1541ff81782948e9a12d2519
$ conda env create -f environment.yml
$ conda activate pipeline
为了确保你们的代码能够正常运行,请检查你们的切换分支命令是否正确:
代码语言:javascript复制77c19422f0d7a90f1541ff81782948e9a12d2519
让我们运行它:
代码语言:javascript复制$ python process_images.py --input assets/images -os output.json
我们得到了一个很好的总结:
代码语言:javascript复制[INFO] assets/images/friends/friends_01.jpg: face detections 2
[INFO] assets/images/friends/friends_02.jpg: face detections 3
[INFO] assets/images/friends/friends_03.jpg: face detections 5
[INFO] assets/images/friends/friends_04.jpg: face detections 14
[INFO] assets/images/landscapes/landscape_01.jpg: face detections 0
[INFO] assets/images/landscapes/landscape_02.jpg: face detections 0
[INFO] Saving summary to output/output.json...
每个图像的人脸图像(也有误报)存储在单独的目录中。
代码语言:javascript复制output
├── images
│ └── friends
│ ├── friends_01.jpg
│ │ ├── 00000.jpg
│ │ └── 00001.jpg
│ ├── friends_02.jpg
│ │ ├── 00000.jpg
│ │ ├── 00001.jpg
│ │ └── 00002.jpg
│ ├── friends_03.jpg
│ │ ├── 00000.jpg
│ │ ├── 00001.jpg
│ │ ├── 00002.jpg
│ │ ├── 00003.jpg
│ │ └── 00004.jpg
│ └── friends_04.jpg
│ ├── 00000.jpg
│ ├── 00001.jpg
│ ├── 00002.jpg
│ ├── 00003.jpg
│ ├── 00004.jpg
│ ├── 00005.jpg
│ ├── 00006.jpg
│ ├── 00007.jpg
│ ├── 00008.jpg
│ ├── 00009.jpg
│ ├── 00010.jpg
│ ├── 00011.jpg
│ ├── 00012.jpg
│ └── 00013.jpg
└── output.json
摘要文件output.json将包含人脸的坐标(x、y、宽度、高度):
代码语言:javascript复制{
"assets/images/friends/friends_01.jpg": {
"output/images/friends/friends_01.jpg/00000.jpg": [
434,
121,
154,
154
],
"output/images/friends/friends_01.jpg/00001.jpg": [
571,
145,
192,
192
]
},
...
}
上面的例子并不复杂,只有几个步骤,所以很容易快速创建一个简单的脚本,但很快它就会变得复杂。
在其中一个项目中,我正在研究步态识别,管道包含以下步骤:
- 捕捉视频
- 检测人员
- 估计人的姿势
- 跟踪姿势
- 创建蒙版
- 缓冲区掩码序列
- 编码步态
- 识别步态嵌入
- 显示结果
还有更多用于数据注释、指标生成等。
当我们的管道不断增长,但是不只是有我们在处理它时,问题就会开始出现。还有其他队友在做不同的步骤,管道的某些部分可以在其他管道中重复使用(例如读取图像、捕获视频等)。
我们需要管道是模块化的!我们还需要一种巧妙的方式在管道的步骤之间传递数据。在寻找解决方案时,我偶然发现了一个很好的代码片段,它允许我们使用 Python 生成器创建类似Unix 的管道。
代码语言:javascript复制#! /usr/bin/env python
class Pipeline(object):
def __init__(self):
self.source = None
def __iter__(self):
return self.generator()
def generator(self):
while True:
value = self.source.next()
if self.filter(value):
yield self.map(value)
def __or__(self, other):
other.source = self.generator()
return other
def filter(self, value):
return True
def map(self, value):
return value
class AllNumbers(Pipeline):
def generator(self):
value = 0
while True:
yield value
value = 1
class Evens(Pipeline):
def filter(self, value):
return value % 2 == 0
class MultipleOf(Pipeline):
def __init__(self, factor=1):
self.factor = factor
super(MultipleOf, self).__init__()
def filter(self, value):
return value % self.factor == 0
class Printer(Pipeline):
def map(self, value):
print value
return value
class First(Pipeline):
def __init__(self, total=10):
self.total = total
self.count = 0
super(First, self).__init__()
def map(self, value):
self.count = 1
if self.count > self.total:
raise StopIteration
return value
def main():
all_numbers = AllNumbers()
evens = MultipleOf(2)
multiple_of_3 = MultipleOf(3)
printer = Printer()
first_10 = First(10)
pipeline = all_numbers | evens | multiple_of_3 | first_10 | printer
for i in pipeline:
pass
if __name__ == '__main__':
main()
下面这个简单的例子创建了一个Pipeline,来打印前 10 个既是 3 的倍数又是偶数的数字。
代码语言:javascript复制$ python example_pipeline.py
0
6
12
18
24
30
36
42
48
54
最重要和最有趣的部分是Pipeline生成器类本身:
代码语言:javascript复制class Pipeline(object):
def __init__(self):
self.source = None
def __iter__(self):
return self.generator()
def generator(self):
while self.has_next():
data = next(self.source) if self.source else {}
if self.filter(data):
yield self.map(data)
def __or__(self, other):
other.source = self.generator()
return other
def filter(self, data):
return True
def map(self, data):
return data
def has_next(self):
return True
生成器函数允许我们声明一个行为类似于迭代器的函数,即它可以在 for 循环中使用。换句话说,生成器是一个函数,它返回一个我们可以迭代的对象(迭代器)(一次一个值)。
Pipeline是一个抽象类,它包含generator函数(第8-12行),默认情况下,agenerator函数通过filter函数(第18-19行)和map函数传递数据(来自上一个生成器)。 filter函数允许我们过滤通过管道的数据(如Even上面代码片段中的类)。map函数使我们能够像在第一类中一样操作(映射)管道数据或更新步骤的状态。 通过覆盖或运算符,可以创建类似 Unix 的管道:
load_images | detect_faces | save_faces | display_summary
。
管道的第一步必须生成我们的输入数据,因此我们必须覆盖generator函数。在我们的例子中,输入数据是要处理的图像列表,让我们将加载图像部分解耦到名为LoadImages的管道步骤中:
代码语言:javascript复制import cv2
from pipeline.pipeline import Pipeline
import pipeline.utils as utils
class LoadImages(Pipeline):
def __init__(self, src, valid_exts=(".jpg", ".png")):
self.src = src
self.valid_exts = valid_exts
super(LoadImages, self).__init__()
def generator(self):
source = utils.list_images(self.src, self.valid_exts)
while self.has_next():
image_file = next(source)
image = cv2.imread(image_file)
data = {
"image_file": image_file,
"image": image
}
if self.filter(data):
yield self.map(data)
管道生成器步骤加载图像
我们可以看到,generator会为在src目录中找到的每个图像文件生成以下字典结构:
data = { "image_file": image_file, "image": image }
对于每个文件,我们都获得了图像文件的路径和文件的二进制文件。
使用面向对象的编程,我们可以扩展LoadIamges类,并在需要过滤掉文件名或路径中包含选定单词的图像文件时重写filter函数。
下一步是检测人脸:
代码语言:javascript复制import cv2
from pipeline.pipeline import Pipeline
class CascadeDetectFaces(Pipeline):
def __init__(self, classifier):
# load the face detector
self.detector = cv2.CascadeClassifier(classifier)
super(DetectFaces, self).__init__()
def map(self, data):
image = data["image"]
# Detect faces
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
face_rects = self.detector.detectMultiScale(gray, scaleFactor=1.05, minNeighbors=5,
minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE)
data["face_rects"] = face_rects
return data
检测人脸步骤
它将使用map函数中源生成器(load_images)的数据,提取图像二进制文件(第14行),检测人脸(第17–18行),并使用人脸坐标(第20行)丰富数据,以便下一步操作。
我们可以将整个管道包含在以下main函数中:
代码语言:javascript复制import os
from pipeline.load_images import LoadImages
from pipeline.cascade_detect_faces import CascadeDetectFaces
from pipeline.save_faces import SaveFaces
from pipeline.save_summary import SaveSummary
from pipeline.display_summary import DisplaySummary
def parse_args():
import argparse
# Parse command line arguments
ap = argparse.ArgumentParser(description="Image processing pipeline")
ap.add_argument("-i", "--input", required=True,
help="path to input image files")
ap.add_argument("-o", "--output", default="output",
help="path to output directory")
ap.add_argument("-os", "--out-summary", default=None,
help="output JSON summary file name")
ap.add_argument("-c", "--classifier", default="models/haarcascade/haarcascade_frontalface_default.xml",
help="path to where the face cascade resides")
return vars(ap.parse_args())
def main(args):
# Create pipeline steps
load_images = LoadImages(args["input"])
detect_faces = CascadeDetectFaces(args["classifier"])
save_faces = SaveFaces(args["output"])
if args["out_summary"]:
summary_file = os.path.join(args["output"], args["out_summary"])
save_summary = SaveSummary(summary_file)
display_summary = DisplaySummary()
# Create image processing pipeline
pipeline = load_images | detect_faces | save_faces
if args["out_summary"]:
pipeline |= save_summary
pipeline |= display_summary
# Iterate through pipeline
for _ in pipeline:
pass
if args["out_summary"]:
print(f"[INFO] Saving summary to {summary_file}...")
save_summary.write()
if __name__ == '__main__':
args = parse_args()
main(args)
其中,单个步骤的逻辑是分离的,主要功能是干净整洁的,功能与文章开头的脚本相同。
处理管道的模块化使我们可以在视频处理中重用CascadeDetectFaces类:
代码语言:javascript复制python process_video_pipeline.py -i assets/videos/faces.mp4 -ov faces.avi -p
12%|██████████████████████████ ██████▋ | 71/577 [00:08<01:12, 6.99it/s]
具有以下示例结果:
CascadeDetectFaces并不完美,但我们可以使用cv2.dnnOpenCV 中的一些深度学习模型和模块创建另一个实现,这将更准确,更容易在管道中替换它。