介绍
hive 的用户自定义聚合函数 (UDAF) 是一个很好的功能, 集成了先进的数据处理. hive 有两种 UDAF: 简单和通用. 顾名思义, 简单的 UDAF, 写的相当简单的, 但因为使用 Java 反射导致性能损失, 而且有些特性不能使用, 如可变长度参数列表. 通用 UDAF 可以使用?? 所有功能, 但是 UDAF 就写的比较复杂, 不直观.
本文只介绍通用 UDAF.
UDAF 是需要在 hive 的 sql 语句和 group by 联合使用, hive 的 group by 对于每个分组, 只能返回一条记录, 这点和 MySQL 不一样, 切记.
UDAF 开发概览
开发通用 UDAF 有两个步骤, 第一个是编写 resolver 类, 第二个是编写 evaluator 类. resolver 负责类型检查, 操作符重载. evaluator 真正实现 UDAF 的逻辑. 通常来说, 顶层 UDAF 类继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, 里面编写嵌套类 evaluator 实现 UDAF 的逻辑.
本文以 Hive 的内置 UDAF sum 函数的源代码作为示例讲解.
实现 resolver
resolver 通常继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, 但是我们更建议继承 AbstractGenericUDAFResolver, 隔离将来 hive 接口的变化.
GenericUDAFResolver 和 GenericUDAFResolver2 接口的区别是, 后面的允许 evaluator 实现可以访问更多的信息, 例如 DISTINCT 限定符, 通配符 FUNCTION(*).
- public class GenericUDAFSum extends AbstractGenericUDAFResolver {
- static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());
- @Override
- public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
- throws SemanticException {
- // Type-checking goes here!
- return new GenericUDAFSumLong();
- }
- public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
- // UDAF logic goes here!
- }
- }
这个就是 UDAF 的代码骨架, 第一行创建 LOG 对象, 用来写入警告和错误到 hive 的 log.GenericUDAFResolver 只需要重写一个方法: getEvaluator, 它根据 SQL 传入的参数类型, 返回正确的 evaluator. 这里最主要是实现操作符的重载.
getEvaluator 的完整代码如下:
- public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
- throws SemanticException {
- if (parameters.length != 1) {
- throw new UDFArgumentTypeException(parameters.length - 1,
- "Exactly one argument is expected.");
- }
- if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
- throw new UDFArgumentTypeException(0,
- "Only primitive type arguments are accepted but"
- + parameters[0].getTypeName() + "is passed.");
- }
- switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case TIMESTAMP:
- return new GenericUDAFSumLong();
- case FLOAT:
- case DOUBLE:
- case STRING:
- return new GenericUDAFSumDouble();
- case BOOLEAN:
- default:
- throw new UDFArgumentTypeException(0,
- "Only numeric or string type arguments are accepted but"
- + parameters[0].getTypeName() + "is passed.");
- }
这里做了类型检查, 如果不是原生类型(即符合类型, array,map 此类), 则抛出异常, 还实现了操作符重载, 对于整数类型, 使用 GenericUDAFSumLong 实现 UDAF 的逻辑, 对于浮点类型, 使用 GenericUDAFSumDouble 实现 UDAF 的逻辑.
实现 evaluator
所有 evaluators 必须继承抽象类 org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator. 子类必须实现它的一些抽象方法, 实现 UDAF 的逻辑.
GenericUDAFEvaluator 有一个嵌套类 Mode, 这个类很重要, 它表示了 udaf 在 mapreduce 的各个阶段, 理解 Mode 的含义, 就可以理解了 hive 的 UDAF 的运行流程.
- public static enum Mode {
- /**
- * PARTIAL1: 这个是 mapreduce 的 map 阶段: 从原始数据到部分数据聚合
- * 将会调用 iterate()和 terminatePartial()
- */
- PARTIAL1,
- /**
- * PARTIAL2: 这个是 mapreduce 的 map 端的 Combiner 阶段, 负责在 map 端合并 map 的数据:: 从部分数据聚合到部分数据聚合:
- * 将会调用 merge() 和 terminatePartial()
- */
- PARTIAL2,
- /**
- * FINAL: mapreduce 的 reduce 阶段: 从部分数据的聚合到完全聚合
- * 将会调用 merge()和 terminate()
- */
- FINAL,
- /**
- * COMPLETE: 如果出现了这个阶段, 表示 mapreduce 只有 map, 没有 reduce, 所以 map 端就直接出结果了: 从原始数据直接到完全聚合
- * 将会调用 iterate()和 terminate()
- */
- COMPLETE
- };
一般情况下, 完整的 UDAF 逻辑是一个 mapreduce 过程, 如果有 mapper 和 reducer, 就会经历 PARTIAL1(mapper),FINAL(reducer), 如果还有 combiner, 那就会经历 PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer).
而有一些情况下的 mapreduce, 只有 mapper, 而没有 reducer, 所以就会只有 COMPLETE 阶段, 这个阶段直接输入原始数据, 出结果.
下面以 GenericUDAFSumLong 的 evaluator 实现讲解
- public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
- private PrimitiveObjectInspector inputOI;
- private LongWritable result;
- // 这个方法返回了 UDAF 的返回类型, 这里确定了 sum 自定义函数的返回类型是 Long 类型
- @Override
- public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
- assert (parameters.length == 1);
- super.init(m, parameters);
- result = new LongWritable(0);
- inputOI = (PrimitiveObjectInspector) parameters[0];
- return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
- }
- /** 存储 sum 的值的类 */
- static class SumLongAgg implements AggregationBuffer {
- boolean empty;
- long sum;
- }
- // 创建新的聚合计算的需要的内存, 用来存储 mapper,combiner,reducer 运算过程中的相加总和.
- @Override
- public AggregationBuffer getNewAggregationBuffer() throws HiveException {
- SumLongAgg result = new SumLongAgg();
- reset(result);
- return result;
- }
- //mapreduce 支持 mapper 和 reducer 的重用, 所以为了兼容, 也需要做内存的重用.
- @Override
- public void reset(AggregationBuffer agg) throws HiveException {
- SumLongAgg myagg = (SumLongAgg) agg;
- myagg.empty = true;
- myagg.sum = 0;
- }
- private boolean warned = false;
- //map 阶段调用, 只要把保存当前和的对象 agg, 再加上输入的参数, 就可以了.
- @Override
- public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
- assert (parameters.length == 1);
- try {
- merge(agg, parameters[0]);
- } catch (NumberFormatException e) {
- if (!warned) {
- warned = true;
- LOG.warn(getClass().getSimpleName() + " "
- + StringUtils.stringifyException(e));
- }
- }
- }
- //mapper 结束要返回的结果, 还有 combiner 结束返回的结果
- @Override
- public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
- }
- //combiner 合并 map 返回的结果, 还有 reducer 合并 mapper 或 combiner 返回的结果.
- @Override
- public void merge(AggregationBuffer agg, Object partial) throws HiveException {
- if (partial != null) {
- SumLongAgg myagg = (SumLongAgg) agg;
- myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
- myagg.empty = false;
- }
- }
- //reducer 返回结果, 或者是只有 mapper, 没有 reducer 时, 在 mapper 端返回结果.
- @Override
- public Object terminate(AggregationBuffer agg) throws HiveException {
- SumLongAgg myagg = (SumLongAgg) agg;
- if (myagg.empty) {
- return null;
- }
- result.set(myagg.sum);
- return result;
- }
- }
除了 GenericUDAFSumLong, 还有重载的 GenericUDAFSumDouble, 以上代码都在 hive 的源码: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum.
修改方法注册
修改 ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java 文件, 加入编写的 UDAF 类, 并注册名字.
FunctionRegistry 类包含了 hive 的所有内置自定义函数. 想要更好学习 hive 的 UDAF, 建议多看看里面的 UDAF.
总结
本文的目的是为初学者入门学习 udaf, 所以介绍了 udaf 的概览, 尤其是 udaf 的运行过程, 这对初学者是比较大的槛.
考虑入门, 本文简单介绍了 sum 的 UDAF 实现, 但是如果想要更好理解 UDAF 的运行过程, 建议再看看 avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage.avg UDAF 对 hive 的运行流程要控制的更加精细, 并判断当前运行的 Mode 做一定的逻辑处理.
参考
来源: http://www.bubuko.com/infodetail-3101373.html