天天看點

pyspark udf 多個參數_在spark中使用UDF函數

pyspark udf 多個參數_在spark中使用UDF函數
spark-udf

雖然

spark.sql.function

中的已經包含了大多數常用的函數,但是總有一些場景是内置函數無法滿足要求的,此時就需要使用自定義函數了(UDF)。剛好最近用spark時,

scala

,

java

,

python

輪換着用,是以這裡總結一下spark中自定義函數的簡單用法。

這裡總結了

scala

,

java

,

python

三種接口的DataFrame和sparkSQL的自定義函數定義和使用方法,對于比較複雜的分組自定義函數未涉及,對于這類複雜需求,應該有變通之法吧。

1、pyspark接口的UDF

1.1、在dataframe中使用

# 定義自定義函數

import numpy as np
def log_py(num):
    return float(np.log(num))

# 注冊自定義函數
log_udf = functions.udf(log_py, FloatType())

# 使用自定義函數
dataframe = dataframe.withColumn(col, log_udf(col))
           

特别說明:

np.log

的傳回值類型是numpy.float類型,spark是無法識别的,是以要轉換成Python的float類型,是以寫成

float(np.log(num))

1.2、在sparkSQL中使用

# 定義自定義函數
def is_nulludf(fieldValue, defaultValue):
    if fieldValue == None:
        return defaultValue
    return fieldValue

# 注冊自定義函數
spark.udf.register("is_nulludf", is_nulludf)

# 使用自定義函數
spark.sql("select col_name, is_nulludf(col_name) as col_name2 from tble ")
           

2、scala接口的UDF

2.1、在dataframe中使用

# 定義自定義函數
def add_one(col: Double) = {
      col + 1
    }

# 注冊自定義函數
spark.udf.register("add_one", add_one _)

# 使用自定義函數
import org.apache.spark.sql.functions
dataframe.withColumn("a2", functions.callUDF("add_one", functions.col("a")))
           

2.2、在sparkSQL中使用

# 定義自定義函數
def strLen(col: String) = {
    str.length()
}

# 注冊自定義函數
spark.udf.register("strLen", strLen _)

# 使用自定義函數
spark.sql("select name,strLen(name) from table ")
           

3、Java接口的UDF

3.1、在dataframe中使用

# 自定義并注冊自定義函數
import static org.apache.spark.sql.types.DataTypes.DoubleType;

spark.udf().register("toFloat",new UDF1<String, Double>(){
    @Override
    public Double call(String number) {
        return Double.valueOf(number)
    }
}}, DoubleType);

# 使用自定義函數
import org.apache.spark.sql.functions;
dataframe = dataframe.withColumn(col, functions.callUDF("toFloat", functions.col(col)));
           

說明:UDF1的參數順序表示java-udf函數的輸入輸出類型,最後面的DoubleType是spark中定義的float類型。

大家看完記得關注點贊.