天天看點

Spark資料分析之pyspark

一、大資料簡史,從hadoop到Spark

1.hadoop的出現:
(1)問題:1990年,電商爆發以及機器産生了大量資料,單一的系統無法承擔
(2)辦法:為了解決(1)的問題許多公司,尤其是大公司上司了普通硬體叢集的水準擴充
(3)執行:hadoop應運而生

2.spark的出現:
(1)hadoop面臨問題:
     - 硬體瓶頸:多年來,記憶體技術突飛猛進,而硬碟技術沒有太大的變化。hadoop主要
運用的是硬碟,沒有利用好記憶體技術。
     - 程式設計困難,hadoop的MapReduce程式設計不太容易,後續才出現了Pig、Hive
     - 場景不适,批處理要根據不同場景進行開發
(2)spark應運而生

3.叢集的強大之處:
(1)存儲:切割   HDFS的Block
(2)計算:切割  【分布式并行計算】 MapReduce/Spark
(3)存儲 + 計算:  HDFS/S3 + MapReduce/Spark
           

二、Spark簡介

(1)目标:企業資料處理的統一引擎

(2)特點:
      - 廣支援:一套系統解決多種環境
      - 高速度:記憶體上運作Hadoop快100倍,硬碟上運作比Hadoop快10倍
      - 多接口:如:Python、Java、R...
      - 多應用:sparkSQL、sparkStreaming、MLlib、GraphX

(3)為啥spark解決了hadoop慢的問題呢?
      - 減少網絡使用:Spark設計思想是減少節點間的資料互動
      - 運用記憶體技術:Spark是和記憶體進行互動,Hadoop是磁盤進行互動

(4)大資料處理的三種情況:
      - 複雜的批量處理:時間長,跨度為10min~N hr
      - 曆史資料為基礎的互動式查詢:時間通常為10sec~N min
      - 實時資料為基礎的流式資料:時間通常為N ms~N sec
                   
(5)Spark的對應方案:
      - Spark Core:以RDD為基礎提供了豐富的基礎操作接口,
                    使得Spark可以靈活處理類似MR的批處理作業
      - Spark SQL:相容HIVE資料,提供比Hive更快的查詢速度
                   (10~100x)的分布式SQL引擎
      - Spark Streaming:将流式計算分解成一系列小的批處理作業
                         利用spark輕量級低延遲時間的架構來支援流數
                         據處理,目前已經支援Kafka,Flume等
      - GraphX:提供圖形計算架構,與Pregel/GraphLab相容
      - Milb:提供基于Spark的機器學習算法庫
           

三、Spark Core核心之RDD

(1)RDD是什麼:
    ①RDD是一個抽象類
    ②RDD支援多種類型,即泛形
    ③RDD:彈性分布式資料集
         彈性     【容錯,自動進行資料的修複】
         分布式   【不同節點運作】
         資料集
                  - 不可變 (1,2,3,4,5,6)
                  - 可分區 (1,2,3)(4,5,6)
(2)RDD的特性:
    ①一個RDD由多個分區/分片組成
    ②對RDD進行一個函數操作,會對RDD的所有分區都執行相同函數操作
    ③一個RDD依賴于其他RDD,RDD1->RDD2->RDD3->RDD4->RDD5,若RDD1中某節點資料丢失,
     後面的RDD會根據前面的資訊進行重新計算
    ④對于Key-Value的RDD可以制定一個partitioner,告訴他如何分片。常用hash/range
    ⑤移動資料不如移動計算,注:移動資料,不如移動計算。考慮磁盤IO和網絡資源傳輸等
(3)圖解RDD:
           
Spark資料分析之pyspark

RDD圖解

(4)SparkContext&SparkConf
     SparkContext意義:主入口點
     SparkContext作用:連接配接Spark叢集
     SparkConf作用:建立SparkContext前得使用SparkConf進行配置,以鍵值對形式進行
     ①建立SparkContext
        - 連接配接到Spark“叢集”:local、standalone、yarn、mesos
        - 通過SparkContext來建立RDD、廣播變量到叢集
     ②建立SparkContext前還需要建立SparkConf對象
     ③SparkConf().setAppName(appname).setMaster('local')這個設定高于系統設定
     ④pyspark.SparkContext連接配接到Spark‘叢集’即master(local[單機]、Standalone[标準]
       、yarn(hadoop上)、mesos),建立RDD,廣播變量到叢集
     ⑤代碼:
     conf = SparkConf().setAppName(appname).setMaster('local')
     sc   = SparkContext(Conf=Conf)

