MaxCompute Java版UDF开发
MaxCompute UDF概述
MaxCompute UDF(User Defined Function)即用户自定义函数。
背景信息
广义的UDF定义是自定义标量函数(UDF)、自定义表值函数(UDTF)及自定义聚合函数(UDAF)三种类型的自定义函数的集合。狭义的UDF仅代表用户自定义标量函数。MaxCompute UDF支持的自定义函数类型如下。
自定义函数类型 | 名称 | 应用场景 |
---|---|---|
UDF | User Defined Scalar Function。用户自定义标量函数。 | 适用于一进一出业务场景。即其输入与输出是一对一的关系,读入一行数据,输出一个值。 |
UDTF | User Defined Table Valued Function。用户自定义表值函数,又称表格UDF。 | 适用于一进多出业务场景。即其输入与输出是一对多的关系,读入一行数据,输出多个值可视为一张表。 |
UDAF | User Defined Aggregation Function。用户自定义聚合函数。 | 适用于多进一出业务场景。即其输入与输出是多对一的关系,将多条输入记录聚合成一个输出值。 |
除上述自定义函数外,MaxCompute还提供如下针对特殊场景的能力支撑。
自定义函数类型 | 应用场景 |
---|---|
代码嵌入式UDF | 当需要简化MaxCompute UDF操作步骤,并希望能直接查看代码实现逻辑时,可以直接将Java或Python代码嵌入SQL脚本。 |
SQL语言定义函数 | 当代码中存在很多相似部分时,可以通过SQL自定义函数实现,提高代码复用率的同时还可以简化操作流程。 |
开源地理空间UDF | 支持在MaxCompute中使用Hive地理空间函数分析空间数据。 |
注意事项
使用自定义函数时,需要注意:
- 在性能上,自定义函数的性能低于内建函数,建议您优先使用内建函数实现相同逻辑的业务需求。
- 在SQL语句中使用自定义函数时,如果计算的数据量过大并且存在倾斜,会导致作业占用的内存超出默认分配的内存。此时,您可以在Session级别设置
set odps.sql.udf.joiner.jvm.memory=xxxx;
属性来解决此问题。 - 当自定义函数的名称与内建函数的名称相同时,自定义函数会覆盖同名的内建函数。
开发流程
使用Java代码编写MaxCompute UDF时,开发流程如下。
配置pom依赖
使用Maven编写代码时,需要先在Pom文件中添加代码相关SDK依赖,确保后续编写的代码可编译成功。例如开发自定义函数需要添加的SDK依赖为:
代码语言:javascript复制<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-udf</artifactId>
<version>0.29.10-public</version>
</dependency>
Jetbrains全家桶1年46,售后保障稳定
编写代码
根据业务需求,编写自定义函数代码。
调试代码
通过本地运行或单元测试方式调试自定义函数,查看运行结果是否符合预期。
编译并导出JAR包
调试自定义函数代码,确保本地运行成功后打包为JAR包。
添加资源
将JAR包作为资源上传至MaxCompute项目。
创建MaxCompute UDF
基于上传的JAR包资源创建自定义函数。
调用MaxCompute UDF
在查询数据代码中调用自定义函数。
使用说明
自定义函数的使用方法如下:
- 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,可以参照内建函数的使用方法使用自定义函数。
- 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。
数据类型
MaxCompute Type | Java Type | Java Writable Type |
---|---|---|
TINYINT | java.lang.Byte | ByteWritable |
SMALLINT | java.lang.Short | ShortWritable |
INT | java.lang.Integer | IntWritable |
BIGINT | java.lang.Long | LongWritable |
FLOAT | java.lang.Float | FloatWritable |
DOUBLE | java.lang.Double | DoubleWritable |
DECIMAL | java.math.BigDecimal | BigDecimalWritable |
BOOLEAN | java.lang.Boolean | BooleanWritable |
STRING | java.lang.String | Text |
VARCHAR | com.aliyun.odps.data.Varchar | VarcharWritable |
BINARY | com.aliyun.odps.data.Binary | BytesWritable |
DATETIME | java.util.Date | DatetimeWritable |
TIMESTAMP | java.sql.Timestamp | TimestampWritable |
INTERVAL_YEAR_MONTH | 不涉及 | IntervalYearMonthWritable |
INTERVAL_DAY_TIME | 不涉及 | IntervalDayTimeWritable |
ARRAY | java.util.List | 不涉及 |
MAP | java.util.Map | 不涉及 |
STRUCT | com.aliyun.odps.data.Struct | 不涉及 |
MaxCompute SDK
MaxCompute提供的SDK信息如下。
SDK名称 | 描述 |
---|---|
odps-sdk-core | 提供操作MaxCompute基本资源的类。 |
odps-sdk-commons | Java Util封装。 |
odps-sdk-udf | UDF功能的主体接口。 |
odps-sdk-mapred | MapReduce API。 |
odps-sdk-graph | Graph API。 |
UDF
UDF概述
MaxCompute支持通过Java、Python语言编写代码创建UDF,扩展MaxCompute的函数能力,满足个性化业务需求。
背景信息
UDF适用于一进一出业务场景。即其输入与输出是一对一的关系,读入一行数据,输出一个值。
Java UDF
UDF代码结构
可以通过IntelliJ IDEA(Maven)或MaxCompute Studio工具使用Java语言编写UDF代码,代码中需要包含如下信息:
- Java包(Package):可选
- 继承UDF类:必选。
- 必需携带的UDF类为com.aliyun.odps.udf.UDF。当您需要使用其他UDF类或者需要用到复杂数据类型时,请根据MaxCompute SDK添加需要的类。例如STRUCT数据类型对应的UDF类为com.aliyun.odps.data.Struct。
-
@Resolve
注解:可选。 格式为@Resolve(<signature>)
,signature
用于定义函数的输入参数和返回值的数据类型。当需要在UDF中使用STRUCT数据类型时,无法基于com.aliyun.odps.data.Struct
反射分析得到Field Name和Field Type,所以需要用@Resolve
注解来辅助获取。即如果需要在UDF中使用STRUCT,请在UDF Class中加上@Resolve
注解,注解只会影响参数或返回值中包含com.aliyun.odps.data.Struct的重载。例如@Resolve("struct<a:string>,string->string")
。 - 自定义Java类:必选。 UDF代码的组织单位,定义了实现业务需求的变量及方法。
-
evaluate
方法:必选。 非静态的Public方法,位于自定义的Java类中。evaluate
方法的输入参数和返回值的数据类型将作为SQL语句中UDF的函数签名Signature(定义UDF的输入与输出数据类型)。 可以在UDF中实现多个evaluate
方法,在调用UDF时,MaxCompute会依据UDF调用的参数类型匹配正确的evaluate
方法。 编写Java UDF时可以使用Java Type或Java Writable Type - UDF初始化或结束代码:可选。可以通过
void setup(ExecutionContext ctx)
和void close()
分别实现UDF初始化和结束。void setup(ExecutionContext ctx)
方法会在evaluate
方法前调用且仅会调用一次,可以用来初始化一些计算所需要的资源或类的成员对象。void close()
方法会在evaluate
方法结束后调用,可以用来执行一些清理工作,例如关闭文件。
UDF代码示例如下。
使用Java Type类型
代码语言:javascript复制//将定义的Java类组织在org.alidata.odps.udf.examples包中。
package org.alidata.odps.udf.examples;
//继承UDF类。
import com.aliyun.odps.udf.UDF;
//自定义Java类。
public final class Lower extends UDF {
//evaluate方法。其中:String标识输入参数的数据类型,return标识返回值。
public String evaluate(String s) {
if (s == null) {
return null;
}
return s.toLowerCase();
}
}
使用Java Writable Type类型
代码语言:javascript复制//将定义的Java类组织在com.aliyun.odps.udf.example包中。
package com.aliyun.odps.udf.example;
//添加Java Writable Type类型必需的类。
import com.aliyun.odps.io.Text;
//继承UDF类。
import com.aliyun.odps.udf.UDF;
//自定义Java类。
public class MyConcat extends UDF {
private Text ret = new Text();
//evaluate方法。其中:Text标识输入参数的数据类型,return标识返回值。
public Text evaluate(Text a, Text b) {
if (a == null || b == null) {
return null;
}
ret.clear();
ret.append(a.getBytes(), 0, a.getLength());
ret.append(b.getBytes(), 0, b.getLength());
return ret;
}
}
注意事项
在编写Java UDF时,您需要注意:
- 不同UDF JAR包中不建议存在类名相同但实现逻辑不一样的类。例如UDF1、UDF2分别对应资源JAR包udf1.jar、udf2.jar,两个JAR包里都包含名称为
com.aliyun.UserFunction.class
的类但实现逻辑不一样,当同一条SQL语句中同时调用UDF1和UDF2时,MaxCompute会随机加载其中一个类,此时会导致UDF执行结果不符合预期甚至编译失败。 - Java UDF中输入或返回值的数据类型是对象,数据类型首字母必须大写,例如String。
- SQL中的NULL值通过Java中的NULL表示。Java Primitive Type无法表示SQL中的NULL值,不允许使用。
Java UDF使用示例
兼容Hive Java UDF示例
注意事项
使用兼容的Hive UDF时,您需要注意:
- 在MaxCompute上使用
add jar
命令添加Hive UDF的资源时,您需要指定所有JAR包,MaxCompute无法自动将所有JAR包加入Classpath。 - 调用Hive UDF时,需要在SQL语句前添加
set odps.sql.hive.compatible=true;
语句,与SQL语句一起提交执行。
Hive UDF代码示例
代码语言:javascript复制package com.aliyun.odps.compiler.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class Collect extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
if (objectInspectors.length == 0) {
throw new UDFArgumentException("Collect: input args should >= 1");
}
for (int i = 1; i < objectInspectors.length; i ) {
if (objectInspectors[i] != objectInspectors[0]) {
throw new UDFArgumentException("Collect: input oi should be the same for all args");
}
}
return ObjectInspectorFactory.getStandardListObjectInspector(objectInspectors[0]);
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
List<Object> objectList = new ArrayList<>(deferredObjects.length);
for (DeferredObject deferredObject : deferredObjects) {
objectList.add(deferredObject.get());
}
return objectList;
}
@Override
public String getDisplayString(String[] strings) {
return "Collect";
}
}
该UDF代码示例可以将任意类型、数量的参数打包成ARRAY输出。假设Hive UDF对应的JAR包名称为test.jar。
操作步骤
将Hive UDF代码示例通过Hive平台编译为JAR包,执行如下命令将Hive UDF JAR包添加为MaxCompute资源。
代码语言:javascript复制--添加资源。
add jar test.jar;
执行如下命令注册UDF函数。
代码语言:javascript复制--注册函数。
create function hive_collect as 'com.aliyun.odps.compiler.hive.Collect' using 'test.jar';
执行如下SQL语句调用新建的UDF函数。
代码语言:javascript复制--设置MaxCompute项目的模式为Hive兼容模式。
set odps.sql.hive.compatible=true;
--调用UDF函数。
select hive_collect(4y, 5y, 6y);
复杂数据类型示例
UDF代码示例
如下代码中,定义了3个重载的evaluate方法。其中:
- 第一个用ARRAY作为参数,ARRAY对应java.util.List。
- 第二个用MAP作为参数,MAP对应java.util.Map。
- 第三个用STRUCT作为参数,STRUCT对应com.aliyun.odps.data.Struct。
com.aliyun.odps.data.Struct无法通过反射分析获取到field name和field type,需要辅助使用
@Resolve annotation
,即如果您需要在UDF中使用STRUCT,要求在UDF class上也标注上@Resolve
注解,该注解只会影响参数或返回值中包含com.aliyun.odps.data.Struct的重载。
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.List;
import java.util.Map;
@Resolve("struct<a:string>,string->string")
public class UdfArray extends UDF {
//接收两个参数,第一个参数对应ARRAY复杂数据类型,第二个参数对应要获取的元素的index,目的是要取出位于index位置的元素。
public String evaluate(List<String> vals, Long index) {
return vals.get(index.intValue());
}
//接收两个参数,第一个参数对应MAP复杂数据类型,第二个参数对应要取出的Key,目的是要取出Key对应的值。
public String evaluate(Map<String, String> map, String key) {
return map.get(key);
}
//接收两个参数,第一个参数对应STRUCT复杂数据类型,第二个参数为一个Key值,目的是要取出STRUCT中成员变量a对应的值,并在其后增加Key值以STRING格式返回。
public String evaluate(Struct struct, String key) {
return struct.getFieldValue("a") key;
}
}
使用示例
代码语言:javascript复制select my_index(array('a', 'b', 'c'), 0); --返回a。
select my_index(map('key_a','val_a', 'key_b', 'val_b'), 'key_b'); --返回val_b。
select my_index(named_struct('a', 'hello'), 'world'); --返回hello world。
UDTF
UDTF概述
背景信息
UDTF为用户自定义表值函数,适用于一进多出业务场景。即其输入与输出是一对多的关系,读入一行数据,输出多个值可视为一张表。
使用限制
在select
语句中使用UDTF时,不允许存在其他列或表达式。错误示例如下。
--查询语句中同时携带了UDTF和其他列。
select value, user_udtf(key) as mycol ...
UDTF不能嵌套使用。错误示例如下。
代码语言:javascript复制--user_udtf1嵌套了user_udtf2,不允许嵌套。
select user_udtf1(user_udtf2(key)) as mycol...;
不支持在同一个select
子句中与group by
、distribute by
、sort by
联用。错误示例如下。
--UDTF不能与group by联用。
select user_udtf(key) as mycol ... group by mycol;
Java UDTF
UDTF代码结构
代码中需要包含如下信息:
- Java包(Package):可选。
- 继承UDTF类:必选。
必须携带的UDTF类为
com.aliyun.odps.udf.UDTF
、com.aliyun.odps.udf.annotation.Resolve
(对应@Resolve
注解)和com.aliyun.odps.udf.UDFException
(对应实现Java类的方法)。 - 自定义Java类:必选。 UDTF代码的组织单位,定义了实现业务需求的变量及方法。
-
@Resolve
注解:必选。 格式为@Resolve(<signature>)
。signature
为函数签名,用于定义函数的输入参数和返回值的数据类型。UDTF无法通过反射分析获取函数签名,只能通过@Resolve
注解方式获取函数签名,例如@Resolve("smallint->varchar(10)")
。 - 实现Java类的方法:必选。
Java类实现包含如下4个方法,可以根据实际需要进行选择。
接口定义 描述
public void setup(ExecutionContext ctx) throws UDFException
初始化方法,在UDTF处理输入的数据前,MaxCompute会调用用户自定义的初始化行为。在每个Worker内setup
会被先调用一次。public void process(Object[] args) throws UDFException
SQL中每一条记录都会对应调用一次process
,process
的参数为SQL语句中指定的UDTF输入参数。输入参数以Object[]
的形式传入,输出结果通过forward
函数输出。您需要在process
函数内自行调用forward
,以决定输出数据。public void close() throws UDFException
UDTF的结束方法。只会被调用一次,即在处理完最后一条记录之后被调用。public void forward(Object …o) throws UDFException
调用forward
方法输出数据,每调用一次forward
代表输出一条记录。在SQL查询语句中调用UDTF时,可以通过as
子句将forward
输出的结果进行重命名。
UDTF代码示例如下。
代码语言:javascript复制//将定义的Java类组织在org.alidata.odps.udtf.examples包中。
package org.alidata.odps.udtf.examples;
//继承UDTF类。
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
//自定义Java类。
//@Resolve注解。
@Resolve("string,bigint->string,bigint")
public class MyUDTF extends UDTF {
//实现Java类的方法。
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
Long b = (Long) args[1];
for (String t: a.split("\s ")) {
forward(t, b);
}
}
}
@Resolve注解
@Resolve
注解格式如下。
@Resolve(<signature>)
signature
为函数签名字符串,用于标识输入参数和返回值的数据类型。执行UDTF时,UDTF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type_list'
其中:
-
type_list
:表示返回值的数据类型。UDTF可以返回多列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。 -
arg_type_list
:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。arg_type_list
还支持星号(*)或为空(‘’):- 当
arg_type_list
为星号(*)时,表示输入参数为任意个数。 - 当
arg_type_list
为空(‘’)时,表示无输入参数。
- 当
合法@Resolve
注解示例如下。
@Resolve注解示例 | 说明 |
---|---|
@Resolve('bigint,boolean->string,datetime') | 输入参数类型为BIGINT、BOOLEAN,返回值类型为STRING、DATETIME。 |
@Resolve('*->string, datetime') | 输入任意个参数,返回值类型为STRING、DATETIME。 |
@Resolve('->double, bigint, string') | 无输入参数,返回值类型为DOUBLE、BIGINT、STRING。 |
@Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>") | 输入参数类型为ARRAY、STRUCT、MAP,返回值类型为MAP、STRUCT。 |
使用示例
UDTF查询示例如下:
代码语言:javascript复制select user_udtf(col0, col1) as (c0, c1) from my_table;
Java UDTF读取MaxCompute资源示例
UDTF代码示例
代码语言:javascript复制package com.aliyun.odps.examples.udf;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
/** * project: example_project * table: wc_in2 * partitions: p2=1,p1=2 * columns: colc,colb */
@Resolve("string,string->string,bigint,string")
public class UDTFResource extends UDTF {
ExecutionContext ctx;
long fileResourceLineCount;
long tableResource1RecordCount;
long tableResource2RecordCount;
@Override
public void setup(ExecutionContext ctx) throws UDFException {
this.ctx = ctx;
try {
InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
fileResourceLineCount = 0;
while ((line = br.readLine()) != null) {
fileResourceLineCount ;
}
br.close();
Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
tableResource1RecordCount = 0;
while (iterator.hasNext()) {
tableResource1RecordCount ;
iterator.next();
}
iterator = ctx.readResourceTable("table_resource2").iterator();
tableResource2RecordCount = 0;
while (iterator.hasNext()) {
tableResource2RecordCount ;
iterator.next();
}
} catch (IOException e) {
throw new UDFException(e);
}
}
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
long b = args[1] == null ? 0 : ((String) args[1]).length();
forward(a, b, "fileResourceLineCount=" fileResourceLineCount "|tableResource1RecordCount="
tableResource1RecordCount "|tableResource2RecordCount=" tableResource2RecordCount);
}
}
SQL代码示例如下。
代码语言:javascript复制select my_udtf("10","20") as (a, b, fileResourceLineCount);
UDAF
UDAF概述
UDAF为用户自定义聚合函数,适用于多进一出业务场景。即其输入与输出是多对一的关系,将多条输入记录聚合成一个输出值。
Java UDAF
UDAF代码结构
代码中需要包含如下信息:
Java包(Package):可选。
继承UDAF类:必选。
必需携带的UDAF类为import com.aliyun.odps.udf.Aggregator
和com.aliyun.odps.udf.annotation.Resolve
(对应@Resolve
注解)。com.aliyun.odps.udf.UDFException
(可选,对应实现Java类初始化和结束的方法)。
@Resolve
注解:必选。
格式为@Resolve(<signature>)
。signature
为函数签名,用于定义函数的输入参数和返回值的数据类型。UDAF无法通过反射分析获取函数签名,只能通过@Resolve
注解方式获取函数签名,例如@Resolve("smallint->varchar(10)")
。
自定义Java类:必选。
UDAF代码的组织单位,定义了实现业务需求的变量及方法。
实现Java类的方法:必选。
实现Java类需要继承com.aliyun.odps.udf.Aggregator
类并实现如下方法。
import com.aliyun.odps.udf.ContextFunction;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
public abstract class Aggregator implements ContextFunction {
//初始化方法。
@Override
public void setup(ExecutionContext ctx) throws UDFException {
}
//结束方法。
@Override
public void close() throws UDFException {
}
//创建聚合Buffer。
abstract public Writable newBuffer();
//iterate方法。
//buffer为聚合buffer,是指一个阶段性的汇总数据,即在不同的Map任务中,group by后得出的数据(可理解为一个集合),每行执行一次。
//Writable[]表示一行数据,在代码中指代传入的列。例如writable[0]表示第一列,writable[1]表示第二列。
//args为SQL中调用UDAF时指定的参数,不能为NULL,但是args里面的元素可以为NULL,代表对应的输入数据是NULL。
abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
//terminate方法。
abstract public Writable terminate(Writable buffer) throws UDFException;
//merge方法。
abstract public void merge(Writable buffer, Writable partial) throws UDFException;
}
其中:iterate
、merge
和terminate
是最重要的三个方法,UDAF的主要逻辑依赖于这三个方法的实现。此外,还需要您实现自定义的Writable buffer。
Writable buffer将内存中的对象转换成字节序列(或其他数据传输协议)以便于储存到磁盘(持久化)和网络传输。因为MaxCompute使用分布式计算的方式来处理聚合函数,因此需要知道如何序列化和反序列化数据,以便于数据在不同的设备之间进行传输。
UDAF代码示例如下。
代码语言:javascript复制//将定义的Java类组织在org.alidata.odps.udaf.examples包中。
package org.alidata.odps.udaf.examples;
//继承UDAF类。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
//自定义Java类。
//@Resolve注解。
@Resolve("double->double")
public class AggrAvg extends Aggregator {
//实现Java类的方法。
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg != null) {
buf.count = 1;
buf.sum = arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else {
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum = p.sum;
buf.count = p.count;
}
}
@Resolve注解
@Resolve
注解格式如下。
@Resolve(<signature>)
signature
为字符串,用于标识输入参数和返回值的数据类型。执行UDAF时,UDAF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type'
其中:
-
arg_type_list
:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。arg_type_list
还支持星号(*)或为空(‘’):- 当
arg_type_list
为星号(*)时,表示输入参数为任意个数。 - 当
arg_type_list
为空(‘’)时,表示无输入参数。
- 当
-
type
:表示返回值的数据类型。UDAF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
合法@Resolve
注解示例如下。
@Resolve注解示例 | 说明 |
---|---|
@Resolve('bigint,double->string') | 输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。 |
@Resolve('*->string') | 输入任意个参数,返回值类型为STRING。 |
@Resolve('->double') | 无输入参数,返回值类型为DOUBLE。 |
@Resolve('array<bigint>->struct<x:string, y:int>') | 输入参数类型为ARRAY,返回值类型为STRUCT<x:STRING, y:INT>。 |
使用示例
以通过MaxCompute Studio开发计算平均值的UDAF函数AggrAvg
为例,实现逻辑如下。
- 输入数据分片:MaxCompute会按照MapReduce处理流程对输入数据按照一定的大小进行分片,每片的大小适合一个Worker在适当的时间内完成。
分片大小需要您通过
odps.stage.mapper.split.size
参数进行配置。 - 计算平均值第一阶段:每个Worker统计分片内数据的个数及汇总值。您可以将每个分片内的数据个数及汇总值视为一个中间结果。
- 计算平均值第二阶段:汇总第一阶段中每个分片内的信息。
- 最终输出:
r.sum/r.count
即是所有输入数据的平均值。
代码嵌入式UDF
功能介绍
代码嵌入式UDF支持将Java或Python代码嵌入SQL脚本。Janino-compiler编译器会识别并提取嵌入的代码,完成代码编译(Java)、动态生成资源和创建临时函数操作。
代码嵌入式UDF允许您将SQL脚本和第三方代码放入同一个源码文件,减少使用UDT或UDF的操作步骤,方便日常开发。
使用限制
嵌入式Java代码使用Janino-compiler编译器进行编译,且支持的Java语法只是标准Java JDK的一个子集。嵌入式Java代码使用限制包含但不限于以下内容:
- 不支持Lambda表达式。
- 不支持Catch多种Exception类型。例如
catch(Exception1 | Exception2 e)
。 - 不支持自动推导泛型。例如
Map map = new HashMap<>();
。 - 类型参数的推导会被忽略,必须显示Cast。例如
(String) myMap.get(key)
。 - Assert会强制开启,不受JVM的**-ea**参数控制。
- 不支持Java 8以上(不包含Java 8)版本的语言功能。
UDT引用嵌入式代码
代码语言:javascript复制SELECT
s,
com.mypackage.Foo.extractNumber(s)
FROM VALUES ('abc123def'),('apple') AS t(s);
#CODE ('lang'='JAVA')
package com.mypackage;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class Foo {
final static Pattern compile = Pattern.compile(".*?([0-9] ).*");
public static String extractNumber(String input) {
final Matcher m = compile.matcher(input);
if (m.find()) {
return m.group(1);
}
return null;
}
}
#END CODE;
#CODE
和#END CODE
:表示嵌入式代码的开始和结束位置。位于脚本末尾的嵌入式代码块作用域为整个脚本。‘lang’=’JAVA’
:表示嵌入式代码为Java代码。还支持PYTHON
。- 在SQL脚本里可以使用UDT语法直接调用
Foo.extractNumber
。
Java代码嵌入式UDF
代码语言:javascript复制CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;
import com.aliyun.odps.udf.UDF;
public class Reverse extends UDF {
public String evaluate(String input) {
if (input == null) return null;
StringBuilder ret = new StringBuilder();
for (int i = input.toCharArray().length - 1; i >= 0; i--) {
ret.append(input.toCharArray()[i]);
}
return ret.toString();
}
}
#END CODE;
SELECT foo('abdc');
- 嵌入式代码块可以置于
USING
后或脚本末尾,置于USING
后的代码块作用域仅为CREATE TEMPORARY FUNCTION
语句。 CREATE TEMPORARY FUNCTION
创建的函数为临时函数,仅在本次执行生效,不会存入MaxCompute的Meta系统。
Java代码嵌入式UDTF
代码语言:javascript复制CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA', 'filename'='embedded.jar')
package com.mypackage;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({
"string->string,string"})
public class Reverse extends UDTF {
@Override
public void process(Object[] objects) throws UDFException {
String str = (String) objects[0];
String[] split = str.split(",");
forward(split[0], split[1]);
}
}
#END CODE;
SELECT foo('ab,dc') AS (a,b);
由于@Resolve
返回值要求为string[]
,但Janino-compiler编译器无法将"string->string,string"
识别为string[]
,@Resolve
注解的参数需要加大括号({}
),为嵌入式代码特有内容。用普通方式创建Java UDTF时可省略大括号({}
)。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/234503.html原文链接:https://javaforall.cn