天天看点

python实战spark(五)常用API

常用API

​​Spark官方文档​​

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)

用于控制RDD存储。每个StorageLevel记录:

是否使用内存,如果内存不足,是否将RDD放到磁盘上,是否以特定于java的序列化格式将数据保存在内存中,以及是否在多个节点上复制RDD分区。还包含一些常用存储级别的静态常量,MEMORY_ONLY。由于数据总是在Python端序列化,所以所有常量都使用序列化格式。

class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None, sock_file=None)

使用​

​SparkContext.broadcast()​

​​创建广播变量。通过值​

​.value​

​访问值。

>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()      
>>> large_broadcast = sc.broadcast(range(10000))      

1.​

​destroy()​

​​ 删除与此广播变量相关的所有数据和元数据。一旦广播变量被销毁,就不能再使用它。此方法阻塞,直到删除完成。

2.​

​dump(value,f)​

​ 3.​

​load(file)​

​ 4.​

​load_from_path(path)​

​ 5.​

​unpersist(blocking=False)​

​ 删除执行器上此广播的缓存副本。如果在调用后使用广播,则需要将其重新发送给每个执行程序。

参数:

blocking-- 是否阻塞,直到完成非持久化

6.​

​property value​

class pyspark.Accumulator(aid, value, accum_param)

可以累积的共享变量,具有可交换和可关联的“add”操作。Spark集群上的工作任务可以使用​

​+=operator​

​​向累加器添加值,但是只有驱动程序可以使用value访问它的值。来自worker的更新会自动传播到driver。

虽然SparkContext支持基本数据类型(如int和float)的累加器,但是用户也可以通过提供一个自定义的AccumulatorParam对象来为自定义类型定义累加器。以该模块的doctest为例。

1.​​

​add(term)​

​​ 2.​

​property value​

class pyspark.AccumulatorParam

定义如何累积给定类型的值的helper对象。

1.​​

​addInPlace(value1, value2)​

​​添加累加器数据类型的两个值,返回一个新值;为了提高效率,还可以在适当的地方更新value1并返回它。

2.​​

​zero(value)​

​为类型提供一个“零值”,在维度上与提供的值兼容(例如,一个零向量)

​class pyspark.MarshalSerializer​

使用Python的Marshal序列化对象。该序列化更快,但支持少量数据

1.​​

​dumps(obj)​

​​ 2.​

​loads(obj)​

​class pyspark.PickleSerializer​

该序列化器支持几乎所有Python对象,但可能不像其他专用的序列化器那么快。

1.​​

​dumps(obj)​

​​ 2.​

​loads(obj)​

​class pyspark.StatusTracker(jtracker)​

用于监视job和stage progress的低级状态报告api。

这些api有意提供非常弱的一致性语义;这些api的使用者应该准备好处理空的/丢失的信息。例如,作业的stage id可能是已知的,但是状态API可能没有关于这些stage细节的任何信息,因此​​

​getStageInfo​

​​可能会为有效的stage id返回None。

为了限制内存使用,这些api只提供关于最近jobs/stages的信息。这些api将为最后一个​​

​spark.ui.retainedStages​

​​和​

​spark.ui.retainedJobs​

​​提供信息。

1.​​

​getActiveJobsIds()​

​​ 返回一个包含所有活跃jobs的id的数组

2.​

​getActiveStageIds()​

​ 返回一个包含所有活跃stages的id的数组

3.​

​getJobIdsForGroup(jobGroup=None)​

​ 返回特定作业group中所有已知作业的列表。如果jobGroup为None,则返回所有与作业组无关的已知作业。

返回的列表可能包含正在运行、失败和已完成的作业,并且在此方法的不同调用中可能有所不同。此方法不保证其结果中元素的顺序。

4.​

​getJobInfo(jobId)​

​ 返回SparkJobInfo对象,如果找不到作业信息或作业信息已被垃圾收集,则返回None。

5.​

​getStageInfo(stageId)​

​ 返回SparkStageInfo对象,如果找不到作业信息或作业信息已被垃圾收集,则返回None。

​class pyspark.SparkJobInfo​

暴露有关Spark作业的信息。

​class pyspark.SparkStageInfo​

暴露有关Spark阶段的信息。

​class pyspark.Profiler(ctx)​

PySpark支持自定义分析器,这是为了允许使用不同的分析器,以及输出到不同的格式,而不是在BasicProfiler中提供的。

自定义分析器必须定义或继承以下方法:

  • profile–将生成某种类型的系统配置文件。
  • stats–返回收集到的统计信息。
  • dump–将概要文件转储到路径
  • add --将概要文件添加到现有的累积概要文件

创建SparkContext时选择profiler类

>>> from pyspark import SparkConf, SparkContext
>>> from pyspark import BasicProfiler
>>> class MyCustomProfiler(BasicProfiler):
...     def show(self, id):
...         print("My custom profiles for RDD:%s" % id)
...
>>> conf = SparkConf().set("spark.python.profile", "true")
>>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
>>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.parallelize(range(1000)).count()
1000
>>> sc.show_profiles()
My custom profiles for RDD:1
My custom profiles for RDD:3
>>> sc.stop()      

1.​

​dump(id, path)​

​​ 将profile转储到path中,id是RDD id

2.​

​profile(func)​

​ 利用函数分析

3.​

​show(id)​

​ 打印profile状态到输出

4.​

​stats()​

​ 返回收集到的分析状态

​class pyspark.BasicProfiler(ctx)​

BasicProfiler是默认的profiler,它是基于cProfile和累加器实现的

1.​​

​profile(func)​

​​ 运行并配置传入的方法to_profile。返回一个profile对象。

2.​

​stats()​

​ 返回收集到的profiling统计信息(pstats.Stats)

​class pyspark.TaskContext​

任务的上下文信息,可以在执行过程中读取或修改。要访问正在运行的任务的TaskContext通过​

​TaskContext.get()​

​​。

1.​​

​attemptNumber()​

​​ “这个任务已经尝试了多少次了。第一次任务尝试将被分配为尝试号= 0,后续尝试的尝试号将不断增加。

2.​

​classmethod get()​

​ 返回当前活动的TaskContext。这可以在用户函数内部调用,以访问有关正在运行的任务的上下文信息。

注意:必须是called on worker,而不是driver。如果没有初始化,则返回None。

3.​

​getLocalProperty(key)​

​ 在driver的上游设置一个本地属性,如果它丢失,则不设置。

4.​

​partitionId()​

​ 此任务计算的RDD分区的ID。

5.​

​stageId()​

​ 此任务所属的阶段的ID。

6.​

​taskAttemptId()​

​ 此任务尝试的唯一ID(在相同的SparkContext中,没有两个任务尝试的尝试id不同)。这大致相当于Hadoop的TaskAttemptID。

​class pyspark.RDDBarrier(rdd)​

将RDD包装在barrier阶段中,这迫使Spark一起启动这个阶段的任务。​

​RDDBarrier​

​​实例是由​

​RDD.barrier()​

​​创建的。

1.​​

​mapPartitions(f, preservesPartitioning=False)​

​ 通过将一个函数应用到包装好的RDD的每个分区,返回一个新的RDD,其中的任务一起在barrier阶段启动。该接口与RDD.mapPartitions()相同。

​class pyspark.BarrierTaskContext​

​class pyspark.BarrierTaskInfo(address)​

继续阅读