Byzer UDF 函数开发指南

2022-04-07 15:30:59 浏览数 (1)

Byzer 提供了三种方式让用户自己实现 UDF 从而更好的扩展SQL的能力。

  1. 动态 UDF. 在 Byzer 中使用 Scala/Java 编写 UDF, 随写随用,无需编译打包发布重启
  2. 内置 UDF. 使用 Scala/Java 编写 UDF,然后发布成 Jar, 引入 Jar 包后,需要重启
  3. 使用基于 Hive 开发的 UDF

动态 UDF

动态 UDF的使用最简单,用户可以使用 Byzer 的 register 语句将一段 Scala/Java 代码注册成 UDF.

比如,我们正在开发一个 ETL 脚本,希望获得一个数组的最后一个元素,但发现没有原生内置的函数能够实现这个,这个时候,可以直接用 Byzer Register 语句生成一个 UDF 函数,名称叫 arrayLast, 代码示例如下;

代码语言:javascript复制
register ScriptUDF.`` as arrayLast 
where lang="scala"
and code='''def apply(a:Seq[String])={
      a.last
}'''
and udfType="udf";

select arrayLast(array("a","b")) as lastChar as output;

写完之后,就可以在后续的 select 语句中使用。 运行结果如下:

在上面的示例中,如果用户使用 Scala 编写,那么 udfType 支持 udf/udaf 。UDAF 也很简单,示例如下:

代码语言:javascript复制
REGISTER ScriptUDF.`` AS plusFun options
className="SumAggregation"
and code='''

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
class SumAggregation extends UserDefinedAggregateFunction with Serializable{
    def inputSchema: StructType = new StructType().add("a", LongType)
    def bufferSchema: StructType =  new StructType().add("total", LongType)
    def dataType: DataType = LongType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, 0l)
    }
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val sum   = buffer.getLong(0)
      val newitem = input.getLong(0)
      buffer.update(0, sum   newitem)
    }
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0, buffer1.getLong(0)   buffer2.getLong(0))
    }
    def evaluate(buffer: Row): Any = {
      buffer.getLong(0)
    }
}
'''
and udfType="udaf";

SET data='''
{"a":1}
{"a":1}
{"a":1}
{"a":1}
''';

LOAD jsonStr.`data` AS dataTable;

SELECT a,plusFun(a) AS res FROM dataTable GROUP BY a AS output;

执行结果:

注意: Java 在当前版本只支持 UDF, 不支持 UDAF。

如何构建可复用的 UDF 工具集

对于这些动态编写的 UDF 函数,我们可以将其放在独立的 Byzer notebook 里,然后通过 include 语法引入(注意,该功能需要 Byzer notebook 1.1.0 以及以上版本才支持)

具体做法分成两步。第一步,创建一个 用于存储 UDF 的 Notebook, 比如 Notebook 名字叫 udfs.bznb

然后我们填入 arrayLast 函数的代码。接着,新创建一个 Notebook, 比如叫 job.bznb, 在该 Notebook 里可以通过如下方式引入 arrayLast 函数:

代码语言:javascript复制
include http.`project.demo.udfs`;
select arrayLast(array("a","b")) as lastChar as output;

结果如下:

在 Byzer Notebook 中,需要在一个 Notebook 里引入另外一个 Notebook,可以通过 Include语法,其中 http 和 project 是固定的。 后面 demo.udfs 则是目录路径,只不过用 . 替换了 /

假设 udfs 里有很多函数,不希望把所有的函数都包含进来,那么可以指定 Cell 的 序号 。 比如只包含第一个 cell, 那么可以这么写:

代码语言:javascript复制
include http.`project.demo.udfs#1`;
select arrayLast(array("a","b")) as lastChar as output;

期待 Byzer notebook 以后可以支持给 cell 命名

除此之外,还可以将代码放到 git 仓库中,假设用户放到 gitee上,那么可以用如下方式引用:

代码语言:javascript复制
include lib.`gitee.com/allwefantasy/lib-core`
where alias="libCore";

include local.`libCore.udf.hello`;
select hello() as name as output;

在这里,我们引用了 lib-core 项目里的一个 hello 函数,然后接着就可以在 select 语法中使用。

结果如下:

内置 UDF 函数

新建一个 Java/Scala 混合项目, 里面创建一个 object 对象,比如叫:

代码语言:javascript复制
package tech.mlsql.udfs.custom
import org.apache.spark.sql.UDFRegistration

object MyFunctions {
}

接着添加一个函数 mkString

代码语言:javascript复制
package tech.mlsql.udfs.custom
import org.apache.spark.sql.UDFRegistration

