一、大資料簡史,從hadoop到Spark
1.hadoop的出現:
(1)問題:1990年,電商爆發以及機器産生了大量資料,單一的系統無法承擔
(2)辦法:為了解決(1)的問題許多公司,尤其是大公司上司了普通硬體叢集的水準擴充
(3)執行:hadoop應運而生
2.spark的出現:
(1)hadoop面臨問題:
- 硬體瓶頸:多年來,記憶體技術突飛猛進,而硬碟技術沒有太大的變化。hadoop主要
運用的是硬碟,沒有利用好記憶體技術。
- 程式設計困難,hadoop的MapReduce程式設計不太容易,後續才出現了Pig、Hive
- 場景不适,批處理要根據不同場景進行開發
(2)spark應運而生
3.叢集的強大之處:
(1)存儲:切割 HDFS的Block
(2)計算:切割 【分布式并行計算】 MapReduce/Spark
(3)存儲 + 計算: HDFS/S3 + MapReduce/Spark
二、Spark簡介
(1)目标:企業資料處理的統一引擎
(2)特點:
- 廣支援:一套系統解決多種環境
- 高速度:記憶體上運作Hadoop快100倍,硬碟上運作比Hadoop快10倍
- 多接口:如:Python、Java、R...
- 多應用:sparkSQL、sparkStreaming、MLlib、GraphX
(3)為啥spark解決了hadoop慢的問題呢?
- 減少網絡使用:Spark設計思想是減少節點間的資料互動
- 運用記憶體技術:Spark是和記憶體進行互動,Hadoop是磁盤進行互動
(4)大資料處理的三種情況:
- 複雜的批量處理:時間長,跨度為10min~N hr
- 曆史資料為基礎的互動式查詢:時間通常為10sec~N min
- 實時資料為基礎的流式資料:時間通常為N ms~N sec
(5)Spark的對應方案:
- Spark Core:以RDD為基礎提供了豐富的基礎操作接口,
使得Spark可以靈活處理類似MR的批處理作業
- Spark SQL:相容HIVE資料,提供比Hive更快的查詢速度
(10~100x)的分布式SQL引擎
- Spark Streaming:将流式計算分解成一系列小的批處理作業
利用spark輕量級低延遲時間的架構來支援流數
據處理,目前已經支援Kafka,Flume等
- GraphX:提供圖形計算架構,與Pregel/GraphLab相容
- Milb:提供基于Spark的機器學習算法庫
三、Spark Core核心之RDD
(1)RDD是什麼:
①RDD是一個抽象類
②RDD支援多種類型,即泛形
③RDD:彈性分布式資料集
彈性 【容錯,自動進行資料的修複】
分布式 【不同節點運作】
資料集
- 不可變 (1,2,3,4,5,6)
- 可分區 (1,2,3)(4,5,6)
(2)RDD的特性:
①一個RDD由多個分區/分片組成
②對RDD進行一個函數操作,會對RDD的所有分區都執行相同函數操作
③一個RDD依賴于其他RDD,RDD1->RDD2->RDD3->RDD4->RDD5,若RDD1中某節點資料丢失,
後面的RDD會根據前面的資訊進行重新計算
④對于Key-Value的RDD可以制定一個partitioner,告訴他如何分片。常用hash/range
⑤移動資料不如移動計算,注:移動資料,不如移動計算。考慮磁盤IO和網絡資源傳輸等
(3)圖解RDD:
RDD圖解
(4)SparkContext&SparkConf
SparkContext意義:主入口點
SparkContext作用:連接配接Spark叢集
SparkConf作用:建立SparkContext前得使用SparkConf進行配置,以鍵值對形式進行
①建立SparkContext
- 連接配接到Spark“叢集”:local、standalone、yarn、mesos
- 通過SparkContext來建立RDD、廣播變量到叢集
②建立SparkContext前還需要建立SparkConf對象
③SparkConf().setAppName(appname).setMaster('local')這個設定高于系統設定
④pyspark.SparkContext連接配接到Spark‘叢集’即master(local[單機]、Standalone[标準]
、yarn(hadoop上)、mesos),建立RDD,廣播變量到叢集
⑤代碼:
conf = SparkConf().setAppName(appname).setMaster('local')
sc = SparkContext(Conf=Conf)
(5)pyspark的使用:
①預設:pySpark在啟動時就會建立一個SparkContext,别名為sc
<SparkContext master = local[*] appName = PySparkShell>
②參數的使用:
./bin/pyspark --master local[4]
./bin/pyspark --master[4] --py-files code.py
(6)RDD的建立:
①集合轉RDD
data = [1,2,3]
distData = sc.parallelize(data,3) #這行代碼可以将清單轉為RDD資料集
distData.collect() #這行代碼可以列印輸出RDD資料集#【觸發一個job】
distData.reduce(lambda a,b :a+b) #【觸發一個job】
注意:一個CPU可以設定2~4個partition
②外部資料集轉RDD
distFile = sc.textFile("hello.txt") #将外部資料轉換為RDD對象
distFile.collect()
(7)送出pyspark作業到伺服器上運作
./spark-submit --master local[2] --name spark0301 /home/hadoop/scrip
t/spark0301.py
(8)PySpark實戰之Spark Core核心RDD常用算子:
【兩個算子】
①transfermation: map、filter【過濾】、group by、distinct
map()是将傳入的函數依次作用到序列的每個元素,每個元素都是獨自被函數“作用”一次
②action: count, reduce, collect
注意:(1)
1)All transformations in Spark are lazy,in that they do not
compute their results right away.
-- Spark的transformations很懶,因為他們沒有馬上計算出結果
2)Instead they just remember the transformations applied to some
base dataset
-- 相反,他們隻記得應用于基本資料集
(2)
1)action triggers the computation
-- 動作觸發計算
2) action returns values to driver or writes data to external storage
-- action将傳回值資料寫入外部存儲
【單詞記憶】
applied to:施加到
Instead:相反
in that:因為
external storage:外部存儲
map()是将傳入的函數依次作用到序列的每個元素,每個元素都是獨自被函數“作用”一次
(1)map
map(func)
#将func函數作用到資料集每一個元素上,生成一個新的分布式【資料集】傳回
(2)filter
filter(func)
傳回所有func傳回值為true的元素,生成一個新的分布式【資料集】傳回
(3)flatMap #flat壓扁以後做map
flatMap(func)
輸入的item能夠被map或0或多個items輸出,傳回值是一個【Sequence】
(4)groupByKey:把相同的key的資料分發到一起
['hello', 'spark', 'hello', 'world', 'hello', 'world']
('hello',1) ('spark',1)........
(5)reduceByKey: 把相同的key的資料分發到一起并進行相應的計算
mapRdd.reduceByKey(lambda a,b:a+b)
[1,1] 1+1
[1,1,1] 1+1=2+1=3
[1] 1
(6)左連接配接:以左表為基準
右連接配接:以右表為基準
全連接配接:以左右都為基準
練習1:Transformation算子程式設計
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
sc = SparkContext(conf=conf)
'''
map:
map(func)
将func函數作用到資料集的每個元素上,生成一個新的分布式資料集傳回
'''
print("***************************map***************************")
def my_map():
# 建立一個序列
data = [1,2,3,4,5]
# 将序列轉換為RDD
rdd1 = sc.parallelize(data)
# 使用函數對RDD進行作用,生成RDD2
rdd2 = rdd1.map(lambda x:x*2)
# 使用collect()講結果輸出
print(rdd2.collect())
my_map()
def my_map2():
a = sc.parallelize(["dog","tiger","lion","cat","panter","eagle"])
b = a.map(lambda x:(x,1)) #進來一個x,傳回一個(x,1)的形式
print(b.collect())
my_map2()
print("***************************filter***************************")
def my_filter():
#給一個資料
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
mapRdd = rdd1.map(lambda x:x**2)
filterRdd = mapRdd.filter(lambda x:x>5)
print(filterRdd.collect())
'''
filter:
filter(func)
傳回所有func傳回值為true的元素,生成一個新的分布式資料集傳回
'''
def my_filter():
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
mapRdd = rdd1.map(lambda x:x*2)
filterRdd = mapRdd.filter(lambda x:x > 5)
print(filterRdd.collect())
print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())
my_filter()
print("***************************flatMap()***************************")
#Wordcount第一步:
def my_flatMap():
#flatMap,将東西壓扁/拆開 後做map
data = ["hello spark","hello world","hello world"]
rdd = sc.parallelize(data)
print(rdd.flatMap(lambda line:line.split(" ")).collect())
my_flatMap()
print("***************************groupBy()***************************")
def my_groupBy():
data = ["hello spark","hello world","hello world"]
rdd = sc.parallelize(data)
mapRdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
groupByRdd = mapRdd.groupByKey()
print(groupByRdd.collect())
print(groupByRdd.map(lambda x:{x[0]:list(x[1])}).collect())
my_groupBy()
print("***************************reduceByKey()***************************")
#出現Wordcount結果
def my_reduceByKey():
data = ["hello spark", "hello world", "hello world"]
rdd = sc.parallelize(data)
mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
reduceByKeyRdd = mapRdd.reduceByKey(lambda a,b:a+b)
print(reduceByKeyRdd.collect())
my_reduceByKey()
print("***************************sortByKey()***************************")
#将Wordcount結果中數字出現的次數進行降序排列
def my_sort():
data = ["hello spark", "hello world", "hello world"]
rdd = sc.parallelize(data)
mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
#reduceByKeyRdd.sortByKey().collect() 此時是按照字典在排序
#reduceByKeyRdd.sortByKey(False).collect()
#先對對鍵與值互換位置,再排序,再換位置回來
reduceByKey=reduceByKeyRdd.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0])).collect()
print(reduceByKey)
my_sort()
print("***************************union()***************************")
def my_union():
a = sc.parallelize([1,2,3])
b = sc.parallelize([3,4,5])
U = a.union(b).collect()
print(U)
my_union()
print("***************************union_distinct()***************************")
def my_distinct():
#這個和數學并集一樣了
a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4, 2])
D = a.union(b).distinct().collect()
print(D)
my_distinct()
print("***************************join()***************************")
def my_join():
a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
J = a.fullOuterJoin(b).collect
print(J)
my_join()
sc.stop()
'''
Spark Core核心算子回顧
-- Transformation算子程式設計:
map、filter、groupByKey、flatMap、reduceByKey、sortByKey、join等
'''
練習2:Action算子程式設計
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
sc = SparkContext(conf=conf)
def my_action():
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)
rdd.collect()
rdd.count()
rdd.take(3) #取前三個
rdd.max() #最大值
rdd.min() #最小值
rdd.sum() #求和
rdd.reduce(lambda x, y: x + y) #相鄰兩個相加
rdd.foreach(lambda x: print(x))
my_action()
sc.stop()
四、PySpark運作模式【一份代碼可在多個模式上運作】:
(1)local模式:主要是開發和測試時使用
--master 叢集
--name 應用程式名稱
--py-file
例子:
./spark-submit --master local[2] --name spark-local /home/hadoop/
script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/
wc/output
注意:
local:運作在一個線程上
local[k]:運作在k個線程上
local[K,F]:運作在K線程上,和最大錯誤設定
local[*]:用本地盡可能多的線程運作
将上述例子中的local[2]改為其他模式名即可在對應模式上運作
(2)standalone模式
hdfs: NameNode DataNode
yarn: ResourceManager NodeManager
master:
worker:
$SPARK_HOME/conf/slaves
hadoop000
假設你有5台機器,就應該進行如下slaves的配置
hadoop000
hadoop001
hadoop002
hadoop003
hadoop005
如果是多台機器,那麼每台機器都在相同的路徑下部署spark
啟動spark叢集
$SPARK_HOME/sbin/start-all.sh
ps: 要在spark-env.sh中添加JAVA_HOME,否則會報錯
檢測:jps: Master和Worker程序,就說明我們的standalone模式安裝成功
(3)yarn模式:
- spark作為作業用戶端而已,他需要做的事情就是送出作業到yarn上去執行
- yarn vs standalone
yarn: 你隻需要一個節點,然後送出作業即可
這時是不需要spark叢集的(不需要啟動master和worker的)
standalone:你的spark叢集上每個節點都需要部署spark
然後需要啟動spark叢集(需要master和worker)
- 例子:
./spark-submit --master yarn --name spark-yarn /home/hadoop/
script/spark0402.py hdfs://hadoop000:8020/wc.txt hdfs://hadoop000:8020/
wc/output
- 指定hadoop_conf或者yarn_conf_dir是為了指定加載其路徑下面的配置檔案,spark 想要跑在yarn
上勢必要知道HDFS 和 yarn 的資訊,不然 spark怎麼找到yarn
- yarn支援client和cluster模式:那麼driver運作在哪裡呢?
本地時是client【預設】:送出作業的程序是不能停止的,否則作業就挂了
叢集時是cluster:送出完作業,那麼送出作業端就可以斷開了,因為driver是運作在
am(application master)裡面的
- yarn相關的報錯資訊:
Error: Cluster deploy mode is not applicable to Spark shells
pyspark/spark-shell : 互動式運作程式 client
如何檢視已經運作完的yarn的日志資訊: yarn logs -applicationId <applicationId>
五、Spark核心概述
1.名詞解析:
Application :基于Spark的應用程式 = 1 driver + executors
pyspark、spark-shell都是應用程式
Driver program :在py檔案的主方法__main__下建立一個SparkContext
Cluster manager :從外部去擷取資源,同時可以設定申請多少資源
spark-submit --master local[2]/spark://hadoop000:7077/yarn
Deploy mode :區分driver在什麼地方啟動
In "cluster" mode, the framework launches the driver inside of the cluster.
In "client" mode, the submitter launches the driver outside of the cluster.
Worker node :工作節點,就像manage節點
Any node that can run application code in the cluster
standalone: slave節點 slaves配置檔案
yarn: nodemanager
Executor :為工作節點上的應用程式啟動的程序
- runs tasks
- keeps data in memory or disk storage across them
- Each application has its own executors.
Task :一個工作單元,将被發送給一個執行者
Job :一個action對應一個job
①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一個程序啟動在worknode上,能夠運作任務能夠緩存資料,而且每個應用程式有一組獨立的executor
④申請資源時是通過Cluster manager去申請的,可以自定義本地或叢集
⑤自定義運作時,Deploy mode可以跑在cluster上也可以跑在client上
⑥Executor運作在worknode上,task運作在Executor上,task(map、flatMap等屬于task)從driver上發起
⑦Job是一個并行的計算,由多個①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一個程序啟動在worknode上,能夠運作任務能夠緩存資料,而且每個應用程式有一組獨立的executor
④申請資源時是通過Cluster manager去申請的,可以自定義本地或叢集
⑤自定義運作時,Deploy mode可以跑在cluster上也可以跑在client上組成,spark中一個action(save,collect)對應一個job
⑥Stage:一個job會被拆分為一個小的任務集,一個stage的邊界往往是從某個地方取資料開始,到shuffle結束
六、Spark SQL
(1)Spark SQL前世今生:
SQL:MySQL、Oracle、DB2、SQLService
- 我們很熟悉的資料處理語言是SQL
- 但是資料量越來越大 ==> 大資料(Hive、Spark Core)
-- hive是基于Hadoop的一個資料倉庫工具,可以将結構化的資料檔案映射為一張資料庫表,
并提供簡單的sql查詢功能,可以将sql語句轉換為MapReduce任務進行運作。
-- Spark Core:得熟悉Java、Python等語言
- 綜上:能通過SQL語言處理大資料問題是人們最喜歡的啦
出現了:SQL on Hadoop
Hive on Map Reduce
Shark【沒有了。。。】
Impala:比較吃記憶體,Cloudera
Presto:京東再用
發展:
Hive on Map Reduce
Shark on Spark
Spark SQL:on Spark:Spark社群的
共同點:metastore mysql,基于源資料建表
Hive on Spark:Hive社群的不同于Spark SQL,在Hive能運作的SparkSQL不一定可以
(2)官方描述:Spark SQL是Apache Spark的一個子產品,是用來處理結構化資料的
①程式設計和SQL可以無縫對接:
支援SQL和DATa Frame API(Java、Scala、Python、R)
代碼示例:results = spark.sql("SELCET * FROM people")
names = results.map(lambda p:p.name)
②統一的資料通路:可以直接将Hive、ORC、JSON、JDBC結果做連接配接
spark.read.json("s3n://...").registerTempTable("json")
results = spark.sql(
"""SELECT *
FROM people
JOIN hson ...""")
查詢和連接配接不同資料源【Spark SQL不僅僅是SQL】
③Spark SQL 可以使用已經存在的Hive倉庫matastores,UDFs等
④提供了标準的JDBC、ODBC接口,外部工具可以直接通路Spark
結:Spark SQL 強調的是“結構化資料”而非“SQL”