Byzer 提供了三种方式让用户自己实现 UDF 从而更好的扩展SQL的能力。
- 动态 UDF. 在 Byzer 中使用 Scala/Java 编写 UDF, 随写随用,无需编译打包发布重启
- 内置 UDF. 使用 Scala/Java 编写 UDF,然后发布成 Jar, 引入 Jar 包后,需要重启
- 使用基于 Hive 开发的 UDF
动态 UDF
动态 UDF的使用最简单,用户可以使用 Byzer 的 register 语句将一段 Scala/Java 代码注册成 UDF.
比如,我们正在开发一个 ETL 脚本,希望获得一个数组的最后一个元素,但发现没有原生内置的函数能够实现这个,这个时候,可以直接用 Byzer Register 语句生成一个 UDF 函数,名称叫 arrayLast, 代码示例如下;
代码语言:javascript复制register ScriptUDF.`` as arrayLast
where lang="scala"
and code='''def apply(a:Seq[String])={
a.last
}'''
and udfType="udf";
select arrayLast(array("a","b")) as lastChar as output;
写完之后,就可以在后续的 select
语句中使用。 运行结果如下:
在上面的示例中,如果用户使用 Scala 编写,那么 udfType 支持 udf/udaf 。UDAF 也很简单,示例如下:
代码语言:javascript复制REGISTER ScriptUDF.`` AS plusFun options
className="SumAggregation"
and code='''
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
class SumAggregation extends UserDefinedAggregateFunction with Serializable{
def inputSchema: StructType = new StructType().add("a", LongType)
def bufferSchema: StructType = new StructType().add("total", LongType)
def dataType: DataType = LongType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, 0l)
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val sum = buffer.getLong(0)
val newitem = input.getLong(0)
buffer.update(0, sum newitem)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getLong(0) buffer2.getLong(0))
}
def evaluate(buffer: Row): Any = {
buffer.getLong(0)
}
}
'''
and udfType="udaf";
SET data='''
{"a":1}
{"a":1}
{"a":1}
{"a":1}
''';
LOAD jsonStr.`data` AS dataTable;
SELECT a,plusFun(a) AS res FROM dataTable GROUP BY a AS output;
执行结果:
注意: Java 在当前版本只支持 UDF, 不支持 UDAF。
如何构建可复用的 UDF 工具集
对于这些动态编写的 UDF 函数,我们可以将其放在独立的 Byzer notebook 里,然后通过 include 语法引入(注意,该功能需要 Byzer notebook 1.1.0 以及以上版本才支持)
具体做法分成两步。第一步,创建一个 用于存储 UDF 的 Notebook, 比如 Notebook 名字叫 udfs.bznb
:
然后我们填入 arrayLast
函数的代码。接着,新创建一个 Notebook, 比如叫 job.bznb
, 在该 Notebook 里可以通过如下方式引入 arrayLast 函数:
include http.`project.demo.udfs`;
select arrayLast(array("a","b")) as lastChar as output;
结果如下:
在 Byzer Notebook 中,需要在一个 Notebook 里引入另外一个 Notebook,可以通过 Include语法,其中 http 和 project 是固定的。 后面 demo.udfs
则是目录路径,只不过用 .
替换了 /
。
假设 udfs 里有很多函数,不希望把所有的函数都包含进来,那么可以指定 Cell 的 序号 。 比如只包含第一个 cell, 那么可以这么写:
代码语言:javascript复制include http.`project.demo.udfs#1`;
select arrayLast(array("a","b")) as lastChar as output;
期待 Byzer notebook 以后可以支持给 cell 命名
除此之外,还可以将代码放到 git 仓库中,假设用户放到 gitee上,那么可以用如下方式引用:
代码语言:javascript复制include lib.`gitee.com/allwefantasy/lib-core`
where alias="libCore";
include local.`libCore.udf.hello`;
select hello() as name as output;
在这里,我们引用了 lib-core 项目里的一个 hello 函数,然后接着就可以在 select 语法中使用。
结果如下:
内置 UDF 函数
新建一个 Java/Scala 混合项目, 里面创建一个 object 对象,比如叫:
代码语言:javascript复制package tech.mlsql.udfs.custom
import org.apache.spark.sql.UDFRegistration
object MyFunctions {
}
接着添加一个函数 mkString
:
package tech.mlsql.udfs.custom
import org.apache.spark.sql.UDFRegistration
object MyFunctions {
def mkString(uDFRegistration: UDFRegistration) = {
uDFRegistration.register("mkString", (sep: String, co: WrappedArray[String]) => {
co.mkString(sep)
})
}
}
该函数接受一个 UDFRegistration
对象, 然后使用该对象注册真实的 UDF 函数。 register 方法的第一个参数是 UDF 在 SQL 中使用的名字,第二个参数则是一个普通的 Scala 函数。
如果想具体的业务逻辑使用 Java 开发,那么需要单独再写一个 Java 类,在里面实现具体的逻辑,然后在 Scala 函数中调用。
开发完成后,打包这个项目,生成 Jar 包,为了能够让 Byzer 识别到这些 UDF, 需要做三件事:
- 把 Jar 包丢到 Byzer 项目的 jars 目录里去
- 启动时,在启动脚本中添加一个参数
-streaming.udf.clzznames "tech.mlsql.udfs.custom.MyFunctions"
如果有多个,用逗号分割即可。 - 然后重启 Byzer。
目前内置的很多内置的 UDF 函数就是利用这种方式开发的。 参看 streaming.core.compositor.spark.udf.Functions
如何把 Jar 包放到正确的目录里很重要,对于不同的 Byzer 发行版,目录可能有差异。具体如下;
- 分布式 Yarn based 版本,将 Jar 包放到
${SPARK_HOME}/jars
目录即可。 如果是已经运行了,你需要重启 Byzer。 - 分布式 K8s base 版本, 现阶段可能需要重新构建镜像。参考 Byzer build 项目。
- Sandbox 版本,启动容器后,进入容器
/work
目录,然后将 Jar 包放到/work/${SPARK_HOME}/jars
目录即可. 需要重启容器。 - 桌面版本,以 Mac 为例, 将 Jar 包放到
~/.vscode/extensions/allwefantasy.mlsql-0.0.7/dist/mlsql-lang/spark
目录下即可,然后重启 VSCode 即可。 - 命令行版本,则是在发行版根目录下的 libs/ 目录里。
使用基于 Hive 开发的 UDF
首先,按照前面内置函数中说的方式,将基于 Hive 规范的 UDF 函数的 Jar 包放到指定的目录中。
其次,你需要执行特定的指令动态注册:
代码语言:javascript复制CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs';
考虑到该指令重启后会失效,用户可以将这些指令放到一个单独的 Notebook里,然后采用 动态 UDF
中介绍的,通过 include 语法在需要使用的地方进行引用即可。
为啥 UDF 不支持 Python 呢
为啥 UDF 不支持 Python 呢? 因为我们有更好更高效的方式 [Byzer-python](虎年知识年货之 Byzer-python 一站式教学指南来啦!),比如下面的两段代码:
代码语言:javascript复制load binaryFile.`/tmp/cifar10/cifar/train/*.png` as cifar10;
上面这段代码把对象存储里的五万张图片都加载成表。接着我希望把每张图片缩放成 28*28 像素,这个时候用 Python 其实会方便些,因为 Python里有很多成熟的库,比如 OpenCV。 用户可以按如下方式处理:
代码语言:javascript复制#%python
#%input=raw_cifar10_table
#%output=cifar10_resize
#�che=true
#%schema=st(field(content,binary),field(path,string))
#�taMode=data
#%env=source /opt/miniconda3/bin/activate ray1.8.0
import io,cv2,numpy as np
from pyjava.api.mlsql import RayContext
ray_context = RayContext.connect(globals(),"127.0.0.1:10001")
ray_context.to_dataset().to_dask()
def resize_image(row):
new_row = {}
image_bin = row["content"]
oriimg = cv2.imdecode(np.frombuffer(io.BytesIO(image_bin).getbuffer(),np.uint8),1)
newimage = cv2.resize(oriimg,(28,28))
is_success, buffer = cv2.imencode(".png", newimage)
io_buf = io.BytesIO(buffer)
new_row["content"]=io_buf.getvalue()
new_row["path"]= row["path"]
return new_row
ray_context.foreach(resize_image)
这里我们可以看到,我们只要在 Notebook 里写一个 Python 回调函数,然后传递给 ray_context 对象就可以。
最后,把 Python 处理的结果保存回文件系统:
代码语言:javascript复制select arrayLast(split(path,"/")) as fileName,content
from cifar10_resize
as final_dataset;
save overwrite final_dataset as image.`/tmp/size-28x28`
where imageColumn="content"
and fileName="fileName";
SQL 和 Python之间实现了无缝衔接。