对于并行处理,Apache Spark可以使用共享变量。
即当驱动程序将任务发送到集群后,共享变量的副本将在集群的每个节点上运行,以便可以将该变量应用于节点中执行的任务。
今天将要学习的就是Apache Spark支持的两种类型的共享变量:广播与累加器。
广播
广播类型变量用于跨所有节点保存数据副本。此变量缓存在所有Spark节点的机器上,而不仅仅是在执行任务的节点上保存。以下示例代码是PySpark中广播类的结构:
代码语言:javascript复制class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
如下代码是一个广播类型的变量使用示例。这个广播类型的对象有一个value属性,通过value属性我们可以获取到广播对象中存储的值。
代码语言:javascript复制words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
累加器
累加器变量主要用于统计操作记录数据。例如,我们可以在MapReduce中利用累加器进行求和或计数。 一个累加器的数据结构如下所示:
代码语言:javascript复制class pyspark.Accumulator(aid, value, accum_param)
如下的示例中显示了如何使用累加器变量。累加器变量与广播变量类似,同样可以通过value属性来查询数据,但是仅仅能在驱动程序中调用。在下面的例子中,我们将一个累计器用于多个工作节点并返回一个累加值。
代码语言:javascript复制num = sc.accumulator(10)
def f(x):
global num
num =x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
# Accumulated value is -> 150