TorchScript | 目标检测部署实战

2023-11-22 14:35:04 浏览数 (2)

Inference via TorchScript

简介

TorchScript 是可以由 TorchScript 编译器理解、编译和序列化的 PyTorch 模型的表示形式。从根本上说,TorchScript 本身就是一种编程语言。它是使用 PyTorch API 的 Python 的子集。

TorchScript 软件栈可以将 Python 代码转换成 C 代码。TorchScript 软件栈包括两部分:TorchScript(Python)和 LibTorch(C )。TorchScript 负责将 Python 代码转成一个模型文件,LibTorch 负责解析运行这个模型文件

原理

TorchScript 保存模型有两种模式:trace 模式和 script 模式。

trace 模式

trace 模式就是跟踪模型的执行,然后将其路径记录下来。在使用 trace 模式时,需要构造一个符合要求的输入,然后使用 TorchScript tracer 运行一遍,整个运行过程就会被记录下来。在 trace 模式中运行时,每执行一个算子,就会往当前的 graph 加入一个 node。所有代码执行完毕,每一步的操作就会以一个计算图里的某个节点的形式被保存下来。PyTorch 导出 ONNX 也是使用了这部分代码,所以理论上能够导出 ONNX 的模型也能够使用 trace 模式导出 torch 模型。

trace 模式有比较大的限制:

  • 不能有 if-else 等控制流
  • 只支持 Tensor 操作

为什么有这种限制:

代码语言:javascript复制
1. 跟踪出的 graph 是静态的,如果有控制流,那么记录下来的只是当时生成模型时走的那条路;
2. 追踪代码是跟 Tensor 算子绑定在一起的,如果是非 Tensor 的操作,是无法被记录的。
代码语言:javascript复制
class Module_0(torch.nn.Module):
    def __init__(self, N, M):
        super(Module_0, self).__init__()
        self.weight = torch.nn.Parameter(torch.rand(N, M))
        self.linear = torch.nn.Linear(N, M)

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        output = self.weight.mm(input)
        output = self.linear(output)
        return output


scripted_module = torch.jit.trace(Module_0(2, 3).eval(), (torch.zeros(3, 2)))
scripted_module.save("Module_0.pt")
script 模式

script 模式不仅支持 if-else 等控制流,还支持非 Tensor 操作,如 List、Tuple、Map 等容器操作。

代码语言:javascript复制
class Module_1(torch.nn.Module):
    def __init__(self, N, M):
        super(Module_1, self).__init__()
        self.weight = torch.nn.Parameter(torch.rand(N, M))
        self.linear = torch.nn.Linear(N, M)

    def forward(self, input: torch.Tensor, do_linear: bool) -> torch.Tensor:
        output = self.weight.mm(input)
        if do_linear:
            output = self.linear(output)
        return output


scripted_module = torch.jit.script(Module_1(3, 3).eval())
scripted_module.save("Module_1.pt")

混合模式

一个 module 包含控制流,同时也包含一个只有 Tensor 操作的子模型。这种情况下当然可以直接使用 script 模式,但是 script 模式需要对部分变量进行类型标注。对上述子模型进行 trace,整体再进行 script:

代码语言:javascript复制
class Module_2(torch.nn.Module):
    def __init__(self, N, M):
        super(Module_2, self).__init__()
        self.linear = torch.nn.Linear(N, M)
        self.sub_module = torch.jit.trace(Module_0(2, 3).eval(), (torch.zeros(3, 2)))

    def forward(self, input: torch.Tensor, do_linear: bool) -> torch.Tensor:
        output = self.sub_module(input)
        if do_linear:
            output = self.linear(output)
        return output


scripted_module = torch.jit.script(Module_2(2, 3).eval())

libtorch

代码语言:javascript复制
#include <torch/script.h>

int main() {
  // load module
  torch::jit::script::Module torch_module;
  try {
    torch_module = torch::jit::load("my_module.pt");
  } catch (const c10::Error& e) {
    std::cerr << "error loading the module" << std::endl;
    return -1;
  }

  // make inputs
  std::vector<float> vec(9);
  std::vector<torch::jit::IValue> torch_inputs;
  torch::Tensor torch_tensor =
      torch::from_blob(vec.data(), {3, 3}, torch::kFloat32);
  torch_inputs.emplace_back(torch_tensor);
  torch_inputs.emplace_back(false);

  // run module
  torch::jit::IValue torch_outputs;
  try {
    torch_outputs = torch_module.forward(torch_inputs);
  } catch (const c10::Error& e) {
    std::cerr << "error running the module" << std::endl;
    return -1;
  }

  auto outputs_tensor = torch_outputs.toTensor();
}

