文章大纲
- Executor 端进程间通信和序列化
- Pandas UDF
- 参考文献
系列文章:
- pyspark 原理、源码解析与优劣势分析(1) ---- 架构与java接口
- pyspark 原理、源码解析与优劣势分析(2) ---- Executor 端进程间通信和序列化
- pyspark 原理、源码解析与优劣势分析(3) ---- 优劣势总结
Executor 端进程间通信和序列化
对于 Spark 内置的算子,在 Python 中调用 RDD、DataFrame 的接口后,从上文可以看出会通过 JVM 去调用到 Scala 的接口,最后执行和直接使用 Scala 并无区别。而 对于需要使用 UDF 的情形,在 Executor 端就需要启动一个 Python worker 子进程,然后执行 UDF 的逻辑。那么 Spark 是怎样判断需要启动子进程的呢?
在 Spark 编译用户的 DAG 的时候,Catalyst Optimizer 会创建 BatchEvalPython 或者 ArrowEvalPython 这样的 Logical Operator,随后会被转换成 PythonEvals 这个 Physical Operator。在 PythonEvals(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala)中:
代码语言:javascript复制object PythonEvals extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ArrowEvalPython(udfs, output, child, evalType) =>
ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil
case BatchEvalPython(udfs, output, child) =>
BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil
case _ =>
Nil
}
}
创建了 ArrowEvalPythonExec 或者 BatchEvalPythonExec,而这二者内部会创建 ArrowPythonRunner、PythonUDFRunner 等类的对象实例,并调用了它们的 compute 方法。由于它们都继承了 BasePythonRunner,基类的 compute 方法中会去启动 Python 子进程:
代码语言:javascript复制def compute(
inputIterator: Iterator[IN],
partitionIndex: Int,
context: TaskContext): Iterator[OUT] = {
// ......
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
// Start a thread to feed the process input from our parent's iterator
val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
writerThread.start()
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
val stdoutIterator = newReaderIterator(
stream, writerThread, startTime, env, worker, releasedOrClosed, context)
new InterruptibleIterator(context, stdoutIterator)
这里 env.createPythonWorker 会通过 PythonWorkerFactory
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala)去启动 Python 进程。
Executor 端启动 Python 子进程后,会创建一个 socket 与 Python 建立连接。所有 RDD 的数据都要序列化后,通过 socket 发送,而结果数据需要同样的方式序列化传回 JVM。
对于直接使用 RDD 的计算,或者没有开启 spark.sql.execution.arrow.enabled 的 DataFrame,是将输入数据按行发送给 Python,可想而知,这样效率极低。
在 Spark 2.2 后提供了基于 Arrow 的序列化、反序列化的机制(从 3.0 起是默认开启),从 JVM 发送数据到 Python 进程的代码在 sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala。这个类主要是重写了 newWriterThread 这个方法,使用了 ArrowWriter 向 socket 发送数据:
代码语言:javascript复制val arrowWriter = ArrowWriter.create(root)
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()
while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()
while (nextBatch.hasNext) {
arrowWriter.write(nextBatch.next())
}
arrowWriter.finish()
writer.writeBatch()
arrowWriter.reset()
可以看到,每次取出一个 batch,填充给 ArrowWriter,实际数据会保存在 root 对象中,然后由 ArrowStreamWriter 将 root 对象中的整个 batch 的数据写入到 socket 的 DataOutputStream 中去。ArrowStreamWriter 会调用 writeBatch 方法去序列化消息并写数据,代码参考 ArrowWriter.java#L131。
代码语言:javascript复制protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
return block;
}
在 MessageSerializer 中,使用了 flatbuffer 来序列化数据。flatbuffer 是一种比较高效的序列化协议,它的主要优点是反序列化的时候,不需要解码,可以直接通过裸 buffer 来读取字段,可以认为反序列化的开销为零。我们来看看 Python 进程收到消息后是如何反序列化的。
Python 子进程实际上是执行了 worker.py 的 main 函数 (python/pyspark/worker.py):
代码语言:javascript复制if __name__ == '__main__':
# Read information about how to connect back to the JVM from the environment.
java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
(sock_file, _) = local_connect_and_auth(java_port, auth_secret)
main(sock_file, sock_file)
这里会去向 JVM 建立连接,并从 socket 中读取指令和数据。对于如何进行序列化、反序列化,是通过 UDF 的类型来区分:
代码语言:javascript复制eval_type = read_int(infile)
if eval_type == PythonEvalType.NON_UDF:
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
else:
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
在 read_udfs 中,如果是 PANDAS 类的 UDF,会创建 ArrowStreamPandasUDFSerializer,其余的 UDF 类型创建 BatchedSerializer。我们来看看 ArrowStreamPandasUDFSerializer(python/pyspark/serializers.py):
代码语言:javascript复制def dump_stream(self, iterator, stream):
import pyarrow as pa
writer = None
try:
for batch in iterator:
if writer is None:
writer = pa.RecordBatchStreamWriter(stream, batch.schema)
writer.write_batch(batch)
finally:
if writer is not None:
writer.close()
def load_stream(self, stream):
import pyarrow as pa
reader = pa.ipc.open_stream(stream)
for batch in reader:
yield batch
可以看到,这里双向的序列化、反序列化,都是调用了 PyArrow 的 ipc 的方法,和前面看到的 Scala 端是正好对应的,也是按 batch 来读写数据。对于 Pandas 的 UDF,读到一个 batch 后,会将 Arrow 的 batch 转换成 Pandas Series。
代码语言:javascript复制def arrow_to_pandas(self, arrow_column):
from pyspark.sql.types import _check_series_localize_timestamps
# If the given column is a date type column, creates a series of datetime.date directly
# instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
# datetime64[ns] type handling.
s = arrow_column.to_pandas(date_as_object=True)
s = _check_series_localize_timestamps(s, self._timezone)
return s
def load_stream(self, stream):
"""
Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
"""
batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
import pyarrow as pa
for batch in batches:
yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
Pandas UDF
前面我们已经看到,PySpark 提供了基于 Arrow 的进程间通信来提高效率,那么对于用户在 Python 层的 UDF,是不是也能直接使用到这种高效的内存格式呢?答案是肯定的,这就是 PySpark 推出的 Pandas UDF。区别于以往以行为单位的 UDF,Pandas UDF 是以一个 Pandas Series 为单位,batch 的大小可以由 spark.sql.execution.arrow.maxRecordsPerBatch 这个参数来控制。这是一个来自官方文档的示例:
代码语言:javascript复制def multiply_func(a, b):
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
df.select(multiply(col("x"), col("x"))).show()
上文已经解析过,PySpark 会将 DataFrame 以 Arrow 的方式传递给 Python 进程,Python 中会转换为 Pandas Series,传递给用户的 UDF。在 Pandas UDF 中,可以使用 Pandas 的 API 来完成计算,在易用性和性能上都得到了很大的提升。
参考文献
https://www.mobvista.com/cn/blog/2019-12-27-2/ https://blog.csdn.net/Pysamlam/article/details/115683424
pyspark 启动原理: https://blog.csdn.net/wangyaninglm/article/details/114038572