腾讯太极机器学习平台|Light在广告粗排中的数据下载与解析优化

2021-08-04 10:40:12 浏览数 (1)

概述

广告粗排训练是一个小模型、低延时的业务场景。在此场景下,我们基于于云帆Oteam中的Light通用训练加速框架,根据广告粗排训练特性定制化地构建了GPU上同步分布式的模式进行数据并行的训练模式,将存储在HDFS上的训练数据,读取到本地,然后输入到模型中,进行前向计算。该训练方式不存在PS,每个worker上有全量的参数。Light框架下的各个worker前向计算获得梯度后,使用LightCC进行梯度规约通信,并将获得的梯度更新到本地的参数上。我们在上述训练方式下,进行了系统瓶颈分析和性能优化。本系列文章对在系统中所作的部分优化进行了总结。双塔结构是广告推荐场景中采用的一种典型模型结构,如图1:

图1.特征从tower1(例如user tower)和tower2(例如item tower)分别输入模型。离散的特征做embedding后,从获得每个特征各自对应的权重。一个特征可能存在多个embedding。不同特征通过embedding lookup得到的权重值经过concat/stack后reduction然后合并,输入到Dense的部分,然后计算loss。

在本文中,我们首先简单阐述训练地数据内容和模型结构,然后讨论训练过程中存在和的瓶颈,最后根据分析内容阐述我们的优化方案和效果。

数据下载与Parsing

广告粗排的数据以文件的形式存储在HDFS的集群上。文件以TFRecord的二进制格式进行存储(protobuf),每个样本中的内容按照特征来划分。在广告粗排中,主要涉及int64和string类型的Dense/Sparse特征。当训练程序使用数据时,需要从远端将文件读到本地,并将这些二进制格式的特征解为对应的Tensor或SparseTensor。

这部分的耗时主要可以分为2个部分:文件下载和数据处理。文件下载指将存在远端HDFS中的数据读到本地,数据处理包括数据的解压、反序列化、组装Tensor、按照特征分发Tensor。

性能瓶颈分析

  • 数据下载

在Baseline的实现中,数据从远端HDFS下载到本地。在Baseline实现中,使用独立的下载进程将训练数据下载到本地。当一部分数据下载完成后,下载进程将这部分数据标记为“已完成”。训练进程检测到某个目录下的文件下载完成后,从本地磁盘读文件,然后放入内存。

图2. 数据下载Pipeline (base),下载进程的Fetcher从远端HDFS下载文件到tmp buffer(disk),当下载完成后,将tmp buffer标记为working buffer(disk)。对于训练进程,Reader检测working buffer,如果存在ready的working buffer,则认为训练数据以就位,开始消耗working buffer。

假设下载速度和文件消耗速度稳定,那么当两者相等时,下载的耗时能完美地被隐藏。但是,在实际的业务场景中,由于模型结构变化,数据文件大小的变化,HDFS集群的负载状态变化等原因,上述条件基本无法满足。因此,我们可以发现,在训练过程中,经常出现训练进程等待数据下载的情况发生。上述过程涉及网络IO,磁盘IO,Pipeline链条较长,可能出现的瓶颈点也越多,消耗时间的过程也越多。

性能优化

整体上,我们选择使用tf.data下的API组合来实现高性能的下载和样本解析。tf.data提供了一组相关的API,我们要做的,是根据实际情况来选择API的组合顺序。

并行下载与下载Buffer
Baseline的实现选择调用tensorflow下的gfiles.Copy  API来从远端下载文件到本地。我们使用如下方式来对dataset pipeline做优化:
代码语言:javascript复制
  def inputbuffer_fn(x):      return dataset_type(x,                          compression_type=compression_type,                          buffer_size=tfrecord_buffer_size)
  dataset = input_slices.apply(tf.contrib.data.parallel_interleave(      lambda x: inputbuffer_fn(x),      cycle_length=num_read_threads,      sloppy=True))

其中,input_slices是输入的文件列表。

利用tf.contrib.data.parallel_interleave,我们构建了一个多线程的下载实例。每个线程从远端HDFS直接读tfrecord_buffer_size字节的样本到本地内存。这个buffer能够起到缓冲网络波动的能力。我们可以根据实际机型和网络情况,调整并行度cycle_length和buffer的数值,来适应实际的训练消耗样本的速度。

图3.数据下载Pipeline (optimized) 单个线程下载数据的过程

如图3,我们舍弃了将数据先落入本地磁盘的过程,减短了数据下载的Pipeline落入磁盘和文件标记的过程,减少了磁盘IO的操作。另外,由于加入了网络buffer,即使当HDFS负载或网络情况出现波动时,也能够在一定程度上避免训练进程收到的影响。

当我们使用并行下载时,使用如下实现方式:   

图4.并行下载工作方式

当开启并行下载时,多个worker线程被开启,各自独立地选择一部分文件,如果当前download buffer无足量样本,则从远端HDFS下载样本到download buffer(内存)。若download buffer中有足量样本,填满zi(zlib input) buffer。zlib通过解压操作inflate,获得解压后的数据到zo(zlib output) buffer。master线程选择某个worker,并从中读取一条样本。这里我们设置了sloppy,使当某些worker因为网络原因卡住时,从该worker跳过,避免因部分worker卡住而导致数据消耗过程停顿的情况发生。

经过优化后,我们获得了约58%的QPS提升。

线程队列Buffer

