UDF函数
通过spark.udf.register(“name”,func)来进行注册。使用select func() … 来直接调用。如:
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.createOrReplaceTempView("people")
spark.udf.register("add",(x:String)=>"A:"+x)
spark.sql("select add(name) from people").show
UDAF函数
1、弱类型UDAF函数
- 需要继承
类,并复写方法。UserDefinedAggregateFunction
- 注册一个UDAF函数。
- 使用自定以的UDAF函数。
如:
package com.dengdan.sparksql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
/**
* 自定义UDAF函数
* 样例数据:
* {"name":"Michael", "salary":3000}
* {"name":"Andy", "salary":4500}
* {"name":"Justin", "salary":3500}
* {"name":"Berta", "salary":4000}
* 目标:求平均工资【工资的总额,工资的个数】
*/
class AverageSal extends UserDefinedAggregateFunction {
//输入数据
override def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil)
//每个分区中的 共享变量
override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", IntegerType) :: Nil)
//表示UDAF函数的输出类型
override def dataType: DataType = DoubleType
//表示 如果有相同的输入是否会存在相同的输出,如果是则true
override def deterministic: Boolean = true
//初始化 每个分区中的共享变量
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0
}
//每个分区中的每条数据聚合的时候需要调用该方法
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
//获取这一行中的工资,将工资加入到sum中。
buffer(0) = buffer.getLong(0) + input.getLong(0)
//将工资的个数加1
buffer(1) = buffer.getInt(1) + 1
}
//将每一个分区的数据合并,形成最后的数据
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
//合并总的工资
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
//合并总的工资个数
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
//给出计算结果
override def evaluate(buffer: Row): Any = {
//总的工资 / 总的工资个数
buffer.getLong(0).toDouble / buffer.getInt(1)
}
}
object AverageSal {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("udaf").setMaster("local[*]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
//读入数据
val employee = spark.read.json("D:\\idea_workspace2020\\spark\\sparksql\\doc\\employees.json")
employee.createOrReplaceTempView("employee")
spark.udf.register("average", new AverageSal)
spark.sql("select average(salary) from employee").show
spark.stop()
}
}
2、强类型UDAF函数
- 主要在DSL风格中使用,继承Aggregator抽象类。依次配置输入、共享变量、输出的类型,需要用到case class。
-
使用时,调用UDAF函数是一个DataSet对象。
代码实现:
package com.dengdan.sparksql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Aver(var sum: Long, var count: Int)
/**
* 求平均工资
*/
class Average extends Aggregator[Employee, Aver, Double] {
//初始化每个分区中的共享变量
override def zero: Aver = Aver(0L, 0)
//每个分区中每一条数据聚合的时候需要调的方法
override def reduce(b: Aver, a: Employee): Aver = {
b.sum += a.salary
b.count += 1
b
}
//将每个分区的输出 合并 形成最后的数据
override def merge(b1: Aver, b2: Aver): Aver = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
//给出计算结果
override def finish(reduction: Aver): Double = reduction.sum.toDouble / reduction.count
//主要用于对共享变量进行编码
override def bufferEncoder: Encoder[Aver] = Encoders.product
//主要用于将输出进行编码
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
/**
* 求平均工资
*/
object Average {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("udaf").setMaster("local[*]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
//读入数据并创建DataSet变量
import spark.implicits._
val employee = spark.read.json("D:\\idea_workspace2020\\spark\\sparksql\\doc\\employees.json").as[Employee]
val aver = new Average().toColumn.name("average")
employee.select(aver).show()
spark.stop()
}
}
开窗函数
- rank() 跳跃排序,有两个第二名时,后边跟着的时第四名。
- dense_rank() 连续排序,有两个第二名时仍然跟着第三名。
-
over () 开窗函数:
在使用聚合函数后,会将多行变成一行,而开窗函数是将一行变成多行。并且在使用聚合函数后,如果要显示其他的列必须将列加入到group by中。而使用开窗函数后,可以不使用group by,直接将所有信息显示出来。开窗函数适用于每一行的最后一列添加聚合函数的结果。
常用的开窗函数有:
- 为每条数据显示聚合信息。
聚合函数 () over()
- 为每条数据提供分组的聚合函数结果。
–按字段分组,分组后进行计算。聚合函数() over(partition by 字段) as 别名
- 与排名函数一起使用。
row number() over(order by 字段) as 别名
常用分析函数:
-
row_number() over(partition by ... order by ...)
-
rank() over(partition by ... order by ...)
-
dense_rank() over(partition by ... order by ...)
-
count() over(partition by ... order by ...)
-
max() over(partition by ... order by ...)
-
min() over(partition by ... order by ...)
-
sum() over(partition by ... order by ...)
-
avg() over(partition by ... order by ...)
-
first_value() over(partition by ... order by ...)
-
last_value() over(partition by ... order by ...)
-
lag() over(partition by ... order by ...)
-
lead() over(partition by ... order by ...)
lag和lead可以获取结果集中,按一定排序所排列的当前行的上下相邻若干offset的某个行的某个列(不用结果集的自关联);lag,lead分别是向前,向后;lag与lead有三个参数,第一个参数是列名,第二个参数是偏移的offset,第三个参数是超出记录窗口时的默认值。