1.文档编写目的
在开发Pyspark代码时,经常会用到Python的依赖包。在PySpark的分布式运行的环境下,要确保所有节点均存在我们用到的Packages,本篇文章主要介绍如何将我们需要的Package依赖包加载到我们的运行环境中,而非将全量的Package包加载到Pyspark运行环境中,本篇文章以xgboost1.0.2包为例来介绍。
- 测试环境:
1.Redhat7.6
2.CDH5.16.2
3.使用root用户操作
2.环境检查
1.确保集群所有节点已安装了相同的Python版本,测试环境使用了Anaconda来部署统一的Python环境。
2.找一个任意OS节点装上Python3.6.4 版本,用来准备提取依赖包
配置pip使用国内的Python源
代码语言:javascript复制[root@cdh02 ~]# cat /etc/pip.conf
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
user = true
trusted-host=mirrors.aliyun.com
3.在上一步的节点上安装xgboost1.0.2依赖包
代码语言:javascript复制/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/pip install xgboost==1.0.2
xgboost包安装成功后默认在/root/.local/lib/python3.7/site-packages目录下
验证xgboost包是否安装安装成功
4.将安装成功的xgboost包打包成zip并上传到hdfs目录
代码语言:javascript复制cd /root/.local/lib/python3.6/site-packages/
zip -r xgb.zip xgboost
将准备好的xgb.zip包上传到hdfs的/python/dependency/目录下
代码语言:javascript复制hadoop fs -mkdir -p /python/dependency
hadoop fs -put xgb.zip /python/dependency/
hadoop fs -ls /python/dependency
3.Pyspark中加载依赖包
1.在初始化SparkSession对象时指定spark.yarn.dist.archives参数
代码语言:javascript复制spark = SparkSession
.builder
.appName("PythonPi")
.config('spark.yarn.dist.archives', 'hdfs:///python/dependency/xgb.zip#xgb')
.getOrCreate()
注意:指定的路径是HDFS上的路径,路径后的#xgb是必须指定的,xgb可以任意命令,需要和后面代码使用一致即可。
2.自定义一个函数,主要用来加载Python的环境变量(在执行分布式代码时需要调用该函数,否则Executor的运行环境不会加载Python依赖)
代码语言:javascript复制def fun(x):
import sys
import os
sys.path.append(os.getcwd() "/" "xgb")
import xgboost
return xgboost.__version__
3.接下来就是在代码中使用定义的function
代码语言:javascript复制sc = spark.sparkContext
rdd = sc.parallelize([1,2,3,4,5,6,7], 3)
rdd.map(lambda x: fun(x)).distinct().collect()
4.通过上述的方式在执行Executor时加载Python的依赖包到运行环境中解决Pyspark对Packages依赖问题,完整示例代码如下:
代码语言:javascript复制from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
import os
py_environ=os.environ['CONDA_DEFAULT_ENV']
if py_environ=='python2.7':
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python'
else:
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python'
spark = SparkSession
.builder
.appName("PythonPi")
.config('spark.yarn.dist.archives', 'hdfs:///python/dependency/xgb.zip#xgb')
.getOrCreate()
def fun(x):
import sys
import os
sys.path.append(os.getcwd() "/" "xgb")
import xgboost
return xgboost.__version__
sc = spark.sparkContext
rdd = sc.parallelize([1,2,3,4,5,6,7], 3)
rdd.map(lambda x: fun(x)).distinct().collect()
4.运行结果验证
执行Pyspark代码验证所有的Executor是否有加载到xgboost依赖包
5.总结
1.存放在HDFS上的第三方依赖包可以存在多个,也可以将多个package包打包到一个zip包里。
2.注意zip中的依赖包一定是通过pip命令成功安装后的packages,而不是直接下在下来的安装包。
3.在指定spark.yarn.dist.archives路径时,必须指定在路径最后加上#号和一个别名,该别名会在运行Executor和driver时作为zip包解压的目录存在。