语法限制

  • 支持的类型有限,这些类型是指在运行(而非初始化)过程中使用的对象或者函数参数
    • A PyTorch tensor of any dtype, dimension, or backend
    • 这其中不包括 set 数据类型,这意味着需要使用 set 的地方就要通过其他的方式绕过,比如先用 list 然后去重
    • 使用 tuple 时需要声明其中的类型,例如 Tuple[int, int, int],这也就意味着 tuple 在运行时长度不能变化,所以要使用 list 代替
    • 创建字典时,只有 int、float、comple、string、torch.Tensor 可以作为 key
  • 不支持 lambda 函数,但是可以通过自定义排序类的方式实现,略微麻烦,但是可以解决
  • 因为 TorchScript 是静态类型语言,运行时不能变换变量类型
  • 因为编码问题,所以对中文字符串进行遍历时会抛异常,所以尽量不要处理中文,如果需要处理中文,则需要将中文切分成字符粒度后再送入模型中进行处理

部署实战

  1. 首先参考yolov5模型,导出时候模型分为2个部分,一个用trace跟踪的traced_script_module(不包括最后一层),一个是最后检测层self.model.model[-1]
代码语言:javascript复制
model = TracedModel(model, device, opt.img_size)
 
class TracedModel(nn.Module):
 
 def __init__(self, model=None, device=None, img_size=(640,640)): 
  super(TracedModel, self).__init__()
  # model:导入的模型
  # device: cpu、gpu
  # img_size: 输入图像大小
  print(" Convert model to Traced-model... ") 
  self.stride = model.stride # 8., 16., 32
  self.names = model.names # 每个类别的标签名
  self.model = model
 
  self.model = revert_sync_batchnorm(self.model)
  self.model.to('cpu')
  self.model.eval() # 切换为 eval 模式,不计算梯度
 
  self.detect_layer = self.model.model[-1] # 得到最后的检测层
  self.model.traced = True # False 修改为 True
  # 随机制造一个 bs=1 输入 tensor
  rand_example = torch.rand(1, 3, img_size, img_size)
 
  traced_script_module = torch.jit.trace(self.model, rand_example, strict=False)
  #traced_script_module = torch.jit.script(self.model)
  traced_script_module.save("traced_model.pt")
  print(" traced_script_module saved! ")
  self.model = traced_script_module
  self.model.to(device)
  self.detect_layer.to(device)
  print(" model is traced! n") 
 
 def forward(self, x, augment=False, profile=False):
  out = self.model(x)
  out = self.detect_layer(out)
  return out
  1. 最好将检测层一起导出,model.model[-1].export = True
代码语言:javascript复制
model.model[-1].export = True
img = torch.zeros(opt.batch_size, 3, *opt.img_size).to(device) 
ts = torch.jit.trace(model, img, strict=False)
ts.save(f)
  1. 成功导出torchscript格式后,可用Netro打开,即可验证是否成功

image-20231117175744016

  1. yolov5输出return x if self.training else torch.cat(z, 1)推理输出和训练不同