(5)pyspark的使用:
    ①預設:pySpark在啟動時就會建立一個SparkContext,别名為sc
           <SparkContext master = local[*] appName = PySparkShell>
    ②參數的使用:
          ./bin/pyspark --master local[4]
          ./bin/pyspark  --master[4]  --py-files code.py
  
(6)RDD的建立:
①集合轉RDD
   data = [1,2,3]
   distData = sc.parallelize(data,3) #這行代碼可以将清單轉為RDD資料集
   distData.collect() #這行代碼可以列印輸出RDD資料集#【觸發一個job】
   distData.reduce(lambda a,b :a+b) #【觸發一個job】

   注意:一個CPU可以設定2~4個partition

②外部資料集轉RDD
   distFile = sc.textFile("hello.txt") #将外部資料轉換為RDD對象
   distFile.collect()

(7)送出pyspark作業到伺服器上運作
   ./spark-submit --master local[2] --name spark0301 /home/hadoop/scrip
   t/spark0301.py

(8)PySpark實戰之Spark Core核心RDD常用算子:
  【兩個算子】
   ①transfermation: map、filter【過濾】、group by、distinct
          map()是将傳入的函數依次作用到序列的每個元素,每個元素都是獨自被函數“作用”一次
   ②action: count, reduce, collect
   注意:(1)
          1)All transformations in Spark are lazy,in that they do not
            compute their results right away.
            -- Spark的transformations很懶,因為他們沒有馬上計算出結果
          2)Instead they just remember the transformations applied to some 
            base dataset
            -- 相反,他們隻記得應用于基本資料集
        (2)
          1)action triggers the computation
            -- 動作觸發計算
          2) action returns values to driver or writes data to external storage
            --  action将傳回值資料寫入外部存儲
  【單詞記憶】
           applied to:施加到
           Instead:相反
           in that:因為
           external storage:外部存儲


map()是将傳入的函數依次作用到序列的每個元素,每個元素都是獨自被函數“作用”一次
           
Spark資料分析之pyspark
(1)map
  map(func) 
  #将func函數作用到資料集每一個元素上,生成一個新的分布式【資料集】傳回
(2)filter
  filter(func)
  傳回所有func傳回值為true的元素,生成一個新的分布式【資料集】傳回
(3)flatMap  #flat壓扁以後做map
  flatMap(func)
  輸入的item能夠被map或0或多個items輸出,傳回值是一個【Sequence】
(4)groupByKey:把相同的key的資料分發到一起
  ['hello', 'spark', 'hello', 'world', 'hello', 'world']
  ('hello',1) ('spark',1)........
(5)reduceByKey: 把相同的key的資料分發到一起并進行相應的計算
  mapRdd.reduceByKey(lambda a,b:a+b)
  [1,1]  1+1
  [1,1,1]  1+1=2+1=3
  [1]    1
(6)左連接配接:以左表為基準
   右連接配接:以右表為基準
   全連接配接:以左右都為基準
           

練習1:Transformation算子程式設計

