Hive自定义UDF

2022-11-22 16:23:34 浏览数 (1)

UDF概述

UDF全称:User-Defined Functions,即用户自定义函数,在Hive SQL编译成MapReduce任务时,执行java方法,类似于像MapReduce执行过程中加入一个插件,方便扩展。

UDF种类

  • UDF:操作单个数据行,产生单个数据行;
  • UDAF:操作多个数据行,产生一个数据行;
  • UDTF:操作一个数据行,产生多个数据行一个表作为输出;

自定义UDF步骤

1.编写UDF函数,

  • UDF需要继承org.apache.hadoop.hive.ql.exec.UDF
  • UDTF继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
  • UDAF使用比较少,这里先不讲解

2.将写好的类打包为jar,如HiveUDF-1.0.jar,并且上传到Hive机器或者HDFS目录

3.入到Hive shell环境中,输入命令注册该jar文件;

代码语言:javascript复制
add jar /home/hadoop/HiveUDF-1.0.jar

或者

把HiveUDF-1.0.jar上传到hdfs,

代码语言:javascript复制
hadoop fs -put HiveUDF-1.0.jar /home/hadoop/HiveUDF-1.0.jar

再输入命令

代码语言:javascript复制
add jar hdfs://hadoop01:9000/home/hadoop/HiveUDF-1.0.jar

4.为UDF类起一个别名,

代码语言:javascript复制
create temporary function myudf as 'cn.psvmc.udf.MyUDF'

注意,这里UDF只是为这个Hive会话临时定义的;

5.在select中使用myudf()

注册UDF

临时注册

添加jar到hive的classpath

代码语言:javascript复制
add jar HiveUDF-1.0.jar;

注册临时函数

代码语言:javascript复制
create temporary function myudf AS 'cn.psvmc.udf.MyUDF';

删除临时注册函数

代码语言:javascript复制
drop temporary function myudf;

永久注册

代码语言:javascript复制
create function myudf as 'cn.psvmc.udf.MyUDF' using jar 'hdfs://hadoop01:9000/home/hadoop/HiveUDF-1.0.jar';

注意:

永久注册需要将jar包上传到hdfs,否则在集群中运行的时候,会出现找不到jar包的情况!

查看注册的函数

代码语言:javascript复制
show functions;

实例UDF

现在我们实现一个字符串转大写的UDF。

Hive建表测试及数据

代码语言:javascript复制
create table if not exists t_user (
  id int,
  name string
)
clustered by (id) into 2 buckets
row format delimited fields terminated by '|'
stored as orc TBLPROPERTIES('transactional'='true');

复制

向Hive表中插入数据:

代码语言:javascript复制
insert into t_user values(1,'Ba|qz');
insert into t_user values(1,'xa');

项目中添加依赖

代码语言:javascript复制
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>2.1.1</version>
</dependency>

添加evaluate有两个注意事项:

1)evaluate方法遵循重载的原则,参数是用户自定义的,调用那个方法调用是在使用函数时候的参数决定。

2)evaluate方法必须有返回值,返回类型以及方法参数可以是Java数据或相应的Writable类。

代码语言:javascript复制
package cn.psvmc.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class MyUDF extends UDF {
    public String evaluate(String s) {
        if (s == null) {
            return "";
        }
        return s.toUpperCase();
    }
}

上传到Linux目录,然后用add jar来添加路径

代码语言:javascript复制
add jar /home/hadoop/hivetest/HiveUDF-1.0.jar

创建临时函数:

代码语言:javascript复制
create temporary function myudf as "com.master.HiveUDF.MyUDF";

UDF使用

代码语言:javascript复制
select myudf(name) from t_user;

重载示例

代码语言:javascript复制
package cn.psvmc.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class TestHiveSimpleUDF extends UDF {
    public IntWritable evaluate(IntWritable i) {
        return new IntWritable(i.get());
    }

    public Text evaluate(Text text) {
        return new Text(text.toString());
    }
}

实例UDTF

initialize方法制定了返回的列名及数据类型(forward写入数据的类型是一个数组,对应着initialize定义的列名),可以返回多个,在List里面对应即可。

函数列名调用的时侯通过:myudtf(c1,c2) t1 as co1,col2来使用列名。

代码语言:javascript复制
package cn.psvmc.udf;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MyUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        List<String> fieldNameList = new ArrayList<>();
        List<ObjectInspector> fieldTypeList = new ArrayList<>();
        fieldNameList.add("col");
        fieldTypeList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNameList, fieldTypeList);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        String col = args[0].toString();
        String[] cols = col.split("\|");
        for (String c : cols) {
            String[] results = new String[1];
            results[0] = c;
            forward(results);
        }
    }

    @Override
    public void close() throws HiveException {

    }
}

UDTF使用

代码语言:javascript复制
select myudtf(name) from t_user;

和前面的数据结合,这时候,需要用lateral view来操作,语句如下

代码语言:javascript复制
select t1.id,t2.col from t_user t1 lateral view myudtf(name) t2 as col

0 人点赞