1. Types of UDFs
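UDF type | Description |
UDF (User Defined Scalar Function) | User-defined scalar function. Input and output are one-to-one: it reads one row and returns one value. |
UDTF (User Defined Table Valued Function) | User-defined table-valued function, used when a single call must output multiple rows. UDTF is the only kind of user-defined function that can return multiple fields. A UDTF is not the same thing as a UDT (User Defined Type). |
UDAF (User Defined Aggregation Function) | User-defined aggregate function. Input and output are many-to-one: it aggregates multiple input records into one output value. A UDAF can be used together with the GROUP BY clause in SQL. For the syntax, see Aggregate Functions. |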
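The three shapes in the table can be illustrated with plain Python functions. This is only a sketch of the input/output cardinality of each kind, not the MaxCompute UDF API; the function names are hypothetical, and SQL NULL is modelled as None.

```python
from typing import Iterable, Iterator, Optional, Tuple


# UDF: one input row -> one output value (one-to-one).
def scalar_upper(s: Optional[str]) -> Optional[str]:
    # NULL in, NULL out.
    return None if s is None else s.upper()


# UDTF: one call -> zero or more output rows, each of which may have several fields.
def table_split(s: str, sep: str = ",") -> Iterator[Tuple[int, str]]:
    for i, part in enumerate(s.split(sep)):
        yield (i, part)


# UDAF: many input rows -> one output value (many-to-one), as with GROUP BY.
def aggregate_sum(values: Iterable[Optional[int]]) -> int:
    total = 0
    for v in values:
        if v is not None:
            total += v
    return total


if __name__ == "__main__":
    print(scalar_upper("abc"))             # ABC
    print(list(table_split("a,b,c")))      # [(0, 'a'), (1, 'b'), (2, 'c')]
    print(aggregate_sum([1, 2, None, 4]))  # 7
```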
2. UDF Parameter and Return Types
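The mapping between MaxCompute data types and Java data types is shown below.
Notes:
- ARRAY maps to java.util.List, not to a Java array.
- Some types, such as VARCHAR, BINARY, and STRUCT, map to ODPS-specific classes in com.aliyun.odps.data.
- Java parameter and return types must be object (boxed) types, so the class name starts with a capital letter, for example Long rather than long.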
MaxCompute Type | Java Type |
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
DECIMAL | java.math.BigDecimal |
BOOLEAN | java.lang.Boolean |
STRING | java.lang.String |
VARCHAR | com.aliyun.odps.data.Varchar |
BINARY | com.aliyun.odps.data.Binary |
DATETIME | java.util.Date |
TIMESTAMP | java.sql.Timestamp |
ARRAY | java.util.List |
MAP | java.util.Map |
STRUCT | com.aliyun.odps.data.Struct |
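To make the "boxed types, ARRAY is a List" rules concrete, here is a small, hypothetical Python helper that turns MaxCompute type names into the Java types used in an evaluate() signature, following the table above. It is not part of any MaxCompute SDK; classes outside java.lang (BigDecimal, Date, Timestamp, Varchar, Binary, Struct, List, Map) still need imports in the Java source.

```python
# Boxed Java classes for scalar MaxCompute types, following the mapping table.
SCALAR_TO_JAVA = {
    "tinyint": "Byte",
    "smallint": "Short",
    "int": "Integer",
    "bigint": "Long",
    "float": "Float",
    "double": "Double",
    "decimal": "BigDecimal",   # java.math.BigDecimal
    "boolean": "Boolean",
    "string": "String",
    "varchar": "Varchar",      # com.aliyun.odps.data.Varchar
    "binary": "Binary",        # com.aliyun.odps.data.Binary
    "datetime": "Date",        # java.util.Date
    "timestamp": "Timestamp",  # java.sql.Timestamp
    "struct": "Struct",        # com.aliyun.odps.data.Struct
}


def _split_top_level(s):
    """Split 'key,value' at the first comma not nested in <...> or (...)."""
    depth = 0
    for i, ch in enumerate(s):
        if ch in "<(":
            depth += 1
        elif ch in ">)":
            depth -= 1
        elif ch == "," and depth == 0:
            return s[:i].strip(), s[i + 1:].strip()
    raise ValueError("expected 'key,value' inside map<...>: %r" % s)


def to_java(mc_type):
    """Map a MaxCompute type name to the Java type used in evaluate()."""
    t = mc_type.strip().lower()
    # Strip type parameters such as decimal(10,2) or array<string> to get the base name.
    base = t.split("<", 1)[0].split("(", 1)[0].strip()
    if base == "array":
        return "List<%s>" % to_java(t[t.index("<") + 1:-1])
    if base == "map":
        key, value = _split_top_level(t[t.index("<") + 1:-1])
        return "Map<%s, %s>" % (to_java(key), to_java(value))
    return SCALAR_TO_JAVA[base]


def evaluate_signature(arg_types, return_type):
    """Build a Java evaluate() signature; parameters are named a0, a1, ..."""
    params = ", ".join("%s a%d" % (to_java(t), i) for i, t in enumerate(arg_types))
    return "public %s evaluate(%s)" % (to_java(return_type), params)
```

For example, `evaluate_signature(["bigint", "string"], "array<string>")` yields `public List<String> evaluate(Long a0, String a1)`.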
With MaxCompute 2.0, Java UDFs can also use Writable types as parameters and return values. The mapping between MaxCompute data types and Java Writable types is shown below.
MaxCompute Type | Java Writable Type |
TINYINT | ByteWritable |
SMALLINT | ShortWritable |
INT | IntWritable |
BIGINT | LongWritable |
FLOAT | FloatWritable |
DOUBLE | DoubleWritable |
DECIMAL | BigDecimalWritable |
BOOLEAN | BooleanWritable |
STRING | Text |
VARCHAR | VarcharWritable |
BINARY | BytesWritable |
DATETIME | DatetimeWritable |
TIMESTAMP | TimestampWritable |
INTERVAL_YEAR_MONTH | IntervalYearMonthWritable |
INTERVAL_DAY_TIME | IntervalDayTimeWritable |
ARRAY | N/A |
MAP | N/A |
STRUCT | N/A |
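The mapping between MaxCompute SQL types and Python UDF types is shown below.
MaxCompute SQL Type | Python 2 Type | Python 3 Type |
BIGINT | INT | INT |
STRING | STR | UNICODE |
DOUBLE | FLOAT | FLOAT |
BOOLEAN | BOOL | BOOL |
DATETIME | INT | DATETIME.DATETIME |
FLOAT | FLOAT | FLOAT |
TINYINT | INT | INT |
SMALLINT | INT | INT |
INT | INT | INT |
CHAR | STR | UNICODE |
VARCHAR | STR | UNICODE |
BINARY | BYTEARRAY | BYTES |
DATE | | DATETIME.DATE |
DECIMAL | DECIMAL.DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST | LIST |
MAP | DICT | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE | COLLECTIONS.NAMEDTUPLE |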
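As a sketch of how the container types look inside a Python evaluate(), assuming STRUCT arrives as a collections.namedtuple, MAP as a dict, and ARRAY as a list (as listed in the mapping): the struct fields `name`/`age` and the class `DescribePerson` below are hypothetical, and the namedtuple is built locally only to stand in for a real STRUCT value.

```python
from collections import namedtuple

# Local stand-in for a STRUCT<name:STRING, age:BIGINT> value (hypothetical fields).
Person = namedtuple("Person", ["name", "age"])


class DescribePerson(object):
    # STRUCT -> namedtuple, MAP -> dict, ARRAY -> list.
    def evaluate(self, person, scores, tags):
        if person is None:
            return None
        # Sum the map values, skipping NULLs (None).
        total = sum(v for v in (scores or {}).values() if v is not None)
        return "%s(%d) tags=%s total=%d" % (
            person.name, person.age, ",".join(tags or []), total)
```

Calling `DescribePerson().evaluate(Person("li", 30), {"a": 1, "b": 2}, ["x", "y"])` returns `"li(30) tags=x,y total=3"`.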
3. Using UDFs
For developing UDFs, UDTFs, and UDAFs, see the reference documentation:
Java UDF: https://help.aliyun.com/document_detail/27867.html?spm=a2c4g.11186623.6.762.463d7468xnFPHb
Advanced UDF usage:
3.1 Variable-length arguments in a UDF
Java:
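package com.mrtest.cn;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.List;
// "*" accepts any number of arguments; the return type is declared as
// array<string> so that the element type of the returned List is known.
@Resolve({"*->array<string>"})
public class TestUDF extends UDF {
    // Collect all arguments into a list, in call order.
    public List<String> evaluate(String... s) {
        List<String> list = new ArrayList<>();
        for (String name : s) {
            list.add(name);
        }
        return list;
    }
}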
Python:
from odps.udf import annotate
@annotate("*->bigint")
class ParamFunc(object):
    def evaluate(self, *nums):
        # Sum all arguments; "total" avoids shadowing the built-in sum().
        total = 0
        for num in nums:
            total = num + total
        return total
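In a Python UDF, a SQL NULL argument arrives as None, so `num + total` raises a TypeError as soon as one argument is NULL. Below is a minimal NULL-skipping variant; skipping NULLs (rather than returning NULL) is a design choice of this sketch. The @annotate decorator is left off so the class can be run and tested outside MaxCompute; inside MaxCompute it would be registered with `@annotate("*->bigint")` as above.

```python
class ParamFuncNullSafe(object):
    """Variadic sum that ignores SQL NULLs (None) instead of raising TypeError."""

    def evaluate(self, *nums):
        total = 0
        for num in nums:
            if num is not None:
                total += num
        return total
```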
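3.2 Overloading a UDF
Note: overloads that differ only in the type argument of List (for example, List<Long> versus List<String>) cannot be resolved to the right method. Because of Java type erasure, both parameters become a plain List at runtime, and the Java compiler rejects such a pair of methods because they have the same erasure.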
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.UDF;
public class UDFExample extends UDF {
    // One argument.
    public String evaluate(String a) {
        return "s2s:" + a;
    }
    // Two arguments.
    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b;
    }
    // Three arguments.
    public String evaluate(String a, String b, String c) {
        return "sss2s:" + a + "," + b + "," + c;
    }
}
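Python has no method overloading, so a Python UDF class has a single evaluate(). One way to mimic the three Java overloads above is a variadic evaluate() that dispatches on the number of arguments. This is a sketch: it assumes registration with a `"*->string"` signature and string arguments only, and the class name is hypothetical. The decorator is omitted so the class runs locally.

```python
class OverloadedExample(object):
    """Emulate evaluate(a), evaluate(a, b), evaluate(a, b, c) by argument count."""

    def evaluate(self, *args):
        if len(args) == 1:
            return "s2s:" + args[0]
        if len(args) == 2:
            return "ss2s:" + args[0] + "," + args[1]
        if len(args) == 3:
            return "sss2s:" + ",".join(args)
        raise ValueError("expected 1 to 3 arguments, got %d" % len(args))
```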
3.3 Accessing resource files and tables from a UDF
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
public class UDFResource extends UDF {
    ExecutionContext ctx;
    long fileResourceLineCount;
    long tableResource1RecordCount;
    long tableResource2RecordCount;
    // setup() runs once before evaluate() is called, so the resources are read only once.
    @Override
    public void setup(ExecutionContext ctx) throws UDFException {
        this.ctx = ctx;
        try {
            // Count the lines of the file resource; try-with-resources closes the reader.
            InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
            try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
                fileResourceLineCount = 0;
                while (br.readLine() != null) {
                    fileResourceLineCount++;
                }
            }
            // Count the records of each table resource.
            tableResource1RecordCount = countRecords(ctx, "table_resource1");
            tableResource2RecordCount = countRecords(ctx, "table_resource2");
        } catch (IOException e) {
            throw new UDFException(e);
        }
    }
    private static long countRecords(ExecutionContext ctx, String resourceName) throws IOException {
        long count = 0;
        Iterator<Object[]> iterator = ctx.readResourceTable(resourceName).iterator();
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }
    /**
     * project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb
     */
    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount
            + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount="
            + tableResource2RecordCount;
    }
}
Python:
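# coding: utf-8
from odps.udf import annotate
from odps.distcache import get_cache_file, get_cache_table
@annotate('double -> double')
class Compute(object):
    def __init__(self):
        # Read the text file resource once, when the UDF instance is created.
        cache_file = get_cache_file('file.txt')
        self.data_mat = []
        for line in cache_file:
            cur_line = line.strip().split(',')
            # Processing logic goes here, for example:
            self.data_mat.append(cur_line)
        cache_file.close()
        # Read the table resource once and build a lookup dictionary.
        self.my_dict = {}
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = [record[1]]
            # Processing logic goes here.
    def evaluate(self, value):
        # Processing logic goes here; this placeholder returns the input unchanged.
        return value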
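Both the Java and Python versions above follow the same pattern: load resources once (in setup() or __init__) and reuse them on every evaluate() call. A minimal, locally testable sketch of that pattern injects the file lines and table records instead of calling get_cache_file/get_cache_table, so it runs without MaxCompute. The class name and the key/value lookup in evaluate() are hypothetical.

```python
class LookupCompute(object):
    """Load resources once in __init__, then reuse them in every evaluate() call.

    file_lines and table_records are injected for local testing; inside
    MaxCompute they would come from get_cache_file(...) and get_cache_table(...).
    """

    def __init__(self, file_lines, table_records):
        # Parse non-empty comma-separated lines from the file resource.
        self.data_mat = [line.strip().split(",") for line in file_lines if line.strip()]
        # Build a key -> value lookup from (key, value, ...) table records.
        self.my_dict = {}
        for record in table_records:
            self.my_dict[record[0]] = record[1]

    def evaluate(self, key):
        # Return the value for key, or None (SQL NULL) when there is no match.
        return self.my_dict.get(key)
```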
3.4 Accessing external networks from a UDF (VPC, external networks, and private networks)
https://help.aliyun.com/document_detail/187866.html
3.5 Using third-party packages in a UDF
https://help.aliyun.com/document_detail/189752.html
# coding: utf-8
# explode.py
from odps.udf import annotate
from odps.distcache import get_cache_archive
import datetime
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    # Skip wheel metadata directories (*.dist-info), then add the parent of the
    # shortest remaining directory (the top-level package) to sys.path.
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist-info' not in f.name], key=lambda v: len(v))
    sys.path.append(os.path.dirname(dir_names[0]))
@annotate("string->boolean")
class is_workday_udf(object):
    def __init__(self):
        include_package_path('chinese-calendar-master.zip')
    def evaluate(self, date_str):
        # try:
        import chinese_calendar
        # date_str is expected in yyyy-mm-dd format.
        date_strs = date_str.split("-")
        year_num = int(date_strs[0])
        month_num = int(date_strs[1])
        day_num = int(date_strs[2])
        date_num = datetime.date(year=year_num, month=month_num, day=day_num)
        result = chinese_calendar.is_workday(date_num)
        return result
        # except:
        #     return True
Registering the function
Running the SELECT statement:
set odps.pypy.enabled=false;
set odps.isolation.session.enable=true;
select my_json('{"info":"11","desc":"a|b","filename":"4b-2a-3c-4d-5b"}') as a;
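The directory-selection step in include_package_path can be checked locally on plain path strings. The sketch below mirrors that logic under the assumption that the archive is laid out like an unpacked wheel (the package directory and its *.dist-info directory at the top level); the paths in the example are hypothetical. If an archive adds an extra top-level directory, the selected path would need adjusting.

```python
import os


def pick_package_root(file_paths):
    """Return the directory to append to sys.path for an extracted archive.

    Mirrors include_package_path: drop *.dist-info metadata, take the shortest
    directory among the remaining files (the top-level package), then use its parent.
    """
    dir_names = sorted(
        (os.path.dirname(os.path.normpath(p)) for p in file_paths if ".dist-info" not in p),
        key=len,
    )
    if not dir_names:
        raise ValueError("archive contains no package files")
    return os.path.dirname(dir_names[0])
```

With the hypothetical files `/work/res/chinese_calendar/__init__.py` and `/work/res/chinese_calendar-1.8.0.dist-info/METADATA`, the function returns `/work/res`, from which `import chinese_calendar` can resolve.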
3.6 Developing UDFs with embedded code
CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;
import com.aliyun.odps.udf.UDF;
public class Reverse extends UDF {
    public String evaluate(String input) {
        if (input == null) return null;
        // Convert once and append the characters from last to first.
        char[] chars = input.toCharArray();
        StringBuilder ret = new StringBuilder();
        for (int i = chars.length - 1; i >= 0; i--) {
            ret.append(chars[i]);
        }
        return ret.toString();
    }
}
#END CODE;
SELECT foo('abdc');
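- An embedded code block can be placed either after USING or at the end of the script. A code block placed after USING is scoped to that CREATE TEMPORARY FUNCTION statement only.
- A function created with CREATE TEMPORARY FUNCTION is temporary: it is valid only for the current execution and is not stored in the MaxCompute metadata (Meta) system.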
CREATE TEMPORARY FUNCTION foo AS 'embedded.UDFTest' USING
#CODE ('lang'='PYTHON', 'filename'='embedded')
from odps.udf import annotate
@annotate("bigint->bigint")
class UDFTest(object):
    def evaluate(self, a):
        return a * a
#END CODE;
SELECT foo(4);
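- The indentation of the Python code must follow Python's syntax rules.
- When a Python UDF is registered, the class name after AS must include the file name of the Python source. You can specify a virtual file name with 'filename'='embedded', which is why the function above is registered as 'embedded.UDFTest'.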
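Because the embedded class is ordinary Python, its logic can be unit-tested before it is pasted into the #CODE block. One common pattern, sketched here as an assumption rather than a MaxCompute feature, is a no-op fallback for annotate when odps.udf is not importable. The None check is an addition of this sketch so that a SQL NULL argument returns NULL instead of raising TypeError.

```python
try:
    from odps.udf import annotate  # available when running inside MaxCompute
except ImportError:
    # Hypothetical local stand-in: a no-op decorator so the class can be
    # imported and tested outside MaxCompute.
    def annotate(signature):
        def wrap(cls):
            return cls
        return wrap


@annotate("bigint->bigint")
class UDFTest(object):
    def evaluate(self, a):
        # Propagate SQL NULL (None) instead of raising TypeError.
        if a is None:
            return None
        return a * a
```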
3.7 Defining functions in SQL
create sql function my_sum(@a BIGINT, @b BIGINT, @c BIGINT) returns @my_sum BIGINT
as begin
    @temp := @a + @b;
    @my_sum := @temp + @c;
end;
create sql function my_func(@s STRING)
AS if(@s rlike '"git_(m|a)"', 1, 0);
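To see what the two SQL functions compute, here are Python equivalents that can be run locally. This sketch assumes MaxCompute's rlike matches when the pattern is found anywhere in the string, modelled with re.search. The pattern requires literal double quotes around git_m or git_a, so a bare git_a without quotes does not match.

```python
import re

# Same pattern as the SQL: a double quote, "git_", then m or a, then a double quote.
GIT_PATTERN = re.compile(r'"git_(m|a)"')


def my_sum(a, b, c):
    # Mirrors @temp := @a + @b; @my_sum := @temp + @c;
    temp = a + b
    return temp + c


def my_func(s):
    # Mirrors if(@s rlike '"git_(m|a)"', 1, 0), assuming rlike = substring search.
    return 1 if GIT_PATTERN.search(s) else 0
```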