from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)
    '''
    map:
        map(func)
        将func函數作用到資料集的每個元素上,生成一個新的分布式資料集傳回
    '''
    print("***************************map***************************")
    def my_map():
        # 建立一個序列
        data = [1,2,3,4,5]
        # 将序列轉換為RDD
        rdd1 = sc.parallelize(data)
        # 使用函數對RDD進行作用,生成RDD2
        rdd2 = rdd1.map(lambda x:x*2)
        # 使用collect()講結果輸出
        print(rdd2.collect())

    my_map()

    def my_map2():
        a = sc.parallelize(["dog","tiger","lion","cat","panter","eagle"])
        b = a.map(lambda x:(x,1)) #進來一個x,傳回一個(x,1)的形式
        print(b.collect())
    my_map2()
    print("***************************filter***************************")
    def my_filter():
        #給一個資料
        data = [1,2,3,4,5]
        rdd1 = sc.parallelize(data)
        mapRdd = rdd1.map(lambda x:x**2)
        filterRdd = mapRdd.filter(lambda x:x>5)
        print(filterRdd.collect())
    '''
    filter:
        filter(func)
        傳回所有func傳回值為true的元素,生成一個新的分布式資料集傳回
    '''
    def my_filter():
        data = [1,2,3,4,5]
        rdd1 = sc.parallelize(data)
        mapRdd = rdd1.map(lambda x:x*2)
        filterRdd = mapRdd.filter(lambda x:x > 5)
        print(filterRdd.collect())
        print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())
    my_filter()
    print("***************************flatMap()***************************")
    #Wordcount第一步:
    def my_flatMap():
        #flatMap,将東西壓扁/拆開 後做map
        data = ["hello spark","hello world","hello world"]
        rdd = sc.parallelize(data)
        print(rdd.flatMap(lambda line:line.split(" ")).collect())
    my_flatMap()
    print("***************************groupBy()***************************")
    def my_groupBy():
        data = ["hello spark","hello world","hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
        groupByRdd = mapRdd.groupByKey()
        print(groupByRdd.collect())
        print(groupByRdd.map(lambda x:{x[0]:list(x[1])}).collect())

    my_groupBy()

    print("***************************reduceByKey()***************************")
    #出現Wordcount結果
    def my_reduceByKey():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        reduceByKeyRdd = mapRdd.reduceByKey(lambda a,b:a+b)
        print(reduceByKeyRdd.collect())
    my_reduceByKey()

    print("***************************sortByKey()***************************")
    #将Wordcount結果中數字出現的次數進行降序排列
    def my_sort():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
        #reduceByKeyRdd.sortByKey().collect() 此時是按照字典在排序
        #reduceByKeyRdd.sortByKey(False).collect()
        #先對對鍵與值互換位置,再排序,再換位置回來
        reduceByKey=reduceByKeyRdd.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0])).collect()
        print(reduceByKey)
    my_sort()

    print("***************************union()***************************")
    def my_union():
        a = sc.parallelize([1,2,3])
        b = sc.parallelize([3,4,5])
        U = a.union(b).collect()
        print(U)
    my_union()

    print("***************************union_distinct()***************************")
    def my_distinct():
        #這個和數學并集一樣了
        a = sc.parallelize([1, 2, 3])
        b = sc.parallelize([3, 4, 2])
        D = a.union(b).distinct().collect()
        print(D)
    my_distinct()

    print("***************************join()***************************")
    def my_join():
        a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
        b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
        J = a.fullOuterJoin(b).collect
        print(J)
    my_join()

    sc.stop()

'''
Spark Core核心算子回顧
 -- Transformation算子程式設計:
   map、filter、groupByKey、flatMap、reduceByKey、sortByKey、join等
'''
           

練習2:Action算子程式設計

from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)
    def my_action():
        data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        rdd = sc.parallelize(data)
        rdd.collect()
        rdd.count()
        rdd.take(3)  #取前三個
        rdd.max()    #最大值
        rdd.min()    #最小值
        rdd.sum()    #求和
        rdd.reduce(lambda x, y: x + y) #相鄰兩個相加
        rdd.foreach(lambda x: print(x))
    my_action()
    sc.stop()
           

四、PySpark運作模式【一份代碼可在多個模式上運作】:

(1)local模式:主要是開發和測試時使用
  --master     叢集
  --name       應用程式名稱
  --py-file

  例子:
     ./spark-submit --master local[2] --name spark-local /home/hadoop/
     script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/
     wc/output
  注意:
     local:運作在一個線程上
     local[k]:運作在k個線程上
     local[K,F]:運作在K線程上,和最大錯誤設定
     local[*]:用本地盡可能多的線程運作
     将上述例子中的local[2]改為其他模式名即可在對應模式上運作
(2)standalone模式
	hdfs: NameNode  DataNode
	yarn: ResourceManager NodeManager

	master:
	worker:

	$SPARK_HOME/conf/slaves
		hadoop000

		假設你有5台機器,就應該進行如下slaves的配置
		hadoop000
		hadoop001
		hadoop002
		hadoop003
		hadoop005
		如果是多台機器,那麼每台機器都在相同的路徑下部署spark
	啟動spark叢集
		$SPARK_HOME/sbin/start-all.sh
		ps: 要在spark-env.sh中添加JAVA_HOME,否則會報錯
                檢測:jps: Master和Worker程序,就說明我們的standalone模式安裝成功
(3)yarn模式:
  - spark作為作業用戶端而已,他需要做的事情就是送出作業到yarn上去執行
  - yarn vs standalone
    yarn: 你隻需要一個節點,然後送出作業即可   
           這時是不需要spark叢集的(不需要啟動master和worker的)
    standalone:你的spark叢集上每個節點都需要部署spark
                然後需要啟動spark叢集(需要master和worker)
  - 例子:
    ./spark-submit --master yarn --name spark-yarn /home/hadoop/
     script/spark0402.py hdfs://hadoop000:8020/wc.txt hdfs://hadoop000:8020/
     wc/output
  - 指定hadoop_conf或者yarn_conf_dir是為了指定加載其路徑下面的配置檔案,spark 想要跑在yarn
    上勢必要知道HDFS 和 yarn 的資訊,不然 spark怎麼找到yarn 
  - yarn支援client和cluster模式:那麼driver運作在哪裡呢?
	本地時是client【預設】:送出作業的程序是不能停止的,否則作業就挂了
	叢集時是cluster:送出完作業,那麼送出作業端就可以斷開了,因為driver是運作在
                       am(application master)裡面的
  - yarn相關的報錯資訊:
    Error: Cluster deploy mode is not applicable to Spark shells
           pyspark/spark-shell : 互動式運作程式  client
    如何檢視已經運作完的yarn的日志資訊: yarn logs -applicationId <applicationId>
           

