文章目录
- 一、Spark广播变量
- 二、累加器
- Reference
一、Spark广播变量
多进程编程中,不同进程可以通过创建共享内存,进行进程间通信。而在分布式中,Spark通过【广播变量】和【累加器】进行共享变量。
【栗子】广播变量
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("RDD Demo") \
.getOrCreate();
sc = spark.sparkContext
#############################################
conf = {"ip":"192.168.1.1","key":"cumt"}
#广播变量
brVar = sc.broadcast(conf)
#获取广播变量值
a = brVar.value
#{'ip': '192.168.1.1', 'key': 'cumt'}
print(a)
#cumt
print(a["key"])
#更新广播变量
brVar.unpersist()
conf["key"] = "jackwang"
#再次广播
brVar = sc.broadcast(conf)
#获取广播新变量值
a = brVar.value
#{'ip': '192.168.1.1', 'key': 'jackwang'}
print(a)
#destroy()可将广播变量的数据和元数据一同销毁,销毁后不能使用
brVar.destroy()
##############################################
sc.stop()
分析:
(1)上面通过
SparkContext.broadcast(conf)
将普通变量
conf
创建成广播变量(一个包装变量,这时候该广播变量就能在集群中的其他节点进行共享数值了),我们可通过
value
方法(即
brVar.value
),在各个节点访问其值,即引用广播变量的数值。
(2)如果需要改变广播变量
brVar
的值,需要先使用
brVar.unpersist()
,然后修改数值后再次广播,就能够被集群的其他节点获取数值。
二、累加器
累加器只能利用关联操作做【加】操作。累加器能在调试时对作业的执行过程的相关事件进行计数。
这里也贴一个累加器的栗子:
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("RDD Demo") \
.getOrCreate();
sc = spark.sparkContext
#############################################
rdd = sc.range(1,101)
#创建累加器,初始值0
acc = sc.accumulator(0)
def fcounter(x):
global acc
if x % 2 == 0 :
acc += 1
#unsupported operand type(s) for -=
#acc -= 1
rdd_counter = rdd.map(fcounter)
#获取累加器值
#0
print(acc.value)
#保证多次正确获取累加器值
rdd_counter.persist()
#100
print(rdd_counter.count())
#50
print(acc.value)
#100
print(rdd_counter.count())
#50
print(acc.value)
##############################################
sc.stop()
分析:
(1)上面首先创建了一个元素个数为100的RDD对象,后面在该RDD对象上执行一个
map
操作,
map
的函数体是用
global acc
引入一个全局累加器
acc
(即一开始创建的
sc.accumulator(0)
的acc对象,而不是局部变量)。
(2)注意:要保证多次正确获得累加器值,需要先执行
rdd_counter.;persist()
。如最后一组
rdd_counter.count()
执行时,触发
fcounter
函数,累加器会再次执行,变为50+50=100,但是一开始的
rdd_counter.persisit()
切断了action的链条,导致只执行一次,所以
acc.value
还是50。