Hive:创建UDF
默认分类
2020-01-14
513
0
Hive的内置函数有可能无法满足你的查询需求,这时候就需要自定义函数来处理数据.本文介绍UDF类别,并创建一个简单的UDF示例.
三种UDF类型
UDF (user-defined function) : 作用于单个数据行,且产生一个数据行作为输出.大多数函数(例如数学函数和字符串函数)都属于这一类. UDAF(user-defined aggregate function): 接受多个输入数据行,并产生一个输出数据行.像COUNT和MAX这昂的函数都是聚集函数 UDTF(user-defined table-generating function):操作作用于单个数据行,且产生多个数据行(一个表)作为输出.
一个UDF必须满足下面两个条件:
- 一个UDF必须是 org.apache.hadoop.hive.ql.exec.UDF的子类
- 一个UDF必须至少实现了evaluate()方法
创建简单UDF
代码
package com.bihell.hive.udf; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; public final class Lower extends UDF { public Text evaluate(final Text s) { if (s == null) { return null; } return new Text(s.toString().toLowerCase()); } }
创建Schema
CREATE TABLE IF NOT EXISTS authors( name STRING, book STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
导入数据
LOAD DATA LOCAL INPATH ‘authors.txt’ INTO TABLE authors;
加载Jar包
add jar my_lower.jar
创建函数
CREATE TEMPORARY FUNCTION my_lower AS ‘com.bihell.hive.udf.Lower’;
运行函数
select my_lower(name) from authors;
UDAF示例
/**
* Created by master on 2016/4/27.
*/
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
public class Maximum extends UDAF {
public static class MaximumIntUDAFEvaluator implements UDAFEvaluator {
private IntWritable result;
public void init() {
System.err.printf("%s %s\n", hashCode(), "init");
result = null;
}
public boolean iterate(IntWritable value) {
System.err.printf("%s %s %s\n", hashCode(), "iterate", value);
if (value == null) {
return true;
}
if (result == null) {
result = new IntWritable(value.get());
} else {
result.set(Math.max(result.get(), value.get()));
}
return true;
}
public IntWritable terminatePartial() {
System.err.printf("%s %s\n", hashCode(), "terminatePartial");
return result;
}
public boolean merge(IntWritable other) {
System.err.printf("%s %s %s\n", hashCode(), "merge", other);
return iterate(other);
}
public IntWritable terminate() {
System.err.printf("%s %s\n", hashCode(), "terminate");
return result;
}
}
}
UDAF 必须是org.apache.hadoop.hive.ql.exec.UDAF的子类,且包含一个或多个嵌套的,实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类.在这个示例中只有一个嵌套类.MaximumIntUDAFEvaluator.我们也可以添加更多的计算函数(如MaximunLongUDAFEvaluator和MaximumFloatUDAFEvaluator)来提供长整型、浮点型等类型数的最大值的UDAF的重载。
必须实现下面5个方法:
- init()方法 负责初始化计算函数并重设它的内部状态. 在MaximumIntUDAFevaluator中我们把最终结果的IntWritable对象设为null. 我们使用null来表示目前还没有对很合值进行聚集计算,这和对空集NULL计算最大值有的结果是一致的.
- iterate() 方法 每次对一个新值进行聚集计算时都会调用iterate() 方法. 计算函数要根据聚集计算的结果更新其内部状态.iterate()接受的参数和Hive中被调用的函数的参数是对应的.在这个示例中,只有一个参数.方法首先检测参数值是否为null,如果是,则将其忽略.否则,resul变量实例就被设为value的整数值(如果这是方法第一次接受输入),或设为当前值和value值中的较大值(如果已经接受一些值).如果输入值合法我们就放方法返回ture.
- terminatePartial() 方法 Hive需要部分聚集结果时会调用terminatePartial()方法.这个方法必须返回一个封装了聚集计算当前状态的对象.在这里,因为只需要对已知的最大值或在没有值时的控制null进行封装,所以使用一个IntWritable即可.
- merge() 方法 在Hive决定要合并一个部分聚集值和另一个部分聚集值会调用merge()方法.该方法接受一个对象作为输入.这个对象类型必须和terminatePartial()方法的返回类型一致.在示例里merge()方法可以直接使用iterate()方法,因为部分结果的聚集和原始值的聚集的表达方法是相同的.但一般情况下不能这样做,这个方法实现的逻辑会合并计算函数和部分聚集的状态.
- terminate() 方法 Hive需要最终聚集结果时会调用terminate()方法,计算函数需要把状态作为一个值返回.在这里,我们返回实例变量result
执行我们的函数
create temporary function maximum as ‘com.bihell.hive.Maximum’; select maxinum(temperature) from records;
创建永久函数
创建临时函数每次导入声明比较麻烦,现在让我们创建永久函数
1.把JAR文件拷贝到HDFS并且确保hive用户可以访问这个JAR文件。 2.把JAR文件拷贝到HiveServer2运行的主机上,放到一个hive用户可以读取,写入和执行的目录里面(比如/opt/local/hive/lib/)。 3.进入Cloudera Manager管理控制台,选择Hive服务。 4.点击Configuration标签。 5.展开Service-Wide > Advanced目录。 6.配置Hive Auxiliary JARs Directory属性,值为步骤2的/opt/local/hive/lib/路径。这个这个属性会覆盖hive.aux.jars.path,即使它在HiveServer2 advanced 配置里面也一样会被覆盖。 7.点击Save ChangesJAR文件就被添加到HIVE_AUX_JARS_PATH
变量里面了。 8.推送配置文件 a.在Cloudera Manager管理控制台,进入Hive服务。 b.在Actions菜单选择Deploy Client Configuration。 c.点击Deploy Client Configuration。 9.重启Hive服务,如果Hive Auxiliary JARs Directory属性设置了,但是实际目录不存在,HiveServer2会自动失败。 10.如果启用了Sentry,需要给role赋予特权。以hive用户使用Beeline并且用过Hive SQL GRANT语句做以下操作:
GRANT ALL ON URI 'file:///opt/local/hive/lib/my.jar' TO ROLE EXAMPLE_ROLE
HDFS上面的JAR文件也需要授权
GRANT ALL ON URI 'hdfs:///path/to/jar' TO ROLE EXAMPLE_ROLE
11.运行CREATE FUNCTION来创建函数
CREATE FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
参考
http://mvnrepository.com/ Using the CDH 5 Maven Repository Hadoop- The Definitive Guide, 4th Edition User-Defined Functions (UDFs) with HiveServer2 Using Cloudera Manager