天天看点

【Spark】广播变量和累加器

文章目录

  • ​​一、Spark广播变量​​
  • ​​二、累加器​​
  • ​​Reference​​

一、Spark广播变量

【Spark】广播变量和累加器

多进程编程中,不同进程可以通过创建共享内存,进行进程间通信。而在分布式中,Spark通过【广播变量】和【累加器】进行共享变量。

【Spark】广播变量和累加器

【栗子】广播变量

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
conf = {"ip":"192.168.1.1","key":"cumt"}
#广播变量
brVar =  sc.broadcast(conf)
#获取广播变量值
a = brVar.value

#{'ip': '192.168.1.1', 'key': 'cumt'}
print(a)

#cumt
print(a["key"])

#更新广播变量
brVar.unpersist()
conf["key"] = "jackwang"

#再次广播
brVar =  sc.broadcast(conf)

#获取广播新变量值
a = brVar.value

#{'ip': '192.168.1.1', 'key': 'jackwang'}
print(a)

#destroy()可将广播变量的数据和元数据一同销毁,销毁后不能使用

brVar.destroy()
##############################################
sc.stop()      

分析:

(1)上面通过​​

​SparkContext.broadcast(conf)​

​​将普通变量​

​conf​

​​创建成广播变量(一个包装变量,这时候该广播变量就能在集群中的其他节点进行共享数值了),我们可通过​

​value​

​​方法(即​

​brVar.value​

​),在各个节点访问其值,即引用广播变量的数值。

(2)如果需要改变广播变量​

​brVar​

​​的值,需要先使用​

​brVar.unpersist()​

​,然后修改数值后再次广播,就能够被集群的其他节点获取数值。

二、累加器

累加器只能利用关联操作做【加】操作。累加器能在调试时对作业的执行过程的相关事件进行计数。

这里也贴一个累加器的栗子:

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext

#############################################
rdd = sc.range(1,101)

#创建累加器,初始值0
acc = sc.accumulator(0)
def fcounter(x):
        global acc
        if x % 2 == 0 :
                acc += 1
                #unsupported operand type(s) for -=
                #acc -= 1
rdd_counter =  rdd.map(fcounter)
#获取累加器值
#0
print(acc.value)

#保证多次正确获取累加器值
rdd_counter.persist()

#100
print(rdd_counter.count())

#50
print(acc.value)

#100
print(rdd_counter.count())
#50
print(acc.value)
##############################################
sc.stop()      

分析:

(1)上面首先创建了一个元素个数为100的RDD对象,后面在该RDD对象上执行一个​​

​map​

​​操作,​

​map​

​​的函数体是用​

​global acc​

​​引入一个全局累加器​

​acc​

​​(即一开始创建的​

​sc.accumulator(0)​

​的acc对象,而不是局部变量)。

(2)注意:要保证多次正确获得累加器值,需要先执行​

​rdd_counter.;persist()​

​​。如最后一组​

​rdd_counter.count()​

​​执行时,触发​

​fcounter​

​​函数,累加器会再次执行,变为50+50=100,但是一开始的​

​rdd_counter.persisit()​

​​切断了action的链条,导致只执行一次,所以​

​acc.value​

​还是50。

Reference

继续阅读