在使用如图4的并行下载方式后,我们仍然观察到当训练正常运行时,QPS仍然存在周期性的起伏波动。我们对该现象进行了观察,发现该现象在数据消耗速度较慢的模型上,具有更为明显的起伏现象。于是可以合理猜测,该现象与模型的计算时间,即样本的消耗速度挂钩。

那么为什么会出现这种现象呢?对图4中worker中的状态进行观察,发现当使用较复杂模型时,大量的worker工作处于停滞状态。经过分析,发现原因是worker线程将download buffer读满后,停止下载,然后进行数据解压,填满zlib输出buffer,生成record。直到master把worker buffer的样本消耗空时,才重新从远端HDFS读取样本将download buffer填满。当模型计算速度相对较快时,worker供给样本的速度跟不上数据消耗的速度,当大部分worker没有准备好样本时,master在所有worker输出结果中轮询空转,意味着此时大量worker在做下载,没有产生数据。而当master在某个worker输出结果中终于访问到样本时,这些worker线程才结束上一轮的下载,准备好了一批样本供给master。master在一段时间内,worker有足量的样本,大量worker停止下载。因此,我们才会观察到下载QPS周期性涨落的现象。了解原因后,可以做如下优化:

代码语言:javascript复制
def inputbuffer_fn(x):    return dataset_type(x,                        compression_type=compression_type,                        buffer_size=tfrecord_buffer_size)
def prefetch_inputbuffer_fn(x):    return inputbuffer_fn(x).prefetch(num_examples_twig_prefetch)
dataset = input_slices.apply(tf.contrib.data.parallel_interleave(    lambda x: prefetch_inputbuffer_fn(x) if num_examples_twig_prefetch else inputbuffer_fn(x),    cycle_length=num_read_threads,    sloppy=True))

当使用prefetch_inputbuffer_fn时,我们在每个worker thread中外挂了一个prefetch thread。这个prefetch thread和worker thread互为Ping-Pong,在一定程度上避免了,在worker线程内,下载样本和消费buffer的串行问题。

图5.队列线程buffer

在worker thread上再开了一个prefetch thread。master从prefetch thread中读样本,避免了因为worker thread读远端数据和解压输出串行关系,导致的因为master来不及消耗的部分worker thread等待问题。

该方法的实际效果和模型复杂程度、机器型号和网络情况、HDFS负载情况等因素有关。经过对多种情况进行对比,该方法在部分广告模型上,能获得约0% ~ 23%不等的性能提升。在有性能需求时,可以考虑开启使用。

CPU效率优化

广告推荐是一个典型的CPU bound场景。当使用GPU做训练时,通常可以看到CPU利用率非常高,但GPU利用率相对较低。导致该现象的原因主要有以下几点:

1. 模型结构较为“矮胖”,数据预处理的部分占整体训练耗时的比例相对较大。

2. 特征大量使用字符串来表示,由于在GPU对string的计算支持不友好,因此需要将string特征转变为GPU能够处理的表达形式,这里需要消耗大量CPU资源。

3. 业务使用的一些算子在Tensorflow中缺乏GPU版本。

4. 单机内的CPU核心数相对有限。

整数转字符串优化 字符串广告粗排业务中,特征的主要表达形式。整形特征也先转成采用字符串特征,然后再做Hash。这部分消耗大量CPU计算。

代码语言:javascript复制
void Appendv(string* dst, const char* format, va_list ap) {  ...
  // Increase the buffer size to the size requested by vsnprintf,  // plus one for the closing .  int length = result   1;  char* buf = new char[length];
  // Restore the va_list before we use it again  va_copy(backup_ap, ap);  result = vsnprintf(buf, length, format, backup_ap);  va_end(backup_ap);
  if (result >= 0 && result < length) {    // It fit    dst->append(buf, result);  }  delete[] buf;}

TensorFlow AsString算子的实现围绕vsnprintf展开,这里主要用到了int64转string的计算。经过调研,我们选了使用更高性能的fmt来实现同样功能:

代码语言:javascript复制
   ...
    auto input_flat = input_tensor->flat<T>();    auto output_flat = output_tensor->flat<string>();
    for (int i=0; i<output_flat.size();   i) {      output_flat(i) = fmt::to_string<T>(input_flat(i));    }    ...

使得单个算子性能提升到了约200%。在图7中可以看到,替换fmt的实现后,CPU部分算子耗时减少到了100 ms的范围内。QPS得到约10%的性能提升。

解压优化

在从远端HDFS下载完成部分数据后,由于样本经过了gzip压缩,所以需要对应地将数据内容解压出来。Tensorflow中使用zlib inflate来完成这一过程。替换czlib,能够将这一过程的耗时降低40%。czlib和zlib具有相同的API,只需将tensorflow的zlib依赖替换为czlib即可。但因为数据下载部分通常被训练耗时掩盖,所以这部分的优化效果在整体QPS上并不明显。

CPU资源扩展

针对广告粗排CPU bound的情况,我们使用具有更多CPU核心数的机型。并测试了整机上平均每卡的训练性能,如图6:

不同机型训练平均单卡QPS,CPU利用率,GPU利用率。数字编号越高CPU核心数越多。

当使用拥有更多CPU核心数的机型进行训练时,线程间CPU抢占的情况有所缓解。下期为你解读:《腾讯太极机器学习平台|Light在广告粗排中的特征与Embedding优化》

近期热文推荐

关于我们


本期编辑:carol/suzie

技术分享:关注微信公众号 【鹅厂架构师】【腾讯架构师】

0 人点赞