五、Spark核心概述

1.名詞解析:
Application	:基于Spark的應用程式 =  1 driver + executors
  pyspark、spark-shell都是應用程式
Driver program	:在py檔案的主方法__main__下建立一個SparkContext	
Cluster manager :從外部去擷取資源,同時可以設定申請多少資源
  spark-submit --master local[2]/spark://hadoop000:7077/yarn
Deploy mode	:區分driver在什麼地方啟動
  In "cluster" mode, the framework launches the driver inside of the cluster. 
  In "client" mode, the submitter launches the driver outside of the cluster.
Worker node	:工作節點,就像manage節點
	Any node that can run application code in the cluster
	standalone: slave節點 slaves配置檔案
	yarn: nodemanager
Executor	:為工作節點上的應用程式啟動的程序
	- runs tasks 
	- keeps data in memory or disk storage across them
	- Each application has its own executors.	

Task	       :一個工作單元,将被發送給一個執行者
Job	       :一個action對應一個job
①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一個程序啟動在worknode上,能夠運作任務能夠緩存資料,而且每個應用程式有一組獨立的executor
④申請資源時是通過Cluster manager去申請的,可以自定義本地或叢集
⑤自定義運作時,Deploy mode可以跑在cluster上也可以跑在client上
⑥Executor運作在worknode上,task運作在Executor上,task(map、flatMap等屬于task)從driver上發起
⑦Job是一個并行的計算,由多個①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一個程序啟動在worknode上,能夠運作任務能夠緩存資料,而且每個應用程式有一組獨立的executor
④申請資源時是通過Cluster manager去申請的,可以自定義本地或叢集
⑤自定義運作時,Deploy mode可以跑在cluster上也可以跑在client上組成,spark中一個action(save,collect)對應一個job
⑥Stage:一個job會被拆分為一個小的任務集,一個stage的邊界往往是從某個地方取資料開始,到shuffle結束
           

六、Spark SQL

(1)Spark SQL前世今生:
  SQL:MySQL、Oracle、DB2、SQLService
       - 我們很熟悉的資料處理語言是SQL
       - 但是資料量越來越大 ==> 大資料(Hive、Spark Core)
          -- hive是基于Hadoop的一個資料倉庫工具,可以将結構化的資料檔案映射為一張資料庫表,
            并提供簡單的sql查詢功能,可以将sql語句轉換為MapReduce任務進行運作。
          -- Spark Core:得熟悉Java、Python等語言
      - 綜上:能通過SQL語言處理大資料問題是人們最喜歡的啦
出現了:SQL on Hadoop
  Hive on Map Reduce 
  Shark【沒有了。。。】
  Impala:比較吃記憶體,Cloudera
  Presto:京東再用
發展:
Hive on Map Reduce 
Shark on Spark
Spark SQL:on Spark:Spark社群的
共同點:metastore mysql,基于源資料建表
Hive on Spark:Hive社群的不同于Spark SQL,在Hive能運作的SparkSQL不一定可以
(2)官方描述:Spark SQL是Apache Spark的一個子產品,是用來處理結構化資料的
①程式設計和SQL可以無縫對接:
  支援SQL和DATa Frame API(Java、Scala、Python、R)
  代碼示例:results = spark.sql("SELCET * FROM people")
            names = results.map(lambda p:p.name)
②統一的資料通路:可以直接将Hive、ORC、JSON、JDBC結果做連接配接
   spark.read.json("s3n://...").registerTempTable("json")
   results = spark.sql(
   """SELECT *
   FROM people
   JOIN hson ...""")
 查詢和連接配接不同資料源【Spark SQL不僅僅是SQL】
③Spark SQL 可以使用已經存在的Hive倉庫matastores,UDFs等
④提供了标準的JDBC、ODBC接口,外部工具可以直接通路Spark
結:Spark SQL 強調的是“結構化資料”而非“SQL”
           

繼續閱讀