package org.example
import java.lang
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
/**
 * Flattens a pipe-delimited text file whose second column holds a nested JSON document
 * into a flat DataFrame with one row per event, prints each intermediate stage, and
 * registers the result as the temporary view `cai`.
 *
 * Each input line is parsed as `<time>|<json>`. From the JSON payload the code reads a
 * `cm` object (flattened into the columns listed in [[CmFields]]), an `ap` value, and an
 * `et` array whose elements carry the keys listed in [[EventFields]].
 */
object json {
  import org.apache.spark.sql.types._

  /** Input file used when no path is given as the first command-line argument. */
  private val DefaultInputPath = "file/input/json.txt"

  /** Keys extracted from the nested `cm` JSON object; each becomes a top-level column. */
  private val CmFields: Seq[String] =
    Seq("ln", "sv", "os", "g", "mid", "nw", "l", "vc", "hw", "ar", "uid", "t", "la", "md", "vn", "ba", "sr")

  /** Keys of each element of the `et` JSON array; each becomes a top-level column after explode. */
  private val EventFields: Seq[String] = Seq("ett", "en", "kv")

  /** Schema used to parse `et`: an array of structs whose fields are all strings. */
  private val EventSchema: ArrayType =
    ArrayType(StructType(EventFields.map(name => StructField(name, StringType))))

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input path (defaults to [[DefaultInputPath]])
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val conf = new SparkConf().setAppName("json").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Always release the session (and its SparkContext), even if a stage fails.
    try run(spark, inputPath)
    finally spark.stop()
  }

  /** Runs every flattening stage against `inputPath`, printing schema and sample rows of each. */
  private def run(spark: SparkSession, inputPath: String): Unit = {
    import spark.implicits._

    // Read the file as an RDD of raw lines.
    val lines = spark.sparkContext.textFile(inputPath)

    // Schema of the two raw columns. These names must match the column references below.
    val schema = StructType(Seq("time", "content").map(name => StructField(name, StringType, nullable = true)))
    println("+++++++++++++++++++++++++++DF:建立 DateFrame+++++++++++++++++++++++")
    // createDataFrame(rowRDD, schema) requires an RDD[Row].
    // Limit 2 keeps any '|' inside the JSON payload intact. Lines without a separator are
    // skipped instead of failing the whole job with an index-out-of-bounds error.
    val rowRDD = lines
      .map(_.split("\\|", 2))
      .filter(_.length == 2)
      .map(parts => Row(parts(0).trim, parts(1).trim))
    val opDF = spark.createDataFrame(rowRDD, schema)
    opDF.show(1, false)

    // Level 1: pull the top-level JSON keys out as (JSON-string) columns.
    val opDF1 = opDF.select(
      $"time",
      get_json_object($"content", "$.cm").alias("cm"),
      get_json_object($"content", "$.ap").alias("ap"),
      get_json_object($"content", "$.et").alias("et"))
    println("+++++++++++++++++++++++++++DF1: 第一層 +++++++++++++++++++++")
    opDF1.printSchema()
    opDF1.show(10, false)

    // Level 2: flatten the `cm` object and parse the `et` array into an array of structs.
    val cmColumns = CmFields.map(name => get_json_object($"cm", "$." + name).alias(name))
    val level2Columns = (Seq($"time", $"ap") ++ cmColumns) :+ from_json($"et", EventSchema).as("events")
    val opDF2 = opDF1.select(level2Columns: _*)
    println("+++++++++++++++++++++++++++++++++++DF2:+++++++++++++++++++++++++")
    opDF2.printSchema()
    opDF2.show(10, false)

    // Columns carried through unchanged into levels 3 and 4.
    val baseColumns = (Seq("time", "ap") ++ CmFields).map(name => col(name))

    // Level 3: explode the events array into one row per event.
    // Note: explode drops rows whose `events` is null or empty.
    val opDF3 = opDF2.select((baseColumns :+ explode($"events").as("eventcontent")): _*)
    println("++++++++++++++++++++++++++++++++DF3:++++++++++++++++++++++++++++")
    opDF3.printSchema()
    opDF3.show(10, false)

    // Level 4: promote each event struct field to a top-level column.
    val eventColumns = EventFields.map(name => col("eventcontent." + name))
    val opDF4 = opDF3.select((baseColumns ++ eventColumns): _*)
    println("+++++++++++++++++++++++++++++++++DF4:+++++++++++++++++++++++++++")
    opDF4.printSchema()
    opDF4.show(10, false)

    // Replace rather than create, so re-running in the same session does not throw.
    opDF4.createOrReplaceTempView("cai")
    println("建立臨時表以後的資料查詢")
    spark.sql("select time,hw from cai").show(10, false)
  }
}