Outline
- 01. PySpark's multi-process architecture
- 02. How the Python driver calls Java interfaces
- 02.1 pyspark.SparkContext: context.py source walkthrough
- 02.2 pyspark.sql.session: session.py source walkthrough
- 03. RDD and SQL interfaces on the Python driver side
- References
Spark is implemented mainly in Scala, with a small amount of Java, and its primary user-facing API is Scala as well. In data science, however, Python still holds a prominent position: a large number of data engineers rely on Python libraries such as NumPy, pandas, and scikit-learn for data processing and scientific computing, and Python is also considerably easier to pick up than Scala. To serve this audience, Spark ships PySpark, a Python API layered on top of the Spark framework. This series examines how PySpark is implemented at the source-code level, in three parts:
- PySpark internals, source analysis, and trade-offs (1): architecture and the Java interface
- PySpark internals, source analysis, and trade-offs (2): Executor-side inter-process communication and serialization
- PySpark internals, source analysis, and trade-offs (3): summary of strengths and weaknesses
01. PySpark's multi-process architecture
PySpark uses a multi-process architecture that keeps Python and the JVM in separate processes: on both the Driver and the Executors, a Python process and a JVM process run side by side. When a PySpark script is submitted with spark-submit, the Driver runs the Python script directly and launches the JVM from within Python; every RDD or DataFrame operation invoked in Python is forwarded to the corresponding Java API through Py4j. On the Executor side the direction is reversed: the Driver first launches the JVM Executor process, and that JVM then spawns Python worker subprocesses to execute Python UDFs, communicating with them over sockets.
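To make the Driver-side flow concrete, here is a minimal, hypothetical PySpark script. Submitted with spark-submit it runs as an ordinary Python program, and the first SparkSession call is what actually starts the Driver-side JVM through Py4j (the app name and file name are placeholders):

```python
# minimal_job.py -- hypothetical example; run with: spark-submit minimal_job.py
from pyspark.sql import SparkSession

# Building the SparkSession is where the Python driver launches the JVM
# (via Py4j) and creates the JVM-side SparkContext.
spark = SparkSession.builder.appName("pyspark-arch-demo").getOrCreate()

# This DataFrame operation is executed by delegating to the Java/Scala API
# through the Py4j gateway.
df = spark.range(10)
print(df.count())

spark.stop()
```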
02. How the Python driver calls Java interfaces
As noted above, when a PySpark job is submitted via spark-submit, the Driver starts by running the user's Python script. Most of the APIs Spark provides, however, are Scala or Java, so there must be a way to call Java interfaces from Python. PySpark uses the open-source library Py4j for this.
When the Python-side SparkContext object is created, the JVM is actually launched and a Scala-side SparkContext object is created as well. The implementation lives in python/pyspark/context.py:
02.1 pyspark.SparkContext: context.py source walkthrough
Source on GitHub: https://github.com/apache/spark/blob/master/python/pyspark/context.py
Rendered source in the docs: http://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext
```python
@classmethod
def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
    """
    Checks whether a SparkContext is initialized or not.
    Throws error if a SparkContext is already running.
    """
    with SparkContext._lock:
        if not SparkContext._gateway:
            SparkContext._gateway = gateway or launch_gateway(conf)
            SparkContext._jvm = SparkContext._gateway.jvm

        if instance:
            if (SparkContext._active_spark_context and
                    SparkContext._active_spark_context != instance):
                currentMaster = SparkContext._active_spark_context.master
                currentAppName = SparkContext._active_spark_context.appName
                callsite = SparkContext._active_spark_context._callsite

                # Raise error if there is already a running Spark context
                raise ValueError(
                    "Cannot run multiple SparkContexts at once; "
                    "existing SparkContext(app=%s, master=%s)"
                    " created by %s at %s:%s "
                    % (currentAppName, currentMaster,
                       callsite.function, callsite.file, callsite.linenum))
            else:
                SparkContext._active_spark_context = instance
```
In launch_gateway (python/pyspark/java_gateway.py), the JVM process is started first, then a JavaGateway is created and a number of key classes are imported. Once you hold the JavaGateway object, its jvm attribute lets you call Java classes directly, for example (see the sketch below):
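A minimal sketch of what calling into Java through the gateway looks like. The particular Java calls here are illustrative assumptions for demonstration, not something PySpark itself performs at this point:

```python
from pyspark.java_gateway import launch_gateway

gateway = launch_gateway()   # starts the JVM and connects Py4j
jvm = gateway.jvm            # entry point into the Java world

# Any class on the JVM classpath is reachable through `jvm`, e.g. plain JDK
# classes, or Spark classes made visible by the java_import calls below.
millis = jvm.java.lang.System.currentTimeMillis()
jconf = jvm.org.apache.spark.SparkConf()
print(millis, jconf.toDebugString())
```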
With the gateway in place, the SparkContext object inside the JVM is then created. The full launch_gateway implementation is shown below:
```python
# Imports this function relies on, as found at the top of
# python/pyspark/java_gateway.py:
import atexit
import os
import platform
import shlex
import shutil
import signal
import tempfile
import time
from subprocess import Popen, PIPE

from py4j.clientserver import ClientServer, JavaParameters, PythonParameters
from py4j.java_gateway import java_import, JavaGateway, GatewayParameters

from pyspark.find_spark_home import _find_spark_home
from pyspark.serializers import read_int, UTF8Deserializer


def launch_gateway(conf=None, popen_kwargs=None):
    """
    launch jvm gateway

    Parameters
    ----------
    conf : :py:class:`pyspark.SparkConf`
        spark configuration passed to spark-submit
    popen_kwargs : dict
        Dictionary of kwargs to pass to Popen when spawning
        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).

    Returns
    -------
    ClientServer or JavaGateway
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        # Process already exists
        proc = None
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]
        if conf:
            for k, v in conf.getAll():
                command += ['--conf', '%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)

        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)

            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

            # Launch the Java gateway.
            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            popen_kwargs['stdin'] = PIPE
            # We always set the necessary environment variables.
            popen_kwargs['env'] = env
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                popen_kwargs['preexec_fn'] = preexec_func
                proc = Popen(command, **popen_kwargs)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, **popen_kwargs)

            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)

            if not os.path.isfile(conn_info_file):
                raise Exception("Java gateway process exited before sending its port number")

            with open(conn_info_file, "rb") as info:
                gateway_port = read_int(info)
                gateway_secret = UTF8Deserializer().loads(info)
        finally:
            shutil.rmtree(conn_info_dir)

        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
            atexit.register(killChild)

    # Connect to the gateway (or client server to pin the thread between JVM and Python)
    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        gateway = ClientServer(
            java_parameters=JavaParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True),
            python_parameters=PythonParameters(
                port=0,
                eager_load=False))
    else:
        gateway = JavaGateway(
            gateway_parameters=GatewayParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True))

    # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
    gateway.proc = proc

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.resource.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")

    return gateway
```
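Back in context.py, once the gateway and the Python-side configuration are ready, the JVM-side SparkContext is created through the gateway. A lightly abridged sketch of the relevant method (treat it as a paraphrase of context.py rather than an exact copy):

```python
def _initialize_context(self, jconf):
    """
    Initialize SparkContext in function to allow subclass specific initialization.
    """
    # self._jvm is the Py4j view of the JVM set up in _ensure_initialized;
    # this call constructs the JVM-side JavaSparkContext from a JVM SparkConf.
    return self._jvm.JavaSparkContext(jconf)
```

Inside SparkContext._do_init the Python SparkConf is first converted into a JVM SparkConf, and the result of _initialize_context becomes self._jsc, the handle that subsequent RDD operations go through.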
02.2 pyspark.sql.session: session.py source walkthrough
Source on GitHub: https://github.com/apache/spark/blob/master/python/pyspark/sql/session.py
Since Spark 2.0, SparkSession has been the recommended entry point for initialization. For compatibility with the 1.x/2.x-style APIs it is common to return the SparkSession, SparkContext, and SQLContext together; they can of course be derived from one another:
```python
# Imports assumed by this helper (not part of the original snippet):
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext


def setup_spark_session(param_dict):
    """
    Description : Used to setup spark session
    Input       : param_dict - parameter dictionary
    Output      : Spark Session, Spark Context, and SQL Context
    """
    os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"

    # Try to stop a pre-existing SparkSession; inside this function `spark` is a
    # local name, so this normally just falls through to the except branch.
    try:
        spark.stop()
        print("Stopped a SparkSession")
    except Exception:
        print("No existing SparkSession")

    SPARK_DRIVER_MEMORY = param_dict["SPARK_DRIVER_MEMORY"]      # e.g. "10G"
    SPARK_DRIVER_CORE = param_dict["SPARK_DRIVER_CORE"]          # e.g. "5"
    SPARK_EXECUTOR_MEMORY = param_dict["SPARK_EXECUTOR_MEMORY"]  # e.g. "3G"
    SPARK_EXECUTOR_CORE = param_dict["SPARK_EXECUTOR_CORE"]      # e.g. "1"
    AWS_ACCESS_KEY = param_dict["AWS_ACCESS_KEY"]
    AWS_SECRET_KEY = param_dict["AWS_SECRET_KEY"]
    AWS_S3_ENDPOINT = param_dict["AWS_S3_ENDPOINT"]

    conf = (SparkConf()
            .setAppName(param_dict["APP_NAME"])
            .setMaster('yarn-client')
            .set('spark.executor.cores', SPARK_EXECUTOR_CORE)
            .set('spark.executor.memory', SPARK_EXECUTOR_MEMORY)
            .set('spark.driver.cores', SPARK_DRIVER_CORE)
            .set('spark.driver.memory', SPARK_DRIVER_MEMORY)
            .set('spark.driver.maxResultSize', '0'))

    spark = (SparkSession.builder
             .config(conf=conf)
             .getOrCreate())

    sc = spark.sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
    hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
    hadoop_conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT)
    hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    sqlContext = SQLContext(sc)

    return spark, sc, sqlContext
```
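A hypothetical call to this helper might look like the following; every value is a placeholder:

```python
params = {
    "APP_NAME": "my_pyspark_app",
    "SPARK_DRIVER_MEMORY": "10G",
    "SPARK_DRIVER_CORE": "5",
    "SPARK_EXECUTOR_MEMORY": "3G",
    "SPARK_EXECUTOR_CORE": "1",
    "AWS_ACCESS_KEY": "<access-key>",
    "AWS_SECRET_KEY": "<secret-key>",
    "AWS_S3_ENDPOINT": "s3.us-east-1.amazonaws.com",
}
spark, sc, sqlContext = setup_spark_session(params)
```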
Now let's look at getOrCreate(). The method first checks whether a valid global default SparkSession exists; if so, it returns that one. If no valid global default exists, it creates a new SparkSession and registers the newly created session as the global default.
Note that self._lock is an RLock, imported via from threading import RLock. An RLock is a reentrant lock: the same thread may acquire it multiple times, but it must ultimately be released by the thread that acquired it, and only after release has been called as many times as acquire can another thread obtain the lock. It is declared on the Builder class (a short reentrancy demo follows the snippet below):
```python
class Builder(object):
    """Builder for :class:`SparkSession`."""

    _lock = RLock()
    _options = {}
    _sc = None
```
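A standalone sketch (not PySpark code) of why reentrancy matters: the same thread can re-acquire the lock, for instance when one method that already holds it calls another method that also takes it, whereas a plain Lock would deadlock:

```python
from threading import RLock

_lock = RLock()

def outer():
    with _lock:      # first acquire
        inner()      # a non-reentrant Lock would deadlock here

def inner():
    with _lock:      # second acquire by the same thread -- fine with RLock
        print("both levels hold the lock")

outer()  # the lock is fully released only once both `with` blocks exit
```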
```python
def getOrCreate(self):
    """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a
    new one based on the options set in this builder.

    .. versionadded:: 2.0.0

    Examples
    --------
    This method first checks whether there is a valid global default SparkSession, and if
    yes, return that one. If no valid global default SparkSession exists, the method
    creates a new SparkSession and assigns the newly created SparkSession as the global
    default.

    >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
    >>> s1.conf.get("k1") == "v1"
    True

    In case an existing SparkSession is returned, the config options specified
    in this builder will be applied to the existing SparkSession.

    >>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
    >>> s1.conf.get("k1") == s2.conf.get("k1")
    True
    >>> s1.conf.get("k2") == s2.conf.get("k2")
    True
    """
    with self._lock:
        from pyspark.context import SparkContext
        from pyspark.conf import SparkConf
        session = SparkSession._instantiatedSession
        if session is None or session._sc._jsc is None:
            if self._sc is not None:
                sc = self._sc
            else:
                sparkConf = SparkConf()
                for key, value in self._options.items():
                    sparkConf.set(key, value)
                # This SparkContext may be an existing one.
                sc = SparkContext.getOrCreate(sparkConf)
            # Do not update `SparkConf` for existing `SparkContext`, as it's shared
            # by all sessions.
            session = SparkSession(sc)
        for key, value in self._options.items():
            session._jsparkSession.sessionState().conf().setConfString(key, value)
        return session
```
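To tie this back to the note at the start of this subsection about moving between the old and new entry points, here is a short sketch using standard PySpark APIs:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("convert-demo").getOrCreate()

# SparkSession -> SparkContext
sc = spark.sparkContext

# A second getOrCreate() reuses the same global default session (and context).
spark2 = SparkSession.builder.getOrCreate()
assert spark2.sparkContext is sc

# SQL through the session covers most of what the old SQLContext was used for.
spark.sql("SELECT 1 AS one").show()
```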
03. RDD and SQL interfaces on the Python driver side
In PySpark, after some further initialization of the Python and JVM environments, the Python-side SparkContext object is ready; it is essentially a wrapper around the JVM-side interfaces. As in the Scala API, the SparkContext object offers various ways to create RDDs, corresponding almost one-to-one with their Scala counterparts. Let's look at a few examples (see the sketch below).
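A few illustrative examples of the Python-side RDD API; sc is assumed to be an existing pyspark.SparkContext, and the file path is a placeholder:

```python
# Create an RDD from a local Python collection; under the hood this goes
# through sc._jsc (the wrapped JavaSparkContext) via Py4j.
rdd = sc.parallelize(range(10), numSlices=2)
print(rdd.map(lambda x: x * x).filter(lambda x: x % 2 == 0).collect())

# Create an RDD from a text file (placeholder path); a thin wrapper over the
# JVM textFile API.
lines = sc.textFile("hdfs:///path/to/input.txt")
print(lines.count())

# The wrapped JVM objects are visible on the Python objects themselves:
print(type(sc._jsc))    # py4j JavaObject for the JVM JavaSparkContext
print(type(rdd._jrdd))  # py4j JavaObject for the underlying Java RDD
```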
References
- https://www.mobvista.com/cn/blog/2019-12-27-2/
- https://blog.csdn.net/Pysamlam/article/details/115683424
- PySpark startup internals: https://blog.csdn.net/wangyaninglm/article/details/114038572