作者 | Antti Puurula
来源 | Medium
编辑 | 代码医生团队
走向分布式人工智能
在过去的几年里,Python已成为数据科学和人工智能的通用语言,所有使用Python作为主要界面语言的着名深度学习框架(Keras,Pytorch,MXNet)。与竞争语言相比,Python在DS和AI的几乎每个方面都可以与之竞争或超越:最新的机器学习算法及其高效实现(Scikit-Learn,LightGBM,XGBoost),数据处理和分析(Pandas,cuDF),高效的数值计算库(Numpy) ,PyPy,Numba),GPU计算(CuPY)和Web API编程(Flask,Celery)。
由于Global Interpreter Lock(GIL)作为其核心设计的一部分,Python的致命弱点是并行多线程和多进程工作负载的弱点。这已经在Python阵营中产生了解决方案解决方案,以及更加强调并行性的替代语言,例如GoLang。硬件正在进行的军备竞赛期间加速了对并行性的需求:消费者CPU在短短几年内从4核心变为32核心(AMD 2990WX),而价格合理的云计算节点现在每个都提供224个核心(亚马逊u-6tb1.metal)。
对于AI而言,对并行性的需求不仅适用于单个工作站或计算节点,而且适用于编排分布在可能数千个计算节点上的AI处理流水线。与CPU内核的变化类似,本地和云使用的网络传输速度已从1 Gb / s变为商用10-100 Gb / s连接。直到最近,大部分此类大数据技术都基于Hadoop等Java框架,但软件和硬件的变化带来了新的解决方案类型,包括用于AI的三个主要Python分布式处理框架:PySpark,Dask和射线。
分布式批处理框架
Apache Spark及其Python接口PySpark是最古老的框架,最初的GitHub版本可追溯到2010年10月4日.Spark将自己定位为主要的大数据技术之一,在企业界得到广泛采用。它提供了Map-Reduce编程范例的扩展,通过将较大的任务映射到分发给工作人员的一组小批量(Map)来解决批处理任务,并在每个小批量完成后组合结果(Reduce) 。Spark处理Map的定向非循环图(DAG)减少计算管道,在整个DAG处理过程中保持数据在工作人员之间的分布。任务图在功能上定义,并且在优化DAG计算顺序之后懒惰地执行任务。
Dask和Ray都基于Spark的DAG并发功能评估的核心思想,数据在整个过程中保持分布。Dask及其调度程序后端Distributed是一个更新的框架,2015年1月29日使用原始的GitHub版本。虽然Spark是为Java和Scala编写的,但Dask是为Python编写的,并提供了一组丰富的分布式类。Dask还提供了更丰富的低级API,支持对AI模型的分布式培训至关重要的actor类。
Ray是最新的框架,最初的GitHub版本日期为2017年5月21日。与Dask一样,Ray拥有Python优先API和对actor的支持。它有几个高性能优化,使其更高效。与Spark和Dask不同,任务在每个节点内急切执行,因此每个工作进程在收到所需数据后立即启动。工作节点中的数据使用Apache Arrow对象存储,这些对象在节点上工作的所有进程之间提供零对象共享。工作节点具有自己的本地调度程序,进一步减少了全局调度程序的开销。
Wordbatch
这三个框架在其调度程序引擎的设计和实现方面差别很大:序列化,传输,调度,配置需求,内存需求等。对于给定的复杂任务,很难(如果不是不可能)说哪个引擎能够工作得最好。对于某些任务,特定框架根本不起作用。Spark缺乏演员,使模型的大规模培训复杂化。Dask不会序列化复杂的依赖项。Ray结果存储不能存储一些非常基本的Python对象,例如collections.Counter。因此,无论是性能还是可行性,测试给定任务的每个框架都是有用的,并选择一个有效的框架。
Wordbatch库v.1.4使用可交换的调度程序后端对管道进行批处理。它的orchestrator类Batcher保留对后端句柄的引用,并处理任务到小批量的映射并减少结果。调度程序后端是可交换的,因此如果一个后端无法处理处理任务,则只需更换Batcher对象的后端和backend_handle属性即可交换任何其他后端。它支持本地(串行,线程,多处理,Loky)和分布式后端(Spark,Dask,Ray)。类似地调用分布式框架,在可能的情况下将数据分布在整个管道中。
Wordbatch还附带了一组管道和类,它们为基于文本的机器学习提供了一整套工具,并且可以作为模板在其他域中进行处理。如果需要,Wordbatch类可以独立调用Batcher上的Map-Reduce操作,并支持整个管道中的分布式存储,以及使用fit_partial() - 方法进行流处理。
基准设置
可以使用Wordbatch作为中立基准来测试三个分布式框架,以及非分布式后端作为基线。为了简化比较,将在两个硬件设置下使用两个基本流水线。这两项任务最多使用来自TripAdvisor评论数据集1.28M评论。
http://times.cs.uiuc.edu/~wang296/Data/
完整版的基准脚本可在github上找到。对于这两个任务,测试脚本初始化Batcher,如下所示:
https://github.com/anttttti/Wordbatch/blob/master/scripts/backends_benchmark.py
代码语言:javascript复制from wordbatch.batcher import Batcher
batcher = Batcher(procs=16, minibatch_size=5000, backend=backend[0], backend_handle=backend[1])
这里“procs”是使用的进程数,“minibatch_size”是每个小批处理中要处理的数据行数,“backend”是后端的名称,“backend_handle”给出了Batcher的API句柄通信。
第一个管道ApplyBatch在每个小批量评论上运行Scikit-learn HashingVectorizer,并返回简化的散列特征稀疏矩阵。
代码语言:javascript复制from sklearn.feature_extraction.text import HashingVectorizer
from wordbatch.pipelines import ApplyBatch
hv = HashingVectorizer(decode_error=’ignore’, n_features=2 ** 25, preprocessor=normalize_text, ngram_range=(1, 2), norm=’l2')
output = ApplyBatch(hv.transform,batcher=batcher).transform(texts_chunk)
第二个管道WordBatch是一个全文处理管道,执行1)文本规范化,2)拼写校正和词干化,3)字典计数,4)词袋特征提取和TF-IDF加权的连续步骤。拼写校正和字典计数步骤都执行自己的Map-Reduce操作来计算字频表,拼写校正和特征提取步骤需要向每个工作人员发送字典。
代码语言:javascript复制from wordbatch.pipelines import WordBatch
from wordbatch.extractors import WordBag
from wordbatch.transformers import Tokenizer, Dictionary
wb = WordBatch(normalize_text=normalize_text,
dictionary=Dictionary(min_df=10, max_words=1000000),
tokenizer=Tokenizer(spellcor_count=2, spellcor_dist=2, raw_min_df= 2, stemmer=stemmer),
extractor=WordBag(hash_ngrams=0, norm= ‘l2’, tf= ‘binary’, idf= 50.0),
batcher=batcher)
output = wb.fit_transform(texts_chunk)
第一个硬件设置使用单个i9-9900K CPU,8核和64GB DDR4 RAM,可以处理所有覆盖的测试。第二个设置使用直接10 Gb / s以太网连接将另一个工作节点与18核i9-7980XE CPU连接。使用的操作系统是Ubuntu 18.04.2 LTS,库版本是pyspark 2.4.1,ray 0.7.0和分布式1.28.1。
结果
基准测试1.在单个节点上分发Scikit-Learn HashingVectorizer
对于在单个节点上并行化HashingVectorizer的简单任务,与运行单个串行进程相比,所有并行框架都获得了大致线性的加速。对于最大的1.28M文档,串行需要256秒,而多处理需要36秒。有趣的是,Ray实际上比多处理更快,需要33秒,而Spark需要50秒。
基准测试2.在单个节点上分发WordBatch特征提取管道
使用WordBatch管道的更复杂的任务显示出令人惊讶的结果。Spark,Ray和多处理再次显示线性加速,随着数据的增加保持不变,但Loky和Dask都无法并行化任务。相比于为1.28M文档连续拍摄460s,Ray在91s中再次以最快的速度完成。Loky和Dask都有越来越多的时间使用,大致在同一时间使用串行收敛,但随着数据量的增加,可能会超过串行时间使用。这种奇怪行为的可能原因是流程之间缺乏共享以及此任务需要两次向每个工作人员发送字典。字典随着数据的增加而变得越来越大,并且不能有效共享辅助数据的开销超出了并行化的好处。这是一个令人惊讶的结果,
基准测试3.使用其他节点分发HashingVectorizer
继续使用超过10 Gb / s的额外18个内核进行第二个硬件设置,所有三个分布均受益于附加节点。但是,由于更大的内存要求和接近配置的内存限制,Spark在最大的1.28M文档任务中遇到了麻烦。实际上,Spark需要对其组件进行大量配置,这对其用户来说是一种挫败感。最多,附加节点为Spark提供22%的加速。Dask和Ray的表现要好得多,Dask的加速率为32%,Ray的加速率为41%,为1.28M。与单节点相比的加速比也随着数据大小而增加,并且在最大测试尺寸下似乎没有接近饱和。
基准测试4.使用附加节点分发WordBatch管道
使用附加节点测试WordBatch管道,发现Dask不会获得太多收益。当使用额外的节点时,它有效处理辅助数据的问题似乎更加复杂,因此在最大的1.28M文档条件下,只能从457s加速到420s,并且随着任务的增加,加速不断降低。Spark和Ray都可以在此任务中更好地使用附加节点,Spark的最大加速比为38%,Ray的最大加速比为28%,文档为0.64M。由于更好地使用附加节点,具有附加节点的Spark几乎与Ray相同,并且可以通过更大的数据大小和更复杂的处理流水线来完成。
结论性思考
这些基本基准测试演示了分布式调度程序的一些主要属性。所有调度程序对于分发Python工作负载都很有用,但有些不适合每个任务。实际应用程序将涉及大型集群上更复杂的管道,但这会使直接比较变得复杂,原因在于:配置调度程序的选择,关于如何实现共享数据的设计决策以及诸如演员之类的远程类,以及如何使用GPU和其他非CPU处理器。
作为初步结论,Ray似乎是最有希望的框架。它比单个节点上的Python标准多处理工作速度快10%左右,并且在所有条件下都能很好地使用附加节点。与Spark不同,集群配置非常少,并且它支持actor。与Dask不同,它可以很好地序列化嵌套的Python对象依赖项,并有效地在进程之间共享数据,线性地扩展复杂的管道。与单个串行进程相比,具有附加节点的Ray提供12.9x加速分配HashingVectorizer,并且在更复杂的任务上提供6.7倍加速。
可用硬件也会对调度程序的性能产生很大影响。如果此处使用1 Gb / s连接,则附加节点几乎没有优势。10 Gb / s上的100 Gb / s将增加额外节点的好处,并改变测试后端之间的结果。与Ray相比,Dask特别会从100 Gb / s中受益更多。如果像Spark使用Hadoop那样从分布式存储中提取数据,这将在一定程度上降低高带宽网络的依赖性。但是,大多数实际流水线都会进行需要高带宽的数据传输。对于更多节点,这些框架应该都使用100 Gb / s,并且应该规划AI管道以最小化网络流量并最大化分布式核心的使用。
链接
apache / spark
https://github.com/apache/spark
Apache Spark。通过在GitHub上创建一个帐户,为apache / spark开发做出贡献。
dask / dask
https://github.com/dask/dask
具有任务调度的并行计算。通过在GitHub上创建一个帐户来为dask / dask开发做贡献。
ray-project / ray
https://github.com/ray-project/ray
用于构建和运行分布式应用程序的快速而简单的框架。
用于分布式AI处理流水线的
https://github.com/anttttti/Wordbatch
anttttti / Wordbatch
Python库,使用可交换的调度程序后端。
与Spark的比较 - Dask 1.2.2文档
http://docs.dask.org/en/stable/spark.html
它们都可以部署在相同的集群上。
许多不同的分布式系统
与dask的比较·问题#642·ray-project / ray
https://github.com/ray-project/ray/issues/642
ray看起来像一个有趣的项目!