PySpark UD(A)F 的高效使用

2021-09-10 17:21:20 浏览数 (2)

Spark无疑是当今数据科学和大数据领域最流行的技术之一。尽管它是用Scala开发的,并在Java虚拟机(JVM)中运行,但它附带了Python绑定,也称为PySpark,其API深受panda的影响。在功能方面,现代PySpark在典型的ETL和数据处理方面具有与Pandas相同的功能,例如groupby、聚合等等。

需要注意的一件重要的事情是,除了基于编程数据的处理功能之外,Spark还有两个显著的特性。一种是,Spark附带了SQL作为定义查询的替代方式,另一种是用于机器学习的Spark MLlib。这两个主题都超出了本文的范围,但如果考虑将PySpark作为更大数据集的panda和scikit-learn的替代方案,那么应该考虑到这两个主题。

1.UDAF

聚合函数是对一组行进行操作并产生结果的函数,例如sum()或count()函数。用户定义的聚合函数(UDAF)通常用于更复杂的聚合,而这些聚合并不是常使用的分析工具自带的。

这就是RDD API发挥作用的地方。需要提醒的是,弹性分布式数据集(Resilient Distributed Dataset, RDD)是Spark的底层数据结构,Spark DataFrame是构建在其之上的。由于主要是在PySpark中处理DataFrames,所以可以在RDD属性的帮助下访问底层RDD,并使用toDF()将其转换回来。这个RDD API允许指定在数据上执行的任意Python函数。举个例子,假设有一个DataFrame df,它包含10亿行,带有一个布尔值is_sold列,想要过滤带有sold产品的行。

代码语言:txt复制
df.rdd.filter(lambdax:x.is_sold==True).toDF()

虽然没有明确声明,但这个 lambda 函数本质上是一个用户定义函数 (UDF)。对于这个确切的用例,还可以使用更高级的 DataFrame filter() 方法,产生相同的结果。

代码语言:txt复制
df.filter(df.is_sold==True)

需记住,尽可能使用内置的RDD 函数或DataFrame UDF,这将比UDF实现快得多。为了更好地理解实质性的性能差异,现在将绕道而行,调查这两个filter示例的背后情况。

2.PySpark Internals

PySpark 实际上是用 Scala 编写的 Spark 核心的包装器。当在 Python 中启动 SparkSession 时,PySpark 在后台使用 Py4J 启动 JVM 并创建 Java SparkContext。所有 PySpark 操作,例如的 df.filter() 方法调用,在幕后都被转换为对 JVM SparkContext 中相应 Spark DataFrame 对象的相应调用。所以在的 df.filter() 示例中,DataFrame 操作和过滤条件将发送到 Java SparkContext,在那里它被编译成一个整体优化的查询计划。执行查询后,过滤条件将在 Java 中的分布式 DataFrame 上进行评估,无需对 Python 进行任何回调!如果工作流从 Hive 加载 DataFrame 并将生成的 DataFrame 保存为 Hive 表,在整个查询执行过程中,所有数据操作都在 Java Spark 工作线程中以分布式方式执行,这使得 Spark 可以非常快速地查询大型数据集.好的,那么为什么 RDD filter() 方法那么慢呢?原因是 lambda 函数不能直接应用于驻留在 JVM 内存中的 DataFrame。

内部实际发生的是 Spark 在集群节点上的 Spark 执行程序旁边启动 Python 工作线程。在执行时,Spark 工作器将 lambda 函数发送给这些 Python 工作器。接下来,Spark worker 开始序列化他们的 RDD 分区,并通过套接字将它们通过管道传输到 Python worker,lambda 函数在每行上进行评估。对于结果行,整个序列化/反序列化过程在再次发生,以便实际的 filter() 可以应用于结果集。

下图还显示了在 PySpark 中使用任意 Python 函数时的整个数据流,该图来自PySpark Internal Wiki.

因为数据来回复制过多,在分布式 Java 系统中执行 Python 函数在执行时间方面非常昂贵。这个底层的探索:只要避免Python UDF,PySpark 程序将大约与基于 Scala 的 Spark 程序一样快。如果无法避免 UDF,至少应该尝试使它们尽可能高效。

3.complex type

如果只是在Spark数据帧中使用简单的数据类型,一切都工作得很好,甚至如果激活了Arrow,一切都会非常快,但如何涉及复杂的数据类型,如MAP,ARRAY和STRUCT。将得到的是:TypeError: Unsupported type in conversion to Arrow。

为了摆脱这种困境,本文将演示如何在没有太多麻烦的情况下绕过Arrow当前的限制。先看看pandas_udf提供了哪些特性,以及如何使用它。

Spark 2.3’s pandas_udf 特性

功能有概述如下:

function type

Operation

Input → Output

Pandas equivalent

SCALAR

Mapping

Series → Series

df.transform(...)

GROUPED_MAP

Group & Map

DataFrame → DataFrame

df.apply(...)

GROUPED_AGG

Reduce

Series → Scalar