代码语言:javascript复制
class IDetect(nn.Module):
    stride = None  # strides computed during build
    export = False  # onnx export

    def __init__(self, nc=80, anchors=(), ch=()):  # detection layer
        super(IDetect, self).__init__()
        self.nc = nc  # number of classes
        self.no = nc   5  # number of outputs per anchor
        self.nl = len(anchors)  # number of detection layers
        self.na = len(anchors[0]) // 2  # number of anchors
        self.grid = [torch.zeros(1)] * self.nl  # init grid
        a = torch.tensor(anchors).float().view(self.nl, -1, 2)
        self.register_buffer('anchors', a)  # shape(nl,na,2)
        self.register_buffer('anchor_grid', a.clone().view(self.nl, 1, -1, 1, 1, 2))  # shape(nl,1,na,1,1,2)
        self.m = nn.ModuleList(nn.Conv2d(x, self.no * self.na, 1) for x in ch)  # output conv
        
        self.ia = nn.ModuleList(ImplicitA(x) for x in ch)
        self.im = nn.ModuleList(ImplicitM(self.no * self.na) for _ in ch)

    def forward(self, x):
        # x = x.copy()  # for profiling
        z = []  # inference output
        self.training |= self.export
        for i in range(self.nl):
            x[i] = self.m[i](self.ia[i](x[i]))  # conv
            x[i] = self.im[i](x[i])
            bs, _, ny, nx = x[i].shape  # x(bs,255,20,20) to x(bs,3,20,20,85)
            # 转ONNX时修改,避免scatterND结点
            # x[i] = x[i].view(bs, self.na, self.no, ny, nx).permute(0, 1, 3, 4, 2).contiguous()
            #替换为:
            x[i] = x[i].view(-1, self.na, self.no, ny, nx).permute(0, 1, 3, 4, 2).contiguous()

            if not self.training:  # inference
                if self.grid[i].shape[2:4] != x[i].shape[2:4]:
                    self.grid[i] = self._make_grid(nx, ny).to(x[i].device)
                # 转ONNX时修改,避免scatterND结点
                y = x[i].sigmoid()
                # y[..., 0:2] = (y[..., 0:2] * 2. - 0.5   self.grid[i]) * self.stride[i]  # xy
                # y[..., 2:4] = (y[..., 2:4] * 2) ** 2 * self.anchor_grid[i]  # wh
                # z.append(y.view(bs, -1, self.no))
                # 替换为:
                xy = (y[..., 0:2] * 2. - 0.5   self.grid[i]) * self.stride[i]
                wh = (y[..., 2:4] * 2) ** 2 * self.anchor_grid[i].view(1,self.na,1,1,2)
                y = torch.cat((xy,wh,y[..., 4:]),-1)
                z.append(y.view(-1,int(y.size(1) * y.size(2) * y.size(3)) , self.no))
        # 替换掉不必要的return
        # return x if self.training else (torch.cat(z, 1), x)
        #替换为:
        return x if self.training else torch.cat(z, 1)


    @staticmethod
    def _make_grid(nx=20, ny=20):
        yv, xv = torch.meshgrid([torch.arange(ny), torch.arange(nx)])
        return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float()
  1. Detect层是YOLOv5最后一层,包含三个输出,分别是下降stride(见stribe属性,8,16,32)倍的网格。

25200=(80∗80 40∗40 20∗20)∗3 按照stride来划分,以640 × 640像素图像为例,stride分别是8,16,32 640 / 8 = 80,这层网格大小是80 × 80 640 / 16 = 40,这层网格大小是40 × 40 640 / 32 = 20,这层网格大小是20 × 20

  1. 后处理代码
代码语言:javascript复制
def numpy_sigmoid(x):
    return 1/(1 np.exp(-x))

def make_grid(nx=20, ny=20):
    xv,yv = np.meshgrid(np.arange(nx), np.arange(ny))
    res = np.stack((xv,yv), 2).reshape(1,1,nx,ny,2).astype(np.float32)
    return res
  
def numpy_detect(x, nc=None, bs=1):
    anchor_grid = [10.0, 13.0, 16.0, 30.0, 33.0, 23.0, 30.0, 61.0, 62.0, 45.0, 59.0, 119.0, 116.0, 90.0, 156.0, 198.0, 373.0, 326.0]
    anchor_grid = np.array(anchor_grid).reshape(3,1,-1,1,1,2)
    stride = np.array([8, 16, 32])
    grid = [make_grid(80,80), make_grid(40,40), make_grid(20,20)]
    z = []
    for i in range(3):
        y = numpy_sigmoid(x[i])
        y[..., 0:2] = (y[..., 0:2] * 2. - 0.5   grid[i]) * stride[i]  # xy
        y[..., 2:4] = (y[..., 2:4] * 2) ** 2 * anchor_grid[i]  # wh
        z.append(y.reshape(bs, -1, nc   5))
    res = np.concatenate(z, 1)
    return res
代码语言:javascript复制
model_script = torch.jit.load('torchscript.pt')
model_script = model_script.to('cuda')
for file in files:
 img_path = os.path.join(test_path,file)
    img0 = cv2.imread(img_path) 
    img = letterbox(img0,new_shape=(640, 640), color=(114, 114, 114), auto=False, scaleFill=False, scaleup=True, stride=32)[0]
    print(f'[img.shape] {img.shape}')
    # 640 448
    # # Convert
    img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, to 3x416x416
    img = np.ascontiguousarray(img)

    img = torch.from_numpy(img).to(device)
    img = img.half() if half else img.float()  # uint8 to fp16/32
    img /= 255.0  # 0 - 255 to 0.0 - 1.0
    if img.ndimension() == 3:
        img = img.unsqueeze(0)
 
    # ! export torchscript
    pred = model_script(img) 
    pred = [x.data.cpu().numpy() for x in pred]
    pred = numpy_detect(pred, 11)
    pred = torch.tensor(pred).to('cuda')
    
    # Apply NMS
    # (center x, center y, width, height) 
    pred = non_max_suppression(pred, conf_thres=conf_thres, iou_thres=iou_thres, classes=None, agnostic=False)
  • 注意 letterbox 中 auto=False 保持固定尺寸

结果演示

  • 以上仅以yolov5为例,实际并非yolov5

0 人点赞