天天看点

pyspark基础入门

  • 工作方式 单机 分布式
  • 内存缓存 单机缓存 persist() or cache()将转换的RDDs保存在内存
  • df可变性 pandas 是可变的 spark_df中RDDs是不可变的 所以DF不可变
  • 创建

RDD数据结构的常用函数

  • 创建RDD
  • 1 是textFile加载本地或者集群文件系统中的数据,
  • 2 用parallelize方法将Driver中的数据结构并行化成RDD。
  • 常用Action操作
  • 1 collect 将数据汇集到Driver,数据过大时有超内存风险
  • 2 take 将前若干个数据汇集到Driver,比collect安全
  • 3 takeSample 可以随机取若干个到Driver,第一个参数设置是否放回抽样
  • 4 first 取第一个数据
  • 5 count 查看RDD元素数量
  • 6 reduce 利用二元函数对数据进行规约
  • 7 foreach 对每一个元素执行某种操作,不生成新的RDD
  • 8 countByKey 对Pair RDD按key统计数量
  • 9 saveAsTextFile 保存rdd成text文件到本地
  • 常用Transformation操作
  • 1 Transformation 转换操作具有懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操作触发到该依赖的时候,它才被计算。
  • 2 map 操作对每个元素进行一个映射转换
  • 3 filter 应用过滤条件过滤掉一些数据
  • 4 flatMap 操作执行将每个元素生成一个Array后压平
  • 5 sample 对原rdd在每个分区按照比例进行抽样,第一个参数设置是否可以重复抽样
  • 6 distinct 去重
  • 7 subtract 找到属于前一个rdd而不属于后一个rdd的元素
  • 8 union 合并数据
  • 9 intersection 求交集
  • 10 cartesian 笛卡尔积
  • 11 sortBy 按照某种方式进行排序
  • 12 zip 按照拉链方式连接两个RDD,效果类似python的zip函数 需要两个RDD具有相同的分区,每个分区元素数量相同4
  • 13 zipWithIndex 将RDD和一个从0开始的递增序列按照拉链方式连接。
  • 常用PairrDD的转换操作 PairRDD指的是数据为长度为2的tuple类似(k,v)结构的数据类型的RDD,其每个数据的第一个元素被当做key,第二个元素被当做value.
  • 1 reduceByKey对相同的key对应的values应用二元归并操作
  • 2 groupByKey将相同的key对应的values收集成一个Iterator 迭代器
  • 3 sortByKey按照key排序,可以指定是否降序
  • 4 join相当于根据key进行内连接
  • 5 rightOuterJoin相当于关系表的右连接
  • 6 leftOuterJoin相当于关系表的左连接
  • 7 cogroup相当于对两个输入分别goupByKey然后再对结果进行groupByKey
  • 8 subtractByKey去除x中那些key也在y中的元素
  • 9 oldByKey的操作和reduceByKey类似,但是要提供一个初始值
  • 缓存操作
  • 共享变量
  • 分区操作
# 打印版本
import pyspark
print(pyspark.__version__)
      
3.2.0
      
# 参数配置
conf = pyspark.SparkConf().setAppName("rdd_tutorial")
#主函数
sc=pyspark.SparkContext(conf=conf)
      
# 创建RDD

file="./test.txt"
rdd=sc.textFile(file,3)
      
['hello world,',
 "hello spark',",
 'spark love jupyter,',
 'spark love pandas,',
 'spark love sql']
      
#parallelize将Driver中的数据结构生成RDD,第二个参数指定分区数,即将数据放多几个分布式端 数据和分区
# 并行集合的一个重要参数是slices,表示数据集切分的份数。Spark将会在集群上为每一份数据起一个任务
rdd = sc.parallelize(range(1,11),2)
rdd.collect()
      
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
      
# 常用action操作
# Action操作将触发基于RDD依赖关系的计算
rdd=sc.parallelize(range(10))
      
#collect操作将数据汇集到Driver,数据过大时有超内存风险
all_data = rdd.collect()
all_data
      
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
      
# 有序去前n个数据
rdd.take(4)
      
[0, 1, 2, 3]
      
# 随机取,第一个设置是否放回 2 num 3 随机种子
rdd.takeSample(False,7,1)
      
[6, 8, 9, 7, 5, 3, 0]
      
# 查看某函数用法
rdd.takeSample?
      
