天天看点

spark-12.sparkSQL_3_sparkSQL自定义函数UDF函数UDAF函数开窗函数

UDF函数

通过spark.udf.register(“name”,func)来进行注册。使用select func() … 来直接调用。如:

val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.createOrReplaceTempView("people")
spark.udf.register("add",(x:String)=>"A:"+x)
spark.sql("select add(name) from people").show
           

UDAF函数

1、弱类型UDAF函数

  • 需要继承

    UserDefinedAggregateFunction

    类,并复写方法。
  • 注册一个UDAF函数。
  • 使用自定以的UDAF函数。

如:

package com.dengdan.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}

/**
 * 自定义UDAF函数
 * 样例数据:
 * {"name":"Michael", "salary":3000}
 * {"name":"Andy", "salary":4500}
 * {"name":"Justin", "salary":3500}
 * {"name":"Berta", "salary":4000}
 * 目标:求平均工资【工资的总额,工资的个数】
 */
class AverageSal extends UserDefinedAggregateFunction {
  //输入数据
  override def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil)

  //每个分区中的 共享变量
  override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", IntegerType) :: Nil)

  //表示UDAF函数的输出类型
  override def dataType: DataType = DoubleType

  //表示 如果有相同的输入是否会存在相同的输出,如果是则true
  override def deterministic: Boolean = true

  //初始化 每个分区中的共享变量
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0
  }

  //每个分区中的每条数据聚合的时候需要调用该方法
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //获取这一行中的工资,将工资加入到sum中。
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    //将工资的个数加1
    buffer(1) = buffer.getInt(1) + 1
  }

  //将每一个分区的数据合并,形成最后的数据
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //合并总的工资
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    //合并总的工资个数
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }

  //给出计算结果
  override def evaluate(buffer: Row): Any = {
    //总的工资 / 总的工资个数
    buffer.getLong(0).toDouble / buffer.getInt(1)
  }
}

object AverageSal {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("udaf").setMaster("local[*]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    //读入数据
    val employee = spark.read.json("D:\\idea_workspace2020\\spark\\sparksql\\doc\\employees.json")
    employee.createOrReplaceTempView("employee")
    spark.udf.register("average", new AverageSal)
    spark.sql("select average(salary) from employee").show

    spark.stop()
  }
}
           

2、强类型UDAF函数

  • 主要在DSL风格中使用,继承Aggregator抽象类。依次配置输入、共享变量、输出的类型,需要用到case class。
  • 使用时,调用UDAF函数是一个DataSet对象。

    代码实现:

package com.dengdan.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator


case class Employee(name: String, salary: Long)

case class Aver(var sum: Long, var count: Int)

/**
 * 求平均工资
 */
class Average extends Aggregator[Employee, Aver, Double] {
  //初始化每个分区中的共享变量
  override def zero: Aver = Aver(0L, 0)

  //每个分区中每一条数据聚合的时候需要调的方法
  override def reduce(b: Aver, a: Employee): Aver = {
    b.sum += a.salary
    b.count += 1
    b
  }

  //将每个分区的输出 合并 形成最后的数据
  override def merge(b1: Aver, b2: Aver): Aver = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  //给出计算结果
  override def finish(reduction: Aver): Double = reduction.sum.toDouble / reduction.count

  //主要用于对共享变量进行编码
  override def bufferEncoder: Encoder[Aver] = Encoders.product

  //主要用于将输出进行编码
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/**
 * 求平均工资
 */
object Average {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("udaf").setMaster("local[*]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    //读入数据并创建DataSet变量
    import spark.implicits._
    val employee = spark.read.json("D:\\idea_workspace2020\\spark\\sparksql\\doc\\employees.json").as[Employee]
    val aver = new Average().toColumn.name("average")
    employee.select(aver).show()

    spark.stop()
  }
}
           

开窗函数

  • rank() 跳跃排序,有两个第二名时,后边跟着的时第四名。
  • dense_rank() 连续排序,有两个第二名时仍然跟着第三名。
  • over () 开窗函数:

    在使用聚合函数后,会将多行变成一行,而开窗函数是将一行变成多行。并且在使用聚合函数后,如果要显示其他的列必须将列加入到group by中。而使用开窗函数后,可以不使用group by,直接将所有信息显示出来。开窗函数适用于每一行的最后一列添加聚合函数的结果。

常用的开窗函数有:

  1. 为每条数据显示聚合信息。

    聚合函数 () over()

  2. 为每条数据提供分组的聚合函数结果。

    聚合函数() over(partition by 字段) as 别名

    –按字段分组,分组后进行计算。
  3. 与排名函数一起使用。

    row number() over(order by 字段) as 别名

常用分析函数:

  1. row_number() over(partition by ... order by ...)

  2. rank() over(partition by ... order by ...)

  3. dense_rank() over(partition by ... order by ...)

  4. count() over(partition by ... order by ...)

  5. max() over(partition by ... order by ...)

  6. min() over(partition by ... order by ...)

  7. sum() over(partition by ... order by ...)

  8. avg() over(partition by ... order by ...)

  9. first_value() over(partition by ... order by ...)

  10. last_value() over(partition by ... order by ...)

  11. lag() over(partition by ... order by ...)

  12. lead() over(partition by ... order by ...)

lag和lead可以获取结果集中,按一定排序所排列的当前行的上下相邻若干offset的某个行的某个列(不用结果集的自关联);lag,lead分别是向前,向后;lag与lead有三个参数,第一个参数是列名,第二个参数是偏移的offset,第三个参数是超出记录窗口时的默认值。

继续阅读