PySpark - 广播和累加器

Apache Spark 使用共享变量进行并行处理。 当驱动程序向集群上的执行器发送任务时,共享变量的副本会在集群的每个节点上运行,以便它可以用于执行任务。

Apache Spark 支持两种类型的共享变量 −

  • Broadcast(广播)
  • Accumulator(累加器)

让我们详细了解一下。


Broadcast(广播)

Broadcast 广播变量用于保存跨所有节点的数据副本。 这个变量缓存在所有机器上,而不是在有任务的机器上发送。 以下代码块包含 PySpark 广播类的详细信息。

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

以下示例展示了如何使用广播变量。 广播变量有一个名为 value 的属性,它存储数据并用于返回广播值。

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

命令 − 广播变量的命令如下 −

$SPARK_HOME/bin/spark-submit broadcast.py

输出 − 以下命令的输出如下所示。

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Accumulator(累加器)

Accumulator 累加器变量用于通过关联和交换操作聚合信息。 例如,您可以将累加器用于求和运算或计数器(在 MapReduce 中)。 以下代码块包含 PySpark 的 Accumulator 类的详细信息。

class pyspark.Accumulator(aid, value, accum_param)

以下示例说明如何使用累加器变量。 累加器变量有一个名为 value 的属性,类似于广播变量的属性。 它存储数据并用于返回累加器的值,但只能在驱动程序中使用。

在这个例子中,一个累加器变量被多个工人使用并返回一个累加值。

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

命令 − 累加器变量的命令如下 −

$SPARK_HOME/bin/spark-submit accumulator.py

输出 − 上述命令的输出如下所示。

Accumulated value is -> 150