df.aggregate(...)

除了UDF的返回类型之外,pandas_udf还需要指定一个描述UDF一般行为的函数类型。如果只是想将一个scalar映射到一个scalar,或者将一个向量映射到具有相同长度的向量,则可以使用PandasUDFType.SCALAR。这还将确定UDF检索一个Pandas Series作为输入,并需要返回一个相同长度的Series。它基本上与Pandas数据帧的transform方法相同。GROUPED_MAP UDF是最灵活的,因为它获得一个Pandas数据帧,并允许返回修改的或新的。

4.基本想法

解决方案将非常简单。利用to_json函数将所有具有复杂数据类型的列转换为JSON字符串。因为Arrow可以轻松处理字符串,所以可以使用pandas_udf装饰器。在UDF中,将这些列转换回它们的原始类型,并进行实际工作。如果想返回具有复杂类型的列,只需反过来做所有事情。这意味着在UDF中将这些列转换为JSON,返回Pandas数据帧,并最终将Spark数据帧中的相应列从JSON转换为复杂类型

5.实现

将实现分为三种不同的功能:

  • 1)Spark DataFrame和JSON 相互转换的函数;
  • 2)pandas DataFrame和JSON 相互转换的函数
  • 3)装饰器:包装类,调用上述2类函数实现对数据具体处理函数的封装

1) Spark DataFrame的转换

代码语言:txt复制
from pyspark.sql.types import MapType, StructType, ArrayType, StructField
from pyspark.sql.functions import to_json, from_json

def is_complex_dtype(dtype):
    """Check if dtype is a complex type
    Args:
        dtype: Spark Datatype
    Returns:
        Bool: if dtype is complex
    """
    return isinstance(dtype, (MapType, StructType, ArrayType))

def complex_dtypes_to_json(df):
    """Converts all columns with complex dtypes to JSON
    Args:
        df: Spark dataframe
    Returns:
        tuple: Spark dataframe and dictionary of converted columns and their data types
    """
    conv_cols = dict()
    selects = list()
    for field in df.schema:
        if is_complex_dtype(field.dataType):
            conv_cols[field.name] = field.dataType
            selects.append(to_json(field.name).alias(field.name))
        else:
            selects.append(field.name)
    df = df.select(*selects)
    return df, conv_cols

def complex_dtypes_from_json(df, col_dtypes):
    """Converts JSON columns to complex types
    Args:
        df: Spark dataframe
        col_dtypes (dict): dictionary of columns names and their datatype
    Returns:
        Spark dataframe
    """
    selects = list()
    for column in df.columns:
        if column in col_dtypes.keys():
            schema = StructType([StructField('root', col_dtypes[column])])
            selects.append(from_json(column, schema).getItem('root').alias(column))
        else:
            selects.append(column)
    return df.select(*selects)

函数complex_dtypes_to_json将一个给定的Spark数据帧转换为一个新的数据帧,其中所有具有复杂类型的列都被JSON字符串替换。除了转换后的数据帧外,它还返回一个带有列名及其转换后的原始数据类型的字典。

代码语言:txt复制
  complex_dtypes_from_json使用该信息将这些列精确地转换回它们的原始类型。可能会觉得在模式中定义某些根节点很奇怪。这是必要的,因为绕过了Spark的from_json的一些限制。转换之后,再次删除这个根结构体,这样complex_dtypes_to_json和complex_dtypes_from_json就变成了相反的了。现在,还可以轻松地定义一个可以处理复杂Spark数据帧的toPandas。
代码语言:txt复制
def toPandas(df):
    """Same as df.toPandas() but converts complex types to JSON first
    Args:
        df: Spark dataframe
    Returns:
        Pandas dataframe
    """
    return complex_dtypes_to_json(df)[0].toPandas()

2)Pandas DataFrame的转换

类似地,定义了与上面相同的函数,但针对的是Pandas数据帧。不同之处在于,对于实际的UDF,需要知道要将哪些列转换为复杂类型,因为希望避免探测每个包含字符串的列。在向JSON的转换中,如前所述添加root节点。

代码语言:txt复制
import json

def cols_from_json(df, columns):
    """Converts Pandas dataframe colums from json
    Args:
        df (dataframe): Pandas DataFrame
        columns (iter): list of or iterator over column names
    Returns:
        dataframe: new dataframe with converted columns
    """
    for column in columns:
        df[column] = df[column].apply(json.loads)
    return df

def ct_val_to_json(value):
    """Convert a scalar complex type value to JSON
    Args:
        value: map or list complex value
    Returns:
        str: JSON string
    """
    return json.dumps({'root': value})

def cols_to_json(df, columns):
    """Converts Pandas dataframe columns to json and adds root handle
    Args:
        df (dataframe): Pandas DataFrame
        columns ([str]): list of column names
    Returns:
        dataframe: new dataframe with converted columns
    """
    for column in columns:
        df[column] = df[column].apply(ct_val_to_json)
    return df

3)装饰器

