0835-5.16.2-如何按需加载Python依赖包到Spark集群

2021-04-30 12:12:22 浏览数 (1)

1.文档编写目的

在开发Pyspark代码时,经常会用到Python的依赖包。在PySpark的分布式运行的环境下,要确保所有节点均存在我们用到的Packages,本篇文章主要介绍如何将我们需要的Package依赖包加载到我们的运行环境中,而非将全量的Package包加载到Pyspark运行环境中,本篇文章以xgboost1.0.2包为例来介绍。

  • 测试环境:

1.Redhat7.6

2.CDH5.16.2

3.使用root用户操作

2.环境检查

1.确保集群所有节点已安装了相同的Python版本,测试环境使用了Anaconda来部署统一的Python环境。

2.找一个任意OS节点装上Python3.6.4 版本,用来准备提取依赖包

配置pip使用国内的Python源

代码语言:javascript复制
[root@cdh02 ~]# cat /etc/pip.conf 
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
user = true
trusted-host=mirrors.aliyun.com

3.在上一步的节点上安装xgboost1.0.2依赖包

代码语言:javascript复制
/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/pip install xgboost==1.0.2

xgboost包安装成功后默认在/root/.local/lib/python3.7/site-packages目录下

验证xgboost包是否安装安装成功

4.将安装成功的xgboost包打包成zip并上传到hdfs目录

代码语言:javascript复制
cd /root/.local/lib/python3.6/site-packages/
zip -r xgb.zip xgboost

将准备好的xgb.zip包上传到hdfs的/python/dependency/目录下

代码语言:javascript复制
hadoop fs -mkdir -p /python/dependency
hadoop fs -put xgb.zip /python/dependency/
hadoop fs -ls /python/dependency

3.Pyspark中加载依赖包

1.在初始化SparkSession对象时指定spark.yarn.dist.archives参数

代码语言:javascript复制
spark = SparkSession
    .builder
    .appName("PythonPi")
    .config('spark.yarn.dist.archives', 'hdfs:///python/dependency/xgb.zip#xgb')
    .getOrCreate()

注意:指定的路径是HDFS上的路径,路径后的#xgb是必须指定的,xgb可以任意命令,需要和后面代码使用一致即可。

2.自定义一个函数,主要用来加载Python的环境变量(在执行分布式代码时需要调用该函数,否则Executor的运行环境不会加载Python依赖)

代码语言:javascript复制
def fun(x):
  import sys
  import os
  sys.path.append(os.getcwd()   "/"   "xgb")
  import xgboost
  return xgboost.__version__

3.接下来就是在代码中使用定义的function

代码语言:javascript复制
sc = spark.sparkContext
rdd = sc.parallelize([1,2,3,4,5,6,7], 3)
rdd.map(lambda x: fun(x)).distinct().collect()

4.通过上述的方式在执行Executor时加载Python的依赖包到运行环境中解决Pyspark对Packages依赖问题,完整示例代码如下:

代码语言:javascript复制
from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

import os
py_environ=os.environ['CONDA_DEFAULT_ENV']
if py_environ=='python2.7':
  os.environ['PYSPARK_PYTHON'] = '/usr/bin/python'
else:
  os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python'

spark = SparkSession
    .builder
    .appName("PythonPi")
    .config('spark.yarn.dist.archives', 'hdfs:///python/dependency/xgb.zip#xgb')
    .getOrCreate()

def fun(x):
  import sys
  import os
  sys.path.append(os.getcwd()   "/"   "xgb")
  import xgboost
  return xgboost.__version__

sc = spark.sparkContext

rdd = sc.parallelize([1,2,3,4,5,6,7], 3)
rdd.map(lambda x: fun(x)).distinct().collect()

4.运行结果验证

执行Pyspark代码验证所有的Executor是否有加载到xgboost依赖包

5.总结

1.存放在HDFS上的第三方依赖包可以存在多个,也可以将多个package包打包到一个zip包里。

2.注意zip中的依赖包一定是通过pip命令成功安装后的packages,而不是直接下在下来的安装包。

3.在指定spark.yarn.dist.archives路径时,必须指定在路径最后加上#号和一个别名,该别名会在运行Executor和driver时作为zip包解压的目录存在。

0 人点赞