
Spark 2.1: failure to read (string, double) tuples stored in Hive

Over the past couple of days a colleague and I have been looking at how to shrink the number of records in a table. Each record contains: objectid (serving cell), gridid (owning grid), height, rsrp (serving-cell RSRP), n_objectid (neighbour cell), n_rsrp (neighbour-cell RSRP).

In these records one serving cell is associated with many neighbour-cell entries, so when grouping and merging the records we:

1) first group by objectid, gridid and height, collecting all the neighbour-cell information into arrays;

2) then, on top of the result of 1), group by objectid and collect gridid, height, rsrp, array(n_objectid) and array(n_rsrp) into arrays (a sketch of step 1 in plain Spark SQL follows below).
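As a quick orientation before the spark-shell transcripts, here is a minimal sketch of step 1 expressed in plain Spark SQL, assuming the same source table fpd_tabke and partition filter that appear later in this post; step 2 simply repeats the same pattern with a group by on objectid alone:

// step 1: one row per (objectid, gridid, height, rsrp), neighbours collected into arrays
sql(
  s"""|select objectid, gridid, height, rsrp,
      |       collect_list(n_objectid)    as n_objectid,
      |       collect_list(rsrp - n_rsrp) as rsrp_dis
      |from fpd_tabke
      |where p_city=571 and p_day=20180322
      |group by objectid, gridid, height, rsrp
      |""".stripMargin)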

Approach 1: store with array<array<string>> (plain element arrays rather than tuples)

[my@sdd983 tommyduan_service]$ /app/my/fi_client/spark2/Spark2x/spark/bin/spark-shell
2018-03-24 14:10:38,583 | WARN  | main | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable | org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)
2018-03-24 14:10:38,827 | WARN  | main | In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). | org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
2018-03-24 14:10:38,837 | WARN  | main | Detected deprecated memory fraction settings: [spark.shuffle.memoryFraction, spark.storage.memoryFraction, spark.storage.unrollFraction]. As of Spark 1.6, execution and storage memory management are unified. All memory fractions used in the old model are now deprecated and no longer read. If you wish to use the old memory management, you may explicitly enable `spark.memory.useLegacyMode` (not recommended). | org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
Spark context Web UI available at http://192.168.143.332:23799
Spark context available as 'sc' (master = local[*], app id = local-1521871839949).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_72)
Type in expressions to have them evaluated.
Type :help for more information.

scala>     import spark.sql
import spark.sql

scala>     import spark.implicits._
import spark.implicits._

scala> sql("use my_hive_db")
2018-03-24 14:10:56,686 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
2018-03-24 14:10:58,250 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
res0: org.apache.spark.sql.DataFrame = []
scala>     var fpb_df = sql(
     |       s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis
     |           |from fpd_tabke
     |           |where p_city=571 and p_day=20180322 limit 50
     |           |""".stripMargin)
fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields]

scala>     var fpb_groupby_obj_grid_height_df1 = fpb_df.groupBy("objectid", "gridid", "height", "rsrp").agg(
     |       collect_list("n_objectid").alias("n_objectid1"),
     |       collect_list("rsrp_dis").alias("rsrp_dis1")
     |     ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("n_objectid1").alias("n_objectid"),  col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_grid_height_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: string ... 4 more fields]

scala>     var fpb_groupby_obj_df1 = fpb_groupby_obj_grid_height_df1.groupBy("objectid").agg(
     |       collect_list("gridid").alias("gridid1"),
     |       collect_list("height").alias("height1"),
     |       collect_list("rsrp").alias("rsrp1"),
     |       collect_list("n_objectid").alias("n_objectid1"),
     |       collect_list("rsrp_dis").alias("rsrp_dis1")
     |     ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("n_objectid1").alias("n_objectid"),col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: array<string> ... 4 more fields]

scala>     fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).show
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       _1|                  _2|                  _3|                  _4|                  _5|                  _6|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray(101...|[WrappedArray(0.0...|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
scala> fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).schema
res4: org.apache.spark.sql.types.StructType = 
StructType(
    StructField(_1,StringType,true), 
    StructField(_2,ArrayType(StringType,true),true), 
    StructField(_3,ArrayType(StringType,true),true),
    StructField(_4,ArrayType(StringType,true),true), 
    StructField(_5,ArrayType(ArrayType(StringType,true),true),true), 
    StructField(_6,ArrayType(ArrayType(DoubleType,false),true),true)
)      
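With approach 1 the neighbour id and the RSRP delta end up in two parallel arrays (array<array<string>> and array<array<double>>), so the (n_objectid, rsrp_dis) pairs have to be rebuilt by position when a row is read back. A minimal sketch of that re-pairing (my own, not from the original session, reusing the names above):

// reassemble the parallel arrays of approach 1 into pairs by position
val repaired = fpb_groupby_obj_df1.rdd.map { row =>
  val ids   = row.getSeq[Seq[String]](4)   // n_objectid: array<array<string>>
  val dists = row.getSeq[Seq[Double]](5)   // rsrp_dis:   array<array<double>>
  (row.getString(0), ids.zip(dists).map { case (i, d) => i.zip(d) })
}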

Approach 2: store as array<array<(string,double)>>; reading it back fails.

scala> sql("use my_hive_db")
2018-03-24 14:10:56,686 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
2018-03-24 14:10:58,250 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
res0: org.apache.spark.sql.DataFrame = []
scala>     var fpb_df = sql(
     |       s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis
     |           |from fpd_tabke
     |           |where p_city=571 and p_day=20180322 limit 50
     |           |""".stripMargin)
fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields]
scala> var fpb_groupby_obj_grid_height_df2 = fpb_df.map(s =>
      (s.getAs[String]("objectid"), s.getAs[String]("gridid"), s.getAs[String]("height"), s.getAs[String]("rsrp"), (s.getAs[String]("n_objectid"), s.getAs[Double]("rsrp_dis")))
    ).toDF("objectid", "gridid", "height", "rsrp", "neighbour").groupBy("objectid", "gridid", "height", "rsrp").agg(
      collect_list("neighbour").alias("neighbour1")
    ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("neighbour1").alias("neighbour"))

scala> var fpb_groupby_obj_df2 = fpb_groupby_obj_grid_height_df2.groupBy("objectid").agg(
      collect_list("gridid").alias("gridid1"),
      collect_list("height").alias("height1"),
      collect_list("rsrp").alias("rsrp1"),
      collect_list("neighbour").alias("neighbour1")
    ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("neighbour1").alias("neighbour"))
scala>     val encoder = Encoders.tuple(
     |       Encoders.STRING,
     |       Encoders.javaSerialization[Seq[String]],
     |       Encoders.javaSerialization[Seq[String]],
     |       Encoders.javaSerialization[Seq[String]],
     |       Encoders.javaSerialization[Seq[Seq[(String, Double)]]]
     |     )
encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary]

scala> fpb_groupby_obj_df2.show
+---------+--------------------+--------------------+--------------------+--------------------+
| objectid|              gridid|              height|                rsrp|           neighbour|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray([10...|
+---------+--------------------+--------------------+--------------------+--------------------+


scala> fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show
+---------+--------------------+--------------------+--------------------+--------------------+
|    value|                  _2|                  _3|                  _4|                  _5|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|
+---------+--------------------+--------------------+--------------------+--------------------+


scala>   fpb_groupby_obj_df2.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4))).show()
[Stage 6:======================================================>(963 + 1) / 964]2018-03-24 15:09:09,267 | ERROR | Executor task launch worker for task 3859 | Exception in task 0.0 in stage 7.0 (TID 3859) | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:381)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
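The root cause is that Spark SQL materialises a struct<_1:string,_2:double> element as an org.apache.spark.sql.Row, never as a scala.Tuple2; getSeq[Seq[(String, Double)]](4) compiles because of type erasure, but the cast blows up at runtime. A hedged workaround sketch (my own, not from the original session): read the inner elements as Row, convert them to tuples by hand, and let the default product encoder from spark.implicits._ take over:

import org.apache.spark.sql.Row

fpb_groupby_obj_df2.map { s =>
  // the struct elements arrive as Row, so rebuild the (n_objectid, rsrp_dis) tuples explicitly
  val neighbours = s.getSeq[Seq[Row]](4).map(_.map(r => (r.getString(0), r.getDouble(1))))
  (s.getAs[String]("objectid"),
   s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3),
   neighbours)
}.show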

scala>  val encoder = Encoders.tuple(
     |       Encoders.STRING,
     |       Encoders.kryo[Seq[String]],
     |       Encoders.kryo[Seq[String]],
     |       Encoders.kryo[Seq[String]],
     |       Encoders.kryo[Seq[Seq[(String, Double)]]]
     |     )
encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary]

scala>  fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show
+---------+--------------------+--------------------+--------------------+--------------------+
|    value|                  _2|                  _3|                  _4|                  _5|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...|
+---------+--------------------+--------------------+--------------------+--------------------+      
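Both Encoders.javaSerialization and Encoders.kryo only dodge the ClassCastException by turning every column except objectid into an opaque binary blob, which is useless for querying from Hive. An alternative sketch (my own, not from the original session) is to model the neighbour pair as a named case class instead of a Tuple2, so the struct keeps readable field names in the schema and in the stored table:

// hypothetical case class standing in for the (n_objectid, rsrp_dis) tuple
case class Neighbour(n_objectid: String, rsrp_dis: Double)

// build the neighbour column as a named struct from the start,
// then reuse the same two-level groupBy/collect_list as in approach 2
val fpb_named_df = fpb_df.map { s =>
  (s.getAs[String]("objectid"), s.getAs[String]("gridid"),
   s.getAs[String]("height"), s.getAs[String]("rsrp"),
   Neighbour(s.getAs[String]("n_objectid"), s.getAs[Double]("rsrp_dis")))
}.toDF("objectid", "gridid", "height", "rsrp", "neighbour")

When the grouped result is read back, the elements of the neighbour arrays still come out as Row, but they can then be accessed by field name (r.getAs[String]("n_objectid")) instead of relying on a tuple cast.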

Fundamentals are what programmers should really dig into, for example:

1) How List/Set/Map work internally and how they differ

2) MySQL index storage structures and how to tune them / B-tree characteristics, computational complexity and the factors that affect it...

3) JVM runtime structure, principles and tuning

4) How Java class loaders work

5) How the Java GC process works and the collection algorithms it uses

6) How consistent hashing is implemented in Redis and how it differs from ordinary hashing

7) Java multithreading, thread-pool development and management, and the differences between Lock and synchronized

8) Spring IOC/AOP principles; the loading process...