object MyFunctions {
   def mkString(uDFRegistration: UDFRegistration) = {
    uDFRegistration.register("mkString", (sep: String, co: WrappedArray[String]) => {
      co.mkString(sep)
    })
  }
}

该函数接受一个 UDFRegistration 对象, 然后使用该对象注册真实的 UDF 函数。 register 方法的第一个参数是 UDF 在 SQL 中使用的名字,第二个参数则是一个普通的 Scala 函数。

如果想具体的业务逻辑使用 Java 开发,那么需要单独再写一个 Java 类,在里面实现具体的逻辑,然后在 Scala 函数中调用。

开发完成后,打包这个项目,生成 Jar 包,为了能够让 Byzer 识别到这些 UDF, 需要做三件事:

  1. 把 Jar 包丢到 Byzer 项目的 jars 目录里去
  2. 启动时,在启动脚本中添加一个参数 -streaming.udf.clzznames "tech.mlsql.udfs.custom.MyFunctions" 如果有多个,用逗号分割即可。
  3. 然后重启 Byzer。

目前内置的很多内置的 UDF 函数就是利用这种方式开发的。 参看 streaming.core.compositor.spark.udf.Functions

如何把 Jar 包放到正确的目录里很重要,对于不同的 Byzer 发行版,目录可能有差异。具体如下;

  1. 分布式 Yarn based 版本,将 Jar 包放到 ${SPARK_HOME}/jars 目录即可。 如果是已经运行了,你需要重启 Byzer。
  2. 分布式 K8s base 版本, 现阶段可能需要重新构建镜像。参考 Byzer build 项目。
  3. Sandbox 版本,启动容器后,进入容器 /work 目录,然后将 Jar 包放到 /work/${SPARK_HOME}/jars 目录即可. 需要重启容器。
  4. 桌面版本,以 Mac 为例, 将 Jar 包放到 ~/.vscode/extensions/allwefantasy.mlsql-0.0.7/dist/mlsql-lang/spark 目录下即可,然后重启 VSCode 即可。
  5. 命令行版本,则是在发行版根目录下的 libs/ 目录里。

使用基于 Hive 开发的 UDF

首先,按照前面内置函数中说的方式,将基于 Hive 规范的 UDF 函数的 Jar 包放到指定的目录中。

其次,你需要执行特定的指令动态注册:

代码语言:javascript复制
CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs';

考虑到该指令重启后会失效,用户可以将这些指令放到一个单独的 Notebook里,然后采用 动态 UDF 中介绍的,通过 include 语法在需要使用的地方进行引用即可。

为啥 UDF 不支持 Python 呢

为啥 UDF 不支持 Python 呢? 因为我们有更好更高效的方式 [Byzer-python](虎年知识年货之 Byzer-python 一站式教学指南来啦!),比如下面的两段代码:

代码语言:javascript复制
load binaryFile.`/tmp/cifar10/cifar/train/*.png` as cifar10;

上面这段代码把对象存储里的五万张图片都加载成表。接着我希望把每张图片缩放成 28*28 像素,这个时候用 Python 其实会方便些,因为 Python里有很多成熟的库,比如 OpenCV。 用户可以按如下方式处理:

代码语言:javascript复制
#%python
#%input=raw_cifar10_table
#%output=cifar10_resize
#�che=true
#%schema=st(field(content,binary),field(path,string))
#�taMode=data
#%env=source /opt/miniconda3/bin/activate ray1.8.0

import io,cv2,numpy as np
from pyjava.api.mlsql import RayContext

ray_context = RayContext.connect(globals(),"127.0.0.1:10001")
ray_context.to_dataset().to_dask()
def resize_image(row):
    new_row = {}
    image_bin = row["content"]    
    oriimg = cv2.imdecode(np.frombuffer(io.BytesIO(image_bin).getbuffer(),np.uint8),1)
    newimage = cv2.resize(oriimg,(28,28))
    is_success, buffer = cv2.imencode(".png", newimage)
    io_buf = io.BytesIO(buffer)
    new_row["content"]=io_buf.getvalue()
    new_row["path"]= row["path"]    
    return new_row

ray_context.foreach(resize_image)

这里我们可以看到,我们只要在 Notebook 里写一个 Python 回调函数,然后传递给 ray_context 对象就可以。

最后,把 Python 处理的结果保存回文件系统:

代码语言:javascript复制
select arrayLast(split(path,"/")) as fileName,content  
from cifar10_resize 
as final_dataset;

save overwrite final_dataset as image.`/tmp/size-28x28` 
where imageColumn="content" 
and fileName="fileName";

SQL 和 Python之间实现了无缝衔接。

0 人点赞