雖然
spark.sql.function
中的已經包含了大多數常用的函數,但是總有一些場景是内置函數無法滿足要求的,此時就需要使用自定義函數了(UDF)。剛好最近用spark時,
scala
,
java
,
python
輪換着用,是以這裡總結一下spark中自定義函數的簡單用法。
這裡總結了
scala
,
java
,
python
三種接口的DataFrame和sparkSQL的自定義函數定義和使用方法,對于比較複雜的分組自定義函數未涉及,對于這類複雜需求,應該有變通之法吧。
1、pyspark接口的UDF
1.1、在dataframe中使用
# 定義自定義函數
import numpy as np
def log_py(num):
return float(np.log(num))
# 注冊自定義函數
log_udf = functions.udf(log_py, FloatType())
# 使用自定義函數
dataframe = dataframe.withColumn(col, log_udf(col))
特别說明:
np.log
的傳回值類型是numpy.float類型,spark是無法識别的,是以要轉換成Python的float類型,是以寫成
float(np.log(num))
1.2、在sparkSQL中使用
# 定義自定義函數
def is_nulludf(fieldValue, defaultValue):
if fieldValue == None:
return defaultValue
return fieldValue
# 注冊自定義函數
spark.udf.register("is_nulludf", is_nulludf)
# 使用自定義函數
spark.sql("select col_name, is_nulludf(col_name) as col_name2 from tble ")
2、scala接口的UDF
2.1、在dataframe中使用
# 定義自定義函數
def add_one(col: Double) = {
col + 1
}
# 注冊自定義函數
spark.udf.register("add_one", add_one _)
# 使用自定義函數
import org.apache.spark.sql.functions
dataframe.withColumn("a2", functions.callUDF("add_one", functions.col("a")))
2.2、在sparkSQL中使用
# 定義自定義函數
def strLen(col: String) = {
str.length()
}
# 注冊自定義函數
spark.udf.register("strLen", strLen _)
# 使用自定義函數
spark.sql("select name,strLen(name) from table ")
3、Java接口的UDF
3.1、在dataframe中使用
# 自定義并注冊自定義函數
import static org.apache.spark.sql.types.DataTypes.DoubleType;
spark.udf().register("toFloat",new UDF1<String, Double>(){
@Override
public Double call(String number) {
return Double.valueOf(number)
}
}}, DoubleType);
# 使用自定義函數
import org.apache.spark.sql.functions;
dataframe = dataframe.withColumn(col, functions.callUDF("toFloat", functions.col(col)));
說明:UDF1的參數順序表示java-udf函數的輸入輸出類型,最後面的DoubleType是spark中定義的float類型。
大家看完記得關注點贊.