至此,得到了名为pandas_udf_ct的最终装饰器所需要的所有东西,并将所有成分组合在一起。与Spark的官方pandas_udf一样,的装饰器也接受参数returnType和functionType。它只是稍微复杂一点,你首先必须传递returnType, functionType,这给你留下了一些特殊的装饰器。带有这种装饰器的函数接受cols_in和cols_out参数,这些参数指定哪些列需要转换为JSON,哪些列需要转换为JSON。只有在传递了这些信息之后,才能得到定义的实际UDF。

代码语言:txt复制
import json
from functools import wraps
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

class pandas_udf_ct(object):
    """Decorator for UDAFs with Spark >= 2.3 and complex types
    Args:
        returnType: the return type of the user-defined function. The value can be either a 
                    pyspark.sql.types.DataType object or a DDL-formatted type string.
        functionType: an enum value in pyspark.sql.functions.PandasUDFType. Default: SCALAR.

    Returns:
        Function with arguments `cols_in` and `cols_out` defining column names having complex 
        types that need to be transformed during input and output for GROUPED_MAP. In case of 
        SCALAR, we are dealing with a series and thus transformation is done if `cols_in` or 
        `cols_out` evaluates to `True`. 
        Calling this functions with these arguments returns the actual UDF.
    """

    def __init__(self, returnType=None, functionType=None):
        self.return_type = returnType
        self.function_type = functionType

    def __call__(self, func):
        @wraps(func)
        def converter(*, cols_in=None, cols_out=None):
            if cols_in is None:
                cols_in = list()
            if cols_out is None:
                cols_out = list()

            @pandas_udf(self.return_type, self.function_type)
            def udf_wrapper(values):
                if isinstance(values, pd.DataFrame):
                    values = cols_from_json(values, cols_in)
                elif isinstance(values, pd.Series) and cols_in:
                    values = values.apply(json.loads)
                res = func(values)
                if self.function_type == PandasUDFType.GROUPED_MAP:
                    if isinstance(res, pd.Series):
                        res = res.to_frame().T
                    res = cols_to_json(res, cols_out)
                elif cols_out and self.function_type == PandasUDFType.SCALAR:
                    res = res.apply(ct_val_to_json)
                elif (isinstance(res, (dict, list)) and 
                      self.function_type == PandasUDFType.GROUPED_AGG):
                    res = ct_val_to_json(res)
                return res

            return udf_wrapper

        return converter

上述只是一个典型的带有参数的装饰器实现,但对 cols_in 和 cols_out 多了一层包装。

6.使用

show me the code!

1) 首先构造数据:

代码语言:txt复制
from pyspark.sql.types import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.createDataFrame([(1., {'a': 1}, ["a", "a"], Row(a=1)),
                            (2., {'b': 1}, ["a", "b"], Row(a=42)),
                            (3., {'a': 1, 'b': 3}, ["d","e"], Row(a=1))],
                           schema=['vals', 'maps', 'lists', 'structs'])
df.show(), df.printSchema()

2) 定义处理过程,并用封装类装饰

为简单起见,假设只想将值为 42 的键 x 添加到 maps 列中的字典中。 但首先,使用 complex_dtypes_to_json 来获取转换后的 Spark 数据帧 df_json 和转换后的列 ct_cols。然后定义 UDF 规范化并使用的 pandas_udf_ct 装饰它,使用 dfj_json.schema(因为只需要简单的数据类型)和函数类型 GROUPED_MAP 指定返回类型。

代码语言:txt复制
df_json, ct_cols = complex_dtypes_to_json(df)

def change_vals(dct):
    dct['x'] = 42
    return dct

@pandas_udf_ct(df_json.schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    pdf['maps'].apply(change_vals)
    return pdf

只是为了演示,现在按 df_json 的 vals 列分组,并在每个组上应用的规范化 UDF。如前所述,必须首先使用参数 cols_in 和 cols_out 调用它,而不是仅仅传递 normalize。作为输入列,传递了来自 complex_dtypes_to_json 函数的输出 ct_cols,并且由于没有更改 UDF 中数据帧的形状,因此将其用于输出 cols_out。如果的 UDF 删除列或添加具有复杂数据类型的其他列,则必须相应地更改 cols_out。作为最后一步,使用 complex_dtypes_from_json 将转换后的 Spark 数据帧的 JSON 字符串转换回复杂数据类型。

3)使用

代码语言:txt复制
df_json = df_json.groupby("vals").apply(normalize(cols_in=ct_cols, cols_out=ct_cols))
df_final = complex_dtypes_from_json(df_json, ct_cols)
df_final.show(truncate=False), df_final.printSchema()

7. 结语

本文展示了一个实用的解决方法来处理 Spark 2.3/4 的 UDF 和复杂数据类型。与每个解决方法一样,它远非完美。话虽如此,所提出的解决方法已经在生产环境中顺利运行了一段时间。

参考

More Efficient UD(A)Fs with PySpark

Efficient UD(A)Fs with PySpark

0 人点赞