[1;31mSignature:[0m [0mrdd[0m[1;33m.[0m[0mtakeSample[0m[1;33m([0m[0mwithReplacement[0m[1;33m,[0m [0mnum[0m[1;33m,[0m [0mseed[0m[1;33m=[0m[1;32mNone[0m[1;33m)[0m[1;33m[0m[1;33m[0m[0m
[1;31mDocstring:[0m
Return a fixed-size sampled subset of this RDD.

Notes
-----
This method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.

Examples
--------
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
>>> len(rdd.takeSample(False, 5, 2))
5
>>> len(rdd.takeSample(False, 15, 3))
10
[1;31mFile:[0m      g:\anaconda\ana2\lib\site-packages\pyspark\rdd.py
[1;31mType:[0m      method
      
rdd.first()
      
rdd.count()
      
10
      
#foreach对每一个元素执行某种操作,不生成新的RDD
#累加器用法详见共享变量
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)
      
45
      
pairRdd = sc.parallelize([(1,1),(1,4),(1,1),(2,16)]) 
pairRdd.countByKey()
      
defaultdict(int, {1: 3, 2: 1})
      
pairRdd.countByValue()
      
defaultdict(int, {(1, 1): 2, (1, 4): 1, (2, 16): 1})
      
rdd.collect()
      
[0, 1, 2, 3, 4]
      
#saveAsTextFile保存rdd成text文件到本地 理论上可以保存成功
text_file = "./rdd.txt"
rdd = sc.parallelize(range(5))
      
rdd.saveAsTextFile(text_file)
      
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-40-d076c4237135> in <module>
----> 1 rdd.saveAsTextFile(text_file)


G:\anaconda\ana2\lib\site-packages\pyspark\rdd.py in saveAsTextFile(self, path, compressionCodecClass)
   1826             keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
   1827         else:
-> 1828             keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   1829 
   1830     # Pair functions


G:\anaconda\ana2\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 


G:\anaconda\ana2\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)


Py4JJavaError: An error occurred while calling o331.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/G:/VScode/pyspark_learning/rdd.txt already exists
   at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
   at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:298)
   at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
   at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
   at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
   at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
   at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
   at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1578)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1578)
   at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1564)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1564)
   at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:551)
   at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:550)
   at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at py4j.Gateway.invoke(Gateway.java:282)
   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at py4j.commands.CallCommand.execute(CallCommand.java:79)
   at py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.lang.Thread.run(Thread.java:748)
      
# 常用Transformation操作
      
rdd=sc.parallelize(range(20))
rdd.collect()
      
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
      
# map
rdd.map(lambda x:x+2).collect()
      
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
      
#filter 
rdd.filter(lambda x:x%2==1).collect()
      
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
      
# flatMap
#flatMap操作执行将每个元素生成一个Array后压平
rdd = sc.parallelize(["hello world","hello China"])
rdd.map(lambda x:x.split(" ")).collect()
      
[['hello', 'world'], ['hello', 'China']]
      
rdd=rdd.flatMap(lambda x:x.split(" ")).collect()
      
#sample
rdd=sc.parallelize(range(20))
rdd.sample(False,5,0).collect()
      
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
      
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()
      
[1, 2, 3, 4, 5]
      
# 属于A不属于B的元素 补集
a = sc.parallelize(range(10))
b = sc.parallelize(range(5,15))
a.collect()
      
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
      
b.collect()
      
[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
      
#补集
a.subtract(b).collect()
      
[0, 1, 2, 3, 4]
      
# 并集
a.union(b).collect()
      
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
      
# 交集
a.intersection(b).collect()
      
[5, 6, 7, 8, 9]
      
# 笛卡尔积
boys = sc.parallelize(["LiLei","Tom"])
girls = sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()
      
[('LiLei', 'HanMeiMei'),
 ('LiLei', 'Lily'),
 ('Tom', 'HanMeiMei'),
 ('Tom', 'Lily')]
      
a.sortBy(lambda x:x,ascending=False).collect()
      
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
      
rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda x:x[2]).collect()
      
[(4, 1, 1), (3, 2, 2), (1, 2, 3)]
      
rdd.sortBy?
      
[1;31mSignature:[0m [0mrdd[0m[1;33m.[0m[0msortBy[0m[1;33m([0m[0mkeyfunc[0m[1;33m,[0m [0mascending[0m[1;33m=[0m[1;32mTrue[0m[1;33m,[0m [0mnumPartitions[0m[1;33m=[0m[1;32mNone[0m[1;33m)[0m[1;33m[0m[1;33m[0m[0m
[1;31mDocstring:[0m
Sorts this RDD by the given keyfunc

Examples
--------
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
[1;31mFile:[0m      g:\anaconda\ana2\lib\site-packages\pyspark\rdd.py
[1;31mType:[0m      method
      
# zip
rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age = sc.parallelize([19,18,20])

rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())
      
[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]
      
#zip withindex
#将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
      
[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
      
# 常用PairRDD的转换操作
      
rdd = sc.parallelize([("hello",1),("world",2),
                               ("hello",3),("world",5)])
rdd.collect()
      
[('hello', 1), ('world', 2), ('hello', 3), ('world', 5)]
      
rdd.reduceByKey(lambda x,y: x*y).collect()
      
[('hello', 3), ('world', 10)]
      
rdd.groupByKey().collect()
      
[('hello', <pyspark.resultiterable.ResultIterable at 0x1ab3f6892b0>),
 ('world', <pyspark.resultiterable.ResultIterable at 0x1ab3f689100>)]
      
# join 内连接
age = sc.parallelize([("LiLei",18),
                        ("HanMeiMei",16),("Jim",20)])
gender = sc.parallelize([("LiLei","male"),
                        ("HanMeiMei","female"),("Lucy","female")])
      
age.join(gender).collect()
      
[('HanMeiMei', (16, 'female')), ('LiLei', (18, 'male'))]
      
#leftOuterJoin和rightOuterJoin
age.rightOuterJoin(gender).collect()
      
[('HanMeiMei', (16, 'female')),
 ('LiLei', (18, 'male')),
 ('Lucy', (None, 'female'))]
      
#leftOuterJoin和rightOuterJoin
age.leftOuterJoin(gender).collect()
      
[('HanMeiMei', (16, 'female')), ('LiLei', (18, 'male')), ('Jim', (20, None))]
      
#cogroup connect_group相当于对两个输入分别goupByKey然后再对结果进行groupByKey

x = sc.parallelize([("a",1),("b",2),("a",3)])
y = sc.parallelize([("a",2),("b",3),("b",5)])

result = x.cogroup(y).collect()
result
      
[('b',
  (<pyspark.resultiterable.ResultIterable at 0x1ab3f832250>,
   <pyspark.resultiterable.ResultIterable at 0x1ab3f832310>)),
 ('a',
  (<pyspark.resultiterable.ResultIterable at 0x1ab3f8215b0>,
   <pyspark.resultiterable.ResultIterable at 0x1ab3f8286d0>))]
      
list(result[0][1])
      
[<pyspark.resultiterable.ResultIterable at 0x1ab3f832250>,
 <pyspark.resultiterable.ResultIterable at 0x1ab3f832310>]
      
#subtractByKey去除x中那些key也在y中的元素

x = sc.parallelize([("a",1),("b",2),("c",3)])
y = sc.parallelize([("a",2),("b",(1,2))])

x.subtractByKey(y).collect()
      
[('c', 3)]
      
#foldByKey的操作和reduceByKey类似,但是要提供一个初始值
x = sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
x.foldByKey(1,lambda x,y:x*y).collect()
      
[('a', 3), ('b', 10)]
      

五,缓存操作

如果一个rdd被多个任务用作中间量,那么对其进行cache缓存到内存中对加快计算会非常有帮助。

声明对一个rdd进行cache后,该rdd不会被立即缓存,而是等到它第一次被计算出来时才进行缓存。

可以使用persist明确指定存储级别,常用的存储级别是MEMORY_ONLY和EMORY_AND_DISK。

如果一个RDD后面不再用到,可以用unpersist释放缓存,unpersist是立即执行的。

缓存数据不会切断血缘依赖关系,这是因为缓存数据某些分区所在的节点有可能会有故障,例如内存溢出或者节点损坏。

这时候可以根据血缘关系重新计算这个分区的数据。

#cache缓存到内存中,使用存储级别 MEMORY_ONLY。
#MEMORY_ONLY意味着如果内存存储不下,放弃存储其余部分,需要时重新计算。
a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

print(mean_a)


      
4999.5
      
#persist缓存到内存或磁盘中,默认使用存储级别MEMORY_AND_DISK
#MEMORY_AND_DISK意味着如果内存存储不下,其余部分存储到磁盘中。
#persist可以指定其它存储级别,cache相当于persist(MEMORY_ONLY)
from  pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

a.unpersist() #立即释放缓存
print(mean_a)

      
4999.5
      

六,共享变量

当spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。

但是,有时候需要在不同节点或者节点和Driver之间共享变量。

Spark提供两种类型的共享变量,广播变量和累加器。

广播变量是不可变变量,实现在不同节点不同任务之间共享数据。

广播变量在每个机器上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。

累加器主要是不同节点和Driver之间共享变量,只能实现计数或者累加功能。

累加器的值只有在Driver上是可读的,在节点上不可见。

# 广播变量 broadcast 不可变,在所有节点可读
rdd = sc.parallelize(range(10))
broads = sc.broadcast(100)
print(rdd.map(lambda x:x+broads.value).collect())
      
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
      
print(broads.value)
      
100
      
#累加器 只能在Driver上可读,在其它节点只能进行累加

total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)

rdd.foreach(lambda x:total.add(x))
total.value

      
45
      
# 计算数据的平均值
rdd = sc.parallelize([1.1,2.1,3.1,4.1])
total = sc.accumulator(0)
count = sc.accumulator(0)

def func(x):
total.add(x)
count.add(1)

rdd.foreach(func)

total.value/count.value

      
2.5999999999999996
      

七,分区操作

分区操作包括改变分区操作,以及针对分区执行的一些转换操作。

glom:将一个分区内的数据转换为一个列表作为一行。

coalesce:shuffle可选,默认为False情况下窄依赖,不能增加分区。repartition和partitionBy调用它实现。

repartition:按随机数进行shuffle,相同key不一定在同一个分区

partitionBy:按key进行shuffle,相同key放入同一个分区

HashPartitioner:默认分区器,根据key的hash值进行分区,相同的key进入同一分区,效率较高,key不可为Array.

RangePartitioner:只在排序相关函数中使用,除相同的key进入同一分区,相邻的key也会进入同一分区,key必须可排序。

TaskContext: 获取当前分区id方法 TaskContext.get.partitionId

mapPartitions:每次处理分区内的一批数据,适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支

mapPartitionsWithIndex:类似mapPartitions,提供了分区索引,输入参数为(i,Iterator)

foreachPartition:类似foreach,但每次提供一个Partition的一批数据

#glom将一个分区内的数据转换为一个列表作为一行。
a = sc.parallelize(range(10),4)
# a 10个数据存储在n个分区上
b = a.glom()
b.collect() 

      
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
      
a.collect()
      
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
      
#coalesce 默认shuffle为False,不能增加分区,只能减少分区
#如果要增加分区,要设置shuffle = true
#parallelize等许多操作可以指定分区数
a = sc.parallelize(range(10),3)  
print(a.getNumPartitions())
print(a.glom().collect())
      
3
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
      
b = a.coalesce(2) 
      
b.getNumPartitions()
      
2
      
#repartition按随机数进行shuffle,相同key不一定在一个分区,可以增加分区
#repartition实际上调用coalesce实现,设置了shuffle = True
a = sc.parallelize(range(10),3)  
c = a.repartition(4) 
print(c.glom().collect())
      
[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]
      
c = a.repartition(2) 
print(c.glom().collect())
      
[[0, 1, 2, 6, 7, 8, 9], [3, 4, 5]]
      
#partitionBy按key进行shuffle,相同key一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.partitionBy(2)
print(c.glom().collect())
      
[[('c', 3)], [('a', 1), ('a', 1), ('a', 2)]]
      
#mapPartitions可以对每个分区分别执行操作
#每次处理分区内的一批数据,适合需要按批处理数据的情况
#例如将数据写入数据库时,可以极大的减少连接次数。
#mapPartitions的输入分区内数据组成的Iterator,其输出也需要是一个Iterator
#以下例子查看每个分区内的数据,相当于用mapPartitions实现了glom的功能。
a = sc.parallelize(range(10),2)
a.mapPartitions(lambda it:iter([list(it)])).collect()

      
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
      
#mapPartitionsWithIndex可以获取两个参数
#即分区id和每个分区内的数据组成的Iterator
a = sc.parallelize(range(11),2)

def func(pid,it):
s = sum(it)
return(iter([str(pid) + "|" + str(s)]))
    [str(pid) + "|" + str]
b = a.mapPartitionsWithIndex(func)
b.collect()

      
['0|10', '1|45']
      
#利用TaskContext可以获取当前每个元素的分区
from pyspark.taskcontext import TaskContext
a = sc.parallelize(range(5),3)
c = a.map(lambda x:(TaskContext.get().partitionId(),x))
c.collect()


      
[(0, 0), (1, 1), (1, 2), (2, 3), (2, 4)]
      
#foreachPartition对每个分区分别执行操作
#范例:求每个分区内最大值的和
total = sc.accumulator(0.0)

a = sc.parallelize(range(1,101),3)

def func(it):
total.add(max(it))

a.foreachPartition(func)
total.value

      
199.0
      
#aggregate是一个Action操作
#aggregate比较复杂,先对每个分区执行一个函数,再对每个分区结果执行一个合并函数。
#例子:求元素之和以及元素个数
#三个参数,第一个参数为初始值,第二个为分区执行函数,第三个为结果合并执行函数。
rdd = sc.parallelize(range(1,21),3)
def inner_func(t,x):
return((t[0]+x,t[1]+1))

def outer_func(p,q):
return((p[0]+q[0],p[1]+q[1]))

rdd.aggregate((0,0),inner_func,outer_func)


      
(210, 20)
      
#aggregateByKey的操作和aggregate类似,但是会对每个key分别进行操作
#第一个参数为初始值,第二个参数为分区内归并函数,第三个参数为分区间归并函数

a = sc.parallelize([("a",1),("b",1),("c",2),
                             ("a",2),("b",3)],3)
b = a.aggregateByKey(0,lambda x,y:max(x,y),
lambda x,y:max(x,y))
b.collect()

      
[('b', 3), ('a', 2), ('c', 2)]