上一篇 关于spark 和ray整合的文章在这:
祝威廉:Spark整合Ray思路漫谈
另外还讲了讲Spark 和Ray 的对比:
祝威廉:从MR到Spark再到Ray,谈分布式编程的发展
现在我们来思考一个比较好的部署模式,架构图大概类似这样:
首先,大家可以理解为k8s已经解决一切了,我们spark,ray都跑在K8s上。但是,如果我们希望一个spark 是实例多进程跑的时候,我们并不希望是像传统的那种方式,所有的节点都跑在K8s上,而是将executor部分放到yarn cluster. 在我们的架构里,spark driver 是一个应用,我们可以启动多个pod从而获得多个spark driver实例,对外提供负载均衡,roll upgrade/restart 等功能。也就是k8s应该是面向应用的。但是复杂的计算,我们依然希望留给Yarn,尤其是还涉及到数据本地性,然计算和存储放到一起(yarn和HDFS通常是在一起的),避免k8s和HDFS有大量数据交换。
因为Yarn对Java/Scala友好,但是对Python并不友好,尤其是在yarn里涉及到Python环境问题会非常难搞(主要是Yarn对docker的支持还是不够优秀,对GPU支持也不好),而机器学习其实一定重度依赖Python以及非常复杂的本地库以及Python环境,并且对资源调度也有比较高的依赖,因为算法是很消耗机器资源的,必须也有资源池,所以我们希望机器学习部分能跑在K8s里。但是我们希望整个数据处理和训练过程是一体的,算法的同学应该无法感知到k8s/yarn的区别。为了达到这个目标,用户依然使用pyspark来完成计算,然后在pyspark里使用ray的API做模型训练和预测,数据处理部分自动在yarn中完成,而模型训练部分则自动被分发到k8s中完成。并且因为ray自身的优势,算法可以很好的控制自己需要的资源,比如这次训练需要多少GPU/CPU/内存,支持所有的算法库,在做到对算法最少干扰的情况下,然算法的同学们有最好的资源调度可以用。
下面展示一段MLSQL代码片段展示如何利用上面的架构:
代码语言:javascript复制-- python 训练模型的代码
set py_train='''
import ray
ray.init()
@ray.remote(num_cpus=2, num_gpus=1)
def f(x):
return x * x
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))
''';
load script.`py_train` as py_train;
-- 设置需要的python环境描述
set py_env='''
''';
load script.`py_env` as py_env;
-- 加载hive的表
load hive.`db1.table1` as table1;
-- 对Hive做处理,比如做一些特征工程
select features,label from table1 as data;
-- 提交Python代码到Ray里,此时是运行在k8s里的
train data as PythonAlg.`/tmp/tf/model`
where scripts="py_train"
and entryPoint="py_train"
and condaFile="py_env"
and keepVersion="true"
and fitParam.0.fileFormat="json" -- 还可以是parquet
and `fitParam.0.psNum`="1";
下面是PySpark的示例代码:
代码语言:javascript复制from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql import SparkSession
import logging
import ray
from pyspark.sql.types import StructField, StructType, BinaryType, StringType, ArrayType, ByteType
from sklearn.naive_bayes import GaussianNB
import os
from sklearn.externals import joblib
import pickle
import scipy.sparse as sp
from sklearn.svm import SVC
import io
import codecs
os.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"
logger = logging.getLogger(__name__)
base_dir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"
spark = SparkSession.builder.master("local[*]").appName("example").getOrCreate()
data = spark.read.format("libsvm").load(base_dir "/data/mllib/sample_libsvm_data.txt")
## 广播数据
dataBr = spark.sparkContext.broadcast(data.collect())
## 训练模型 这部分代码会在spark executor里的python worker执行
def train(row):
import ray
ray.init()
train_data_id = ray.put(dataBr.value)
## 这个函数的python代码会在K8s里的Ray里执行
@ray.remote
def ray_train(x):
X = []
y = []
for i in ray.get(train_data_id):
X.append(i["features"])
y.append(i["label"])
if row["model"] == "SVC":
gnb = GaussianNB()
model = gnb.fit(X, y)
# 为什么还需要encode一下?
pickled = codecs.encode(pickle.dumps(model), "base64").decode()
return [row["model"], pickled]
if row["model"] == "BAYES":
svc = SVC()
model = svc.fit(X, y)
pickled = codecs.encode(pickle.dumps(model), "base64").decode()
return [row["model"], pickled]
result = ray_train.remote(row)
ray.get(result)
##训练模型 将模型结果保存到HDFS上
rdd = spark.createDataFrame([["SVC"], ["BAYES"]], ["model"]).rdd.map(train)
spark.createDataFrame(rdd, schema=StructType([StructField(name="modelType", dataType=StringType()),
StructField(name="modelBinary", dataType=StringType())])).write.
format("parquet").
mode("overwrite").save("/tmp/wow")
这是一个标准的Python程序,只是使用了pyspark/ray的API,我们就完成了上面所有的工作,同时训练两个模型,并且数据处理的工作在spark中,模型训练的在ray中。
完美结合!最重要的是解决了资源管理的问题!