前言
本篇文章主要介绍高级RDD操作,重点介绍键值RDD,这是操作数据的一种强大的抽象形式。我们还涉及一些更高级的主题,如自定义分区,这是你可能最想要使用RDD的原因。使用自定义分区函数,你可以精确控制数据在集群上的分布,并相应的操作单个分区。
准备数据集
代码语言:javascript复制 val myCollection = "WeChat official account big data brother"
.split(" ")
// 设置并行度
val word = sc.parallelize(myCollection, 2)
Key-Value基础(key-value RDD)
基于RDD的许多方法要求数据是Key-Value格式,这种方法都有形如 <some-operation>BeyKey的API名称,只要在方法名称中看到Bykey,就意味着只能以PairRDD类型执行此操作。最简单的方法就是当前RDD映射到基本的key-Value结构,也就是说在RDD的每个记录中都有两个值:
代码语言:javascript复制 val Key_Value = word.map(word => (word.toLowerCase(), 1))
KeyBy
前面的实例演示了创建Key的简单方法,但是也可以使用keyBy函数,它是根据当前的value创建key的函数。本列中,将单词中第一个字母作为key,然后Spark将该单词记录保持为RDD的value:
代码语言:javascript复制val KeyByWord = word.keyBy(word => word.toLowerCase.toSeq(0).toString)
对值进行映射
在有一组键值对的之后,你可以开始对他们进行操作。如果我们有一个元组,Spakr将假设第一个元素是Key,第二个是value。这种格式中,你可以显示选择映射value(并忽略key)。当然,可以手动执行此操作。但当手动执行测操作,但当你知道只是要修改value时,这可以帮助防止错误:
代码语言:javascript复制val mapValue = KeyByWord.mapValues(word => word.toUpperCase()).collect()
// 运行结果
(w,WECHAT)
(o,OFFICIAL)
(a,ACCOUNT)
(b,BIG)
(d,DATA)
(b,BROTHER)```
你可以在(ROW)上进行flatMap 操作来扩展行数,是每行表示一个字符。在下面的示例中,我们将单词转换为每个字符数组:
```scala
val flatMapValues = KeyByWord.flatMapValues(word => word.toUpperCase()).collect()
提取Key和value
当我们的数据是键值对这个种格式时,我们还可以使用以下方法提取特定的key或value:
代码语言:javascript复制val values = KeyByWord.values.collect()
val keys = KeyByWord.keys.collect()
lookup
在RDD上很多常用的任务就是查找某个key对对应的value。请注意:并不强调规定每一个输入都只是一个键值,所以如果当我们查找 b
时,我们将获得与该key相关的两个value 。即big
和brother
:
KeyByWord.lookup("b")
sampleByKey
有两种方法可以通过一组key开采样RDD,这可以是近似的方法可以是精确的方法。这两种操作都可以使用或不适用替换策略,以及根据给定的键值对数据集部分采样。这是通过对RDD的一次遍历来检点随机采样,采样数据大于是key-value对数量的math.ceil(numitems * samplingRate)这么多:
代码语言:javascript复制val distinctChar = word.flatMap(word => word.toLowerCase().toSeq).distinct().collect()
import scala.util.Random
val sampleMap = distinctChar.map(c => (c, new Random().nextDouble())).toMap
val tuples = word.map(word => (word.toLowerCase().toSeq(0), word))
.sampleByKey(true, sampleMap, 6L)
.collect()
下面使用sampleByKeyExact的方法不同于sampleByKey,因为他需要额外的遍历RDD,以为99.99%的置信度构造大小等于key-value对数量的math.ceil(numitems*samlingRate) 这么多的样本集合。若设置不替换,则要一次额外遍历RDD以保证样本大小,若设置替换取样,需要额外两次额外遍历:
代码语言:javascript复制 val tuples1 = word.map(word => (word.toLowerCase.toSeq(0), word))
.sampleByKeyExact(true, sampleMap, 6L).collect()
聚合操作
你可以在纯RDD或PairRDD上执行聚合操作,具体取决于所用的方法。下面使用数据集来演示一下:
代码语言:javascript复制 val chars = word.flatMap(word => word.toLowerCase.toSeq)
val KVcharcters = chars.map(letter => (letter, 1))
def maxFunc(left: Int, right: Int) = math.max(left, right)
def addFunc(left:Int,right:Int)=left right
val nums = sc.parallelize(1 to 30, 5)
明白了这些内容之后可以执行类似countByKey的操作,它对每个key对应的项进行计数。
countByKey
可以计算每个key对应的数据项的数量,并将结果写入到本地Map中,你还可以近似的执行操作,在Scala 中指定超时时间和置信度。
代码语言:javascript复制 KVcharcters.countByKey()
KVcharcters.countByKeyApprox(1000L,0.95)
了解聚合操作的实现
有几种方法可以创建key-value PairRDD,但是实现方法对任务的稳定性非常重要。我们比较两个基本的法方法:groupBy 和rduce。我们仅介绍groupByKey和reduceByKey的实现,groupBy和reduceBy的实现思路类似。
groupBykey
你可能会觉得groupByKeype配合使用Map操作是汇总每个key的数据量的最佳方法:
代码语言:javascript复制 KVcharcters.groupByKey().map(row => (row._1, row._2.reduce(addFunc))).collect()
但是,在大多数情况下,这是错误的方法。根本问题是每天执行器在执行函数之前必须把内存中报错一个key对应的所有value。这会有什么问题么?如果有严重的key负载倾斜显示,则某些分组可能由于key对应这太多的value而导致超载问题,进而出现OutPutMemoryErrorrs错误。当前的小数据集显然不会出现这种问题但他可能会在处理大规模数据时爆发严正的问题。这不一定会发生,但他可能会发生。groupByKey在某些情况下是可以的。如果每个key的value数量都差不多,并且知道他们能够被执行器的内存容纳那就可以了。对于其他情况,有一种首选方法,就是使用reduceByKey。
reduceByKey
因为我们是执行一个简单的计算,一个更稳定是同样执行flatMap,然后执行map将每个单词实例映射为数字,人啊执行reduceByKey配以求和一结果存储到数组中。这种方法更稳定,因为reduce发生在每个分组,并且不需要执行所有内容放在内存中。此外此操作不会导致shuffle过程
,在执行最后到reduce之前所有任务都在每个工作节点单独执行。这个大大提供了执行速度该操作相对稳定性:
KVcharcters.reduceByKey(addFunc).collect()
其他聚合方法
还有很多高级聚合操作,使用它主要取决于具体工作负载,而我们发现在当今spark作业中,用户极少遇到这种工作负载(或需要执行这种操作)。因为使用结构化API执行更简单好聚合时,很少会使用这些非常低级的工具。这些函数允许你具体地控制在集群上执行某些聚合操作。
aggregate
有一个函数叫做aggregate,此函数需要一个null值作为起始值,并且需要你指定两个不同的函数第一个函数执行分区内函数,第二个执行分区聚合。起始值在两个聚合级别都使用:
代码语言:javascript复制nums.aggregate(0)(maxFunc,addFunc)
aggregate确实有一些性能问题,因为他在驱动上执行最终聚合。如果执行器的结果太大,则会导致驱动出现OutOfMemoryError
错误并且最终让程序崩掉。还有另一个方法treeAggreate
,他基于不同的实现方法可以得到aggregate
相同的结果。它基本是以下推
方式完成一些子聚合(创建执行器到执行器传输聚合结果的树),最后在执行最终聚合。多层的形式确保驱动在聚合过程中不会耗尽内存,这些基于树实现的通常会提高某些操作的稳定性:
nums.treeAggregate(0)(maxFunc,addFunc,3)
aggregateByKey
此函数与aggregate基本相同,但是基于key聚合而非基于分区聚合
。起始值和函数数的属性配置也都相同:
KVcharcters.aggregateByKey(0)(addFunc,maxFunc).collect()
combineByKey
不同可以指定聚合函数,还可以指定一个合并函数。该函数针对某些key进行操作,并根据某个函数对value合并,然后合并各个合并器输出结果并得出最终结果。我们还可以按照自定义输出分区程序指定输出分区数量:
代码语言:javascript复制 val valToCombiner: Int => List[Int] = (value: Int) => List(value)
val mergeValueFunc=(vals:List[Int],valToAppend:Int)=>valToAppend ::vals
val mergeCombinerFunc: (List[Int], List[Int]) => List[Int] = (vals1: List[Int], vals2: List[Int]) => vals1 ::: vals2
val outputPartition=6
KVcharcters.combineByKey(
valToCombiner,
mergeValueFunc,
mergeCombinerFunc,
outputPartition
).collect()
foldByKey
foldByKey使用满足结合律函数和中性的零值
合并每个key的value,支持多次累积到结果并且不能更改结果(例如,0为加法,或1为减法)
KVcharcters.foldByKey(0)(addFunc).collect()
GoGroups
GoGroups在scala中允许将三个key-value RDD一起分组,在Python 中允许将两个key-value RDD 一起分组。它基于key连接value,这实际上等效基于组的RDD连接操作。执行此操作时,还可以指定多个数输出分区或自定义分区函数,以精确控制此数据在整个集群上分布情况:
代码语言:javascript复制 import scala.util.Random
val distinctChars = word.flatMap(word => word.toCharArray.toSeq).distinct()
val charRdd = distinctChars.map(c => (c, new Random().nextDouble()))
val charRdd2 = distinctChars.map(c => (c, new Random().nextDouble()))
val charRdd3 = distinctChars.map(c => (c, new Random().nextDouble()))
charRdd.cogroup(charRdd,charRdd2,charRdd3)
连接操作 RDD的链接与结构化API中的连接有很多相同之处,他们都遵循相同的基本格式,包括执行了操作的两个RDD,以及输出分区数或自定义分区函数。
内连接
下面给出内连接示例代码。请注意:我们如何设置输出分区数
:
val keyedChar= distinctChars.map(c => (c, new Random().nextDouble()))
val outputPartitions=10
KVcharcters.join(keyedChar).count()
KVcharcters.join(keyedChar,outputPartitions).count()
zip
zip 其实别不是一个连接操作,但是它将两个RDD组合在一起,因此我们暂将它归类为连接操作。zip把两个RDD的元素对应的匹配在一起,要求两个RDD的元素个数相同,同时也要求两个RDD分区数也相同,结果会生成一个PairRDD:
代码语言:javascript复制 val numRange = sc.parallelize(0 to 9, 2)
word.zip(numRange).collect()
控制分区
使用RDD,可以以控制数据在整个集群上的物理分布,其中一些方法与结构API中基本相同,但是最关键区别(在结构化API中不支持的)在于,他可以指定一个数据分区函数。
coalesce
coalesce有效地折叠(collapse)同一个工作节点上的分区,以便在重新分区时避免数据洗牌(shuffle)。假如存储words变量的RDD当前有两个分区,可以使用coalesce将其折叠为一个分区,从避免了数据shuffle。
代码语言:javascript复制word.coalesce(1).getNumPartitions
repartition
repartition 操作将数据进行重新分区,跨节点的分区会执行shuffle操作,对于map和filter操作,增加分区可以可以提高并行度。
代码语言:javascript复制 word.repartition(10)
自定义分区
自定义分区是使用RDD的主要原因之一,而结构化API不支持自定义数据分区,RDD包含影响任务能否成功的低级实现细节。自定义分区的典型示例PageRank实现,你需要控制集群的分布并避免shuffle操作,而在我们的shopping数据集中,可能需要我们根据客户ID对数据进行分区。
简单而言之,自定义分区的唯一目标是数据均匀地分布在整个集群中,以避免如数据倾斜之类的问题。
如果要使用自定义分区,则应从结构化API定义的数据降级为RDD,应用自定义分区程序,然后将RDD转换回DataFrame 或DataSet。只有真正需要时,才会使用RDD自定义分区,这样的可以利用两方面的优势。
需要执行自定义分区,你需要实现Partitioner子类。只有当你很了解特定领域知识时,你才需要这样做。如果你只是想对一个值或一组值(列)进行分区,那么DataFrame API 实现就可以了。
代码语言:javascript复制 val df= spark.read.option("header","true").option("inferSchema","true")
.csv("./data/all")
val rdd=df.coalesce(10).rdd
Spark有两个内置的分区器,你可以在RDD API中调用,他们适用于离散值划分的HashPartitioner(基于哈希值的分区)以及RangePartitioner(根据数值范围分区),这两个分区器分别针对离散和连续值。Spark的结构化API已经包含了他们,可以在RDD中使用他们:
代码语言:javascript复制 val df= spark.read.option("header","true").option("inferSchema","true")
.csv("./data/all")
val rdd=df.coalesce(10).rdd
import org.apache.spark.HashPartitioner
rdd.map(r=>r(6).task(5)).foreach(println)
val keyedRdd=rdd.keyBy(row=>row(6).asInstanceOf[Int].toDouble)
keyedRdd.partitiionBy(new HashPartitioner(10)).task(10)
随热按哈希值分区和范围分区程序都很有用,但他们最基本的分区方法。有时,因为数据量很多并存在严正的数据倾斜(由于某些key对应的value项比其他key对应的value项目多很多导致的数据倾斜),你将需要实现一些非底层的分区方法。你希望尽可能的拆分出这些key以提高并行度,并行度过程中防止OutOfMemoryError错误发生。
一个典型情况是,(当且仅当某个key有特定形式时)由于某个key对应的value太多。需要将这个key拆分成很多key。例如:数据集中可能对某两个客户的数据处理总是会在使用分析过程中崩溃,我们需要对两个客户数据进行细分,就是说比如其他客户ID更细粒度地分解他们。由于这两个key切斜的情况很严严重,所以需要特别处理,而其他的key可以被数据中到大组中,这虽然是一个极端的例子,但你可能会在数据中看到类似的情况。
代码语言:javascript复制 import org.apache.spark.Partitioner
class DomainPartitioner extends Partitioner{
def numPartitions=3
def getPartition(key:Any):Int={
val customerId = key.asInstanceOf[Double].toInt
if (customerId ==17850.0 || customerId==12583.0){
return 0
}else{
return new java.util.Random().nextInt(2) 1
}
}
}
keyedRdd.partitionBy(new DomainPartitioner).map(_._1).glom().map(_.toSet.toSet.length).task(5)
运行代码后,你将看到每个分区中的结果数量,而第二个分区和第三个分区的数量会有所不同,因为后两个分区是随机分布的.
代码语言:javascript复制 def partitionFunc(key):
import random
if key==17850 or key ==12583:
return 0
else:
return random.randint(1,2)
keyedRdd.rdd.keyBy(lambda row: row[6])
keyedRdd.partitionBy(3,partitionFunc)
.map(lambda x:x[0])
.glom()
.map(lambda x:len(set x))
.task(50)
自定义key分发逻辑尽在RDD级别适用。当然,这是一个简单的示例,但他展示了以任何逻辑在集群中部署数据的能力。
自定义序列化
最后一个主题是Kryo序列化问题,任何你希望并行处理(或函数操作)的对象都必须是可序列化的:
代码语言:javascript复制class SomeClass extends Serializable {
var someValue = 0
def setSomeValue(i: Int) = {
someValue = i
this
}
}
sc.parallelize(1 to 10).map(num =new SomeClass().setSomeValue(num))
默认的序列化方法可能很慢,Spark可以使用Kryo库更快地序列化队形。kryo序列化的速度比Java序列化更快,压缩更紧凑(通常是10倍),但别不是所有的序列化类型的,并且要求你先注册程序中使用的类。
你可以借助于SparkConf使用kryo初始化你的任务,并设置spark.serizlizer
为org.apche.spark.serizlizer.KryoSerizlizer
。此配置用于在工作节点之间数据传输或将RDD写入到磁盘上时,Spark采用序列化工具。Spark没有选择Kryo作为默认序列化工具的原因是它要求自定义注册,但我们建议在网络传输量大的应用程序中尝试使用它,自Spark.2.0.0之后,我们在对简单类型,简单类型数组或字符串类型的RDD进行shuffle操作时,已经默认采用kryo序列化。
Spark为Twitter chill库中AllScalaRegistrar函数的许多常用核心Scala类自动使用了Kryo序列化。
代码语言:javascript复制val conf = new SparkConf().setAppName("mysql").setMaster("local[*]")
conf.registerKryoClasses(Array(classOf[MyClass1]),classOf[myClass2])
val sc =new SparkContext()
小结
在本文章中,我们讨论了有关RDD的许多更高的主题。特定需要主要的部分是自定义分区,它允许你特定的函数来活分数据。
面试真经 | 美团优选大数据开发岗面试真题(附答案)
面试真经 | 大数据/Spark Core灵魂讲解
面试真经 | 大数据/数仓面试灵魂30问
?分享、点赞、在看,给个3连击呗!?