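1. UDF categories
UDF type | Description |
UDF (User Defined Scalar Function) | A user-defined scalar function. Input and output are one-to-one: the function reads one row and returns one value. |
UDTF (User Defined Table Valued Function) | A user-defined table-valued function, used when a single call needs to return multiple rows. UDTF is the only kind of user-defined function that can return multiple fields. UDTF is not the same as UDT (User Defined Type). |
UDAF (User Defined Aggregation Function) | A user-defined aggregate function. Input and output are many-to-one: multiple input records are aggregated into one output value. A UDAF can be used together with the GROUP BY clause in SQL. For the syntax, see Aggregate Functions. |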
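2. UDF parameter types
The mapping between MaxCompute data types and Java data types is as follows.
Notes:
- The ARRAY type maps to the Java type List here, not to a Java array.
- Some data types, such as VARCHAR, BINARY, and STRUCT, map to ODPS-specific classes.
- The Java parameter and return types are objects, so the type name must start with an uppercase letter (for example, Long rather than long).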
MaxCompute Type | Java Type |
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
DECIMAL | java.math.BigDecimal |
BOOLEAN | java.lang.Boolean |
STRING | java.lang.String |
VARCHAR | com.aliyun.odps.data.Varchar |
BINARY | com.aliyun.odps.data.Binary |
DATETIME | java.util.Date |
TIMESTAMP | java.sql.Timestamp |
ARRAY | java.util.List |
MAP | java.util.Map |
STRUCT | com.aliyun.odps.data.Struct |
MaxCompute 2.0 supports using Writable types as parameters and return values when defining a Java UDF. The mapping between MaxCompute data types and Java Writable types is as follows.
MaxCompute Type | Java Writable Type |
TINYINT | ByteWritable |
SMALLINT | ShortWritable |
INT | IntWritable |
BIGINT | LongWritable |
FLOAT | FloatWritable |
DOUBLE | DoubleWritable |
DECIMAL | BigDecimalWritable |
BOOLEAN | BooleanWritable |
STRING | Text |
VARCHAR | VarcharWritable |
BINARY | BytesWritable |
DATETIME | DatetimeWritable |
TIMESTAMP | TimestampWritable |
INTERVAL_YEAR_MONTH | IntervalYearMonthWritable |
INTERVAL_DAY_TIME | IntervalDayTimeWritable |
ARRAY | N/A |
MAP | N/A |
STRUCT | N/A |
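The mapping between MaxCompute SQL types and Python 2 and Python 3 types is as follows.
MaxCompute SQL Type | Python 2 Type | Python 3 Type |
BIGINT | int | int |
STRING | str | unicode |
DOUBLE | float | float |
BOOLEAN | bool | bool |
DATETIME | int | datetime.datetime |
FLOAT | float | float |
CHAR | str | unicode |
VARCHAR | str | unicode |
BINARY | bytearray | bytes |
DATE | int | datetime.date |
DECIMAL | decimal.Decimal | decimal.Decimal |
ARRAY | list | list |
MAP | dict | dict |
STRUCT | collections.namedtuple | collections.namedtuple |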
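As a small illustration of the Python mapping, the sketch below shows a UDF whose ARRAY argument arrives as a Python list and whose SQL NULL values arrive as None. The class name ArrayMax and the signature array<bigint>->bigint are illustrative assumptions, and the fallback annotate is only a local stand-in so the class can be tried outside MaxCompute.

```python
try:
    from odps.udf import annotate  # provided by the MaxCompute Python runtime
except ImportError:
    def annotate(signature):
        """Local stand-in (assumption): lets the class be exercised offline."""
        def decorator(cls):
            return cls
        return decorator


@annotate("array<bigint>->bigint")
class ArrayMax(object):
    """Hypothetical UDF: returns the largest non-NULL element of an array."""

    def evaluate(self, values):
        # An ARRAY column arrives as a list; SQL NULL arrives as None.
        if values is None:
            return None
        present = [v for v in values if v is not None]
        return max(present) if present else None
```

Returning None from evaluate is how a Python UDF would hand a NULL back to SQL in this sketch.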
3. How to use UDFs
For UDF, UDTF, and UDAF, refer to the documentation:
https://help.aliyun.com/document_detail/27867.html?spm=a2c4g.11186623.6.762.463d7468xnFPHb (Java UDF)
Advanced usage of UDFs:
3.1 Variable-length arguments in UDFs
Java:
package com.mrtest.cn;

import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.List;

@Resolve({"*->array<string>"})
public class TestUDF extends UDF {
    // Collects all variable-length string arguments into one list.
    public List<String> evaluate(String... s) {
        List<String> list = new ArrayList<String>();
        for (String name : s) {
            list.add(name);
        }
        return list;
    }
}
Python:
from odps.udf import annotate

@annotate("*->bigint")
class ParamFunc(object):
    def evaluate(self, *nums):
        # Sum all variable-length arguments.
        total = 0
        for num in nums:
            total = num + total
        return total
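The Python example above would raise a TypeError if any argument were NULL, because SQL NULL reaches the UDF as None. A minimal sketch of a NULL-tolerant variant follows. The class name NullSafeSum and the choice to skip NULLs (rather than return NULL) are assumptions made for illustration, and the fallback annotate is a local stand-in for running outside MaxCompute.

```python
try:
    from odps.udf import annotate  # provided by the MaxCompute Python runtime
except ImportError:
    def annotate(signature):
        """Local stand-in (assumption): lets the class be exercised offline."""
        def decorator(cls):
            return cls
        return decorator


@annotate("*->bigint")
class NullSafeSum(object):
    """Hypothetical variant of ParamFunc that ignores NULL arguments."""

    def evaluate(self, *nums):
        total = 0
        for num in nums:
            if num is not None:  # skip SQL NULLs instead of failing
                total += num
        return total
```

With no arguments at all, this sketch returns 0. Whether skipping NULLs or propagating NULL is the right behavior depends on the query.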
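3.2 UDF overloading
Note: overloads whose parameters differ only in their generic type argument (for example, List<String> versus List<Integer>) cannot be resolved to distinct methods, because of Java type erasure.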
package com.aliyun.odps.examples.udf;

import com.aliyun.odps.udf.UDF;

public class UDFExample extends UDF {
    public String evaluate(String a) {
        return "s2s:" + a;
    }

    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b;
    }

    public String evaluate(String a, String b, String c) {
        return "sss2s:" + a + "," + b + "," + c;
    }
}
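Python has no method overloading, so a Python UDF that wanted the same behavior as the Java overloads above would have to dispatch on the number of arguments. The sketch below is one possible approach, not something from the original example. The class name OverloadLikeUDF is hypothetical, and in MaxCompute it would presumably be registered with a variable-length signature such as "*->string" (an assumption).

```python
class OverloadLikeUDF(object):
    """Hypothetical Python counterpart of UDFExample, dispatching on arity."""

    # Prefixes mirror the three Java overloads, keyed by argument count.
    _PREFIXES = {1: "s2s:", 2: "ss2s:", 3: "sss2s:"}

    def evaluate(self, *args):
        prefix = self._PREFIXES.get(len(args))
        if prefix is None:
            raise ValueError("expected 1 to 3 arguments, got %d" % len(args))
        # Java string concatenation renders null as "null"; do the same for None.
        return prefix + ",".join("null" if a is None else a for a in args)
```

A single evaluate method keeps the dispatch rule in one place, at the cost of losing the compile-time check that Java overloads provide.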
3.3 Accessing file and table resources from a UDF
package com.aliyun.odps.examples.udf;

import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;

public class UDFResource extends UDF {
    ExecutionContext ctx;
    long fileResourceLineCount;
    long tableResource1RecordCount;
    long tableResource2RecordCount;

    @Override
    public void setup(ExecutionContext ctx) throws UDFException {
        this.ctx = ctx;
        try {
            // Count the lines of the file resource.
            InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String line;
            fileResourceLineCount = 0;
            while ((line = br.readLine()) != null) {
                fileResourceLineCount++;
            }
            br.close();
            // Count the records of the first table resource.
            Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
            tableResource1RecordCount = 0;
            while (iterator.hasNext()) {
                tableResource1RecordCount++;
                iterator.next();
            }
            // Count the records of the second table resource.
            iterator = ctx.readResourceTable("table_resource2").iterator();
            tableResource2RecordCount = 0;
            while (iterator.hasNext()) {
                tableResource2RecordCount++;
                iterator.next();
            }
        } catch (IOException e) {
            throw new UDFException(e);
        }
    }

    /**
     * project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb
     */
    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount
            + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount="
            + tableResource2RecordCount;
    }
}
Python:
# coding: utf-8
from odps.udf import annotate
from odps.distcache import get_cache_file, get_cache_table

@annotate('double->double')
class Compute(object):
    def __init__(self):
        # Read the file resource.
        cache_file = get_cache_file('file.txt')
        dataMat = []
        for line in cache_file:
            curLine = line.strip().split(',')
            # processing logic goes here
        cache_file.close()
        # Read the table resource.
        self.my_dict = {}
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = [record[1]]
            # processing logic goes here

    def evaluate(self, input):
        # processing logic goes here; this placeholder returns the input unchanged
        return input
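The resource-loading step above can be tried locally by separating it from the odps.distcache calls. The sketch below assumes that a file resource behaves like an iterable of text lines and that a table resource yields one tuple per record. The helper names count_lines and build_lookup are hypothetical and are not part of the MaxCompute API.

```python
import io


def count_lines(file_obj):
    """Count the lines of a file-like resource, as the Java setup() does."""
    return sum(1 for _ in file_obj)


def build_lookup(records):
    """Group the second column of each record under the first column."""
    lookup = {}
    for record in records:
        # Unlike a plain assignment, this keeps every value for a repeated key.
        lookup.setdefault(record[0], []).append(record[1])
    return lookup


# Stand-ins for get_cache_file(...) and get_cache_table(...) (assumed shapes).
fake_file = io.StringIO(u"x,1\ny,2\n")
fake_table = [("a", 1), ("b", 2), ("a", 3)]
line_count = count_lines(fake_file)
lookup = build_lookup(fake_table)
```

Doing this work once in __init__ (or setup in Java) rather than in evaluate avoids rereading the resource for every input row.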
3.4 Accessing external networks from a UDF (VPC / virtual private cloud, external networks)
https://help.aliyun.com/document_detail/187866.html
3.5 Using third-party packages in a UDF
https://help.aliyun.com/document_detail/189752.html
# coding: utf-8
# explode.py
from odps.udf import annotate
from odps.distcache import get_cache_archive
import datetime

def include_package_path(res_name):
    # Add the root of the archive resource to sys.path so the package can be imported.
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    sys.path.append(os.path.dirname(dir_names[0]))

@annotate("string->boolean")
class is_workday_udf(object):
    def __init__(self):
        include_package_path('chinese-calendar-master.zip')

    def evaluate(self, date_str):
        # try:
        import chinese_calendar
        date_strs = date_str.split("-")
        year_num = int(date_strs[0])
        month_num = int(date_strs[1])
        day_num = int(date_strs[2])
        date_num = datetime.date(year=year_num, month=month_num, day=day_num)
        result = chinese_calendar.is_workday(date_num)
        return result
        # except:
        #     return True
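To see which directory include_package_path ends up adding to sys.path, its path logic can be isolated from get_cache_archive. The sketch below works on plain file names. The function name package_root and the example archive layout are hypothetical, chosen only to make the behavior visible.

```python
import os


def package_root(file_names):
    """Return the directory that include_package_path would add to sys.path."""
    dir_names = sorted(
        (os.path.dirname(os.path.normpath(name))
         for name in file_names
         if '.dist_info' not in name),
        key=len,
    )
    # The shortest directory is the top-level package; its parent is the root.
    return os.path.dirname(dir_names[0])


# Hypothetical layout: the package directory sits at the top of the archive.
example_files = [
    os.path.join("res", "pkg.zip", "chinese_calendar", "__init__.py"),
    os.path.join("res", "pkg.zip", "chinese_calendar", "data", "holidays.py"),
]
root = package_root(example_files)
```

Under this layout the computed root is the archive directory itself, so the package is importable by name. If an archive wraps the package in an extra top-level folder, the same logic would return a directory that does not contain the package directly, so the archive layout is worth checking before uploading.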
Registering the function
Running the SELECT statement:
set odps.pypy.enabled=false;
set odps.isolation.session.enable=true;
select my_json('{"info":"11","desc":"a|b","filename":"4b-2a-3c-4d-5b"}') as a;
3.6 Developing UDFs with embedded code
CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;

import com.aliyun.odps.udf.UDF;

public class Reverse extends UDF {
    public String evaluate(String input) {
        if (input == null) return null;
        StringBuilder ret = new StringBuilder();
        for (int i = input.toCharArray().length - 1; i >= 0; i--) {
            ret.append(input.toCharArray()[i]);
        }
        return ret.toString();
    }
}
#END CODE;
SELECT foo('abdc');
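- An embedded code block can be placed after USING or at the end of the script. A block placed after USING is scoped to that CREATE TEMPORARY FUNCTION statement only.
- A function created with CREATE TEMPORARY FUNCTION is temporary: it is valid only for the current execution and is not stored in the MaxCompute Meta system.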
CREATE TEMPORARY FUNCTION foo AS 'embedded.UDFTest' USING
#CODE ('lang'='PYTHON', 'filename'='embedded')
from odps.udf import annotate

@annotate("bigint->bigint")
class UDFTest(object):
    def evaluate(self, a):
        return a * a
#END CODE;
SELECT foo(4);
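- The indentation of the Python code must follow the Python language rules.
- When a Python UDF is registered, the class name after AS must include the file name of the Python source, so you can specify a virtual file name with 'filename'='embedded'.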
3.7 Defining functions in SQL
create sql function my_sum(@a BIGINT, @b BIGINT, @c BIGINT) returns @my_sum BIGINT
as begin
    @temp := @a + @b;
    @my_sum := @temp + @c;
end;

create sql function my_func(@s STRING)
AS if(@s rlike '"git_(m|a)"', 1, 0);
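To make the intent of the two SQL functions easier to check, here is a rough Python equivalent. It assumes that RLIKE succeeds when the pattern occurs anywhere in the string and that a NULL input falls through to the 0 branch. Both points are assumptions of this sketch rather than statements from the original, and NULL arithmetic in my_sum is not modeled.

```python
import re

# Same pattern as the SQL function: a quoted "git_m" or "git_a".
_GIT_PATTERN = re.compile(r'"git_(m|a)"')


def my_sum(a, b, c):
    """Mirror of the SQL my_sum: add three integers in two steps."""
    temp = a + b
    return temp + c


def my_func(s):
    """Mirror of the SQL my_func: 1 if the pattern is found, otherwise 0."""
    if s is None:
        return 0  # assumption: NULL is treated as "no match"
    return 1 if _GIT_PATTERN.search(s) else 0
```

Note that the double quotes are part of the pattern, so a bare git_m without surrounding quotes does not match.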