
A Spark SQL Example: Processing a JSON File

package org.example

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

object json {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("json").setMaster("local")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.types._

    // Read the input file into an RDD of raw lines
    val RDD = sc.textFile("file/input/json.txt")
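
    // Assumed input layout (hypothetical sample, for illustration only):
    // each line is "<timestamp>|<json>", and the JSON carries the top-level
    // keys cm, ap and et that the selects below depend on, e.g.
    // 1234567890000|{"cm":{"ln":"...","os":"..."},"ap":"...","et":[{"ett":"...","en":"...","kv":"..."}]}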

    // Build the schema: two string columns split on "|".
    // Note: these names must match the $"time" / $"content" references below.
    val schemaString = "time content"
    val fields = schemaString.split(" ").map(x => StructField(x, StringType, nullable = true))
    val schema = StructType(fields)

    println("+++++++++++++++++++++++++++DF:建立 DateFrame+++++++++++++++++++++++")
    // 注意:如果用createDataFrame(rowRDD,schema) 則RDD需要 Row
    val rowRDD = RDD.map(_.split("\\|")).map(x=>Row(x(0).trim(),x(1).trim()))
    var opDF = spark.createDataFrame(rowRDD,schema)
    /*
    // Alternative: skip the explicit schema and name the columns with toDF
    val rowRDD = RDD.map(_.split("\\|")).map(x => (x(0).trim(), x(1).trim()))
    val opDF = rowRDD.toDF("time", "content")   // or: val opDF = spark.createDataFrame(rowRDD).toDF("time", "content")
    */

    opDF.show(1, false)


    // First level: extract the top-level JSON fields from the raw string
    val opDF1 = opDF.select($"time", get_json_object($"content", "$.cm").alias("cm"), get_json_object($"content", "$.ap").alias("ap"),
      get_json_object($"content", "$.et").alias("et"))
    println("+++++++++++++++++++++++++++DF1: first level +++++++++++++++++++++")
    opDF1.printSchema()
    opDF1.show(10, false)

    // Second level: flatten the fields inside cm and parse et into an array of event structs
    val opDF2 = opDF1.select($"time", $"ap", get_json_object($"cm", "$.ln").alias("ln"), get_json_object($"cm", "$.sv").alias("sv"),
      get_json_object($"cm", "$.os").alias("os"), get_json_object($"cm", "$.g").alias("g"), get_json_object($"cm", "$.mid").alias("mid"),
      get_json_object($"cm", "$.nw").alias("nw"), get_json_object($"cm", "$.l").alias("l"), get_json_object($"cm", "$.vc").alias("vc"),
      get_json_object($"cm", "$.hw").alias("hw"), get_json_object($"cm", "$.ar").alias("ar"), get_json_object($"cm", "$.uid").alias("uid"),
      get_json_object($"cm", "$.t").alias("t"), get_json_object($"cm", "$.la").alias("la"), get_json_object($"cm", "$.md").alias("md"),
      get_json_object($"cm", "$.vn").alias("vn"), get_json_object($"cm", "$.ba").alias("ba"), get_json_object($"cm", "$.sr").alias("sr"),
      from_json($"et", ArrayType(StructType(StructField("ett", StringType) :: StructField("en", StringType) :: StructField("kv", StringType) :: Nil))).as("events"))
    println("+++++++++++++++++++++++++++++++++++DF2:+++++++++++++++++++++++++")
    opDF2.printSchema()
    opDF2.show(10, false)

    // Explode the events array into one row per event
    val opDF3 = opDF2.select($"time", $"ap", $"ln", $"sv", $"os", $"g", $"mid", $"nw", $"l", $"vc", $"hw", $"ar", $"uid", $"t", $"la",
      $"md", $"vn", $"ba", $"sr", explode($"events").as("eventcontent"))
    println("++++++++++++++++++++++++++++++++DF3:++++++++++++++++++++++++++++")
    opDF3.printSchema()
    opDF3.show(10, false)

    // Third level: extract the individual event fields from the exploded struct
    val opDF4 = opDF3.select($"time", $"ap", $"ln", $"sv", $"os", $"g", $"mid", $"nw", $"l", $"vc", $"hw", $"ar", $"uid", $"t", $"la",
      $"md", $"vn", $"ba", $"sr", $"eventcontent.ett", $"eventcontent.en", $"eventcontent.kv")
    println("+++++++++++++++++++++++++++++++++DF4:+++++++++++++++++++++++++++")
    opDF4.printSchema()
    opDF4.show(10, false)


    // Register a temporary view and query it with SQL
    opDF4.createTempView("cai")
    println("Querying the data through the temporary view")
    spark.sql("select time, hw from cai").show(10, false)

    spark.stop()   // release the local Spark resources
  }
}
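
The step-by-step chain above calls get_json_object once per field, which re-parses the same JSON string for every column. A more compact variant is sketched below, assuming the same "time|content" input and the same imports; it is meant to drop into the same main method after opDF is created. It parses cm with a single from_json call and explodes the events in the same select. The names cmSchema, etSchema and flatDF are ours, not from the original code.

    // Hypothetical sketch: one from_json call per nested object
    // instead of one get_json_object call per field
    val cmSchema = StructType(
      Seq("ln", "sv", "os", "g", "mid", "nw", "l", "vc", "hw", "ar", "uid", "t", "la", "md", "vn", "ba", "sr")
        .map(StructField(_, StringType, nullable = true)))
    val etSchema = ArrayType(StructType(StructField("ett", StringType) ::
      StructField("en", StringType) :: StructField("kv", StringType) :: Nil))

    val flatDF = opDF
      .withColumn("cm", from_json(get_json_object($"content", "$.cm"), cmSchema))
      .withColumn("event", explode(from_json(get_json_object($"content", "$.et"), etSchema)))
      .select($"time", get_json_object($"content", "$.ap").as("ap"),
        $"cm.*", $"event.ett", $"event.en", $"event.kv")
    flatDF.show(10, false)

Note that from_json yields null for rows whose JSON does not match the supplied schema, so malformed lines drop out of the exploded result rather than failing the job.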