本篇博客将会汇总记录大部分的Spark RDD / Dataset的常用操作以及一些容易混淆的操作对比。
0. 基本概念
首先介绍一下基本概念,详情可以参考之前的博客:
- Spark 与 Hadoop 学习笔记 介绍及对比
- Databrick 's Blog on Spark Structured Streaming Summary
- Spark Structured Streaming Kafka使用笔记
RDD概念
代码语言:txt复制RDD是弹性分布式数据集,存储在硬盘或者内存上。
RDD特征
代码语言:txt复制1)有一个分片列表,就是能被切分,和Hadoop一样,能够切分的数据才能够并行计算
代码语言:txt复制2)由一个函数计算每一个分片
代码语言:txt复制3)对其他RDD有依赖,但并不是所有的rdd都有依赖
代码语言:txt复制4)key-value的RDD是根据哈希来分区的
RDD具体操作分为Transformation操作与Action操作,分别是
- 变换Transformation 变换的返回值是一个新的 RDD 集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个 RDD 作为参数,然后返回一个新的 RDD。 变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。
- 行动Action 行动操作计算并返回一个新的值。当在一个 RDD 对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。 行动操作包括:reduce,collect,count,first,take,countByKey 以及 foreach。
1. Transformation 操作
Transformation | Meaning |
---|---|
map(func) | 返回一个新的分布式数据集,将数据源的每一个元素传递给函数 func映射组成。Return a new distributed dataset formed by passing each element of the source through a function func. |
filter(func) | 返回一个新的数据集,从数据源中选中一些元素通过函数 func 返回 true。Return a new dataset formed by selecting those elements of the source on which _func_returns true. |
flatMap(func) | 类似于 map,但是每个输入项能被映射成多个输出项(所以 func 必须返回一个 Seq,而不是单个 item)。Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
mapPartitions(func) | 类似于 map,但是分别运行在 RDD 的每个分区上,所以 func 的类型必须是 |
mapPartitionsWithIndex(func) | 类似于 mapPartitions,但是 func 需要提供一个 integer 值描述索引(index),所以 func 的类型必须是 (Int, Iterator) => Iterator 当运行在类型为 T 的 RDD 上。Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
sample(withReplacement, fraction, seed) | 对数据进行采样。sample(withReplacement,fraction,seed)是根据给定的随机种子seed,随机抽样出数量为frac的数据。withReplacement:是否放回样;fraction:比例,0.1表示10% ;Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
union(otherDataset) | union(otherDataset)是数据合并,返回一个新的数据集,由原数据集和otherDataset联合而成。Return a new dataset that contains the union of the elements in the source dataset and the argument. |
intersection(otherDataset) | intersection(otherDataset)是数据交集,返回一个新的数据集,包含两个数据集的交集数据;Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
distinct(numPartitions)) | distinct(numTasks))是数据去重,返回一个数据集,是对两个数据集去除重复数据,numTasks参数是设置任务并行数量。Return a new dataset that contains the distinct elements of the source dataset. |
groupByKey(numPartitions) | groupByKey(numTasks)是数据分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,SeqV)对的数据集。When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using |
reduceByKey(func, numPartitions) | reduceByKey(func, numTasks)是数据分组聚合操作,在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in |
aggregateByKey(zeroValue)(seqOp, combOp, numPartitions) | aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U) =>U) 和reduceByKey的不同在于,reduceByKey输入输出都是(K, V),而aggreateByKey输出是(K,U),可以不同于输入(K, V) ,aggreateByKey的三个参数:zeroValue: U,初始值,比如空列表{} ;seqOp: (U,T)=> U,seq操作符,描述如何将T合并入U,比如如何将item合并到列表 ;combOp: (U,U) =>U,comb操作符,描述如果合并两个U,比如合并两个列表 ;所以aggreateByKey可以看成更高抽象的,更灵活的reduce或group 。When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in |
sortByKey(ascending, numPartitions) | sortByKey(ascending,numTasks)是排序操作,对(K,V)类型的数据按照K进行排序,其中K需要实现Ordered方法。When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean |
join(otherDataset, numPartitions) | join(otherDataset, numTasks)是连接操作,将输入数据集(K,V)和另外一个数据集(K,W)进行Join, 得到(K, (V,W));该操作是对于相同K的V和W集合进行笛卡尔积 操作,也即V和W的所有组合;When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through |
cogroup(otherDataset, numPartitions) | cogroup(otherDataset, numTasks)是将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, SeqV, SeqW)的数据集。When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called |
cartesian(otherDataset) | cartesian(otherDataset)是做笛卡尔积:对于数据集T和U 进行笛卡尔积操作, 得到(T, U)格式的数据集。When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
pipe(command, envVars) | 把每个分区输出到stdin,然后执行命令,最后读回stdout,以每行为元素,生成新的RDD。注意这里执行命令的单位是分区,不是元素。Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. |
coalesce(numPartitions) | 将RDD的分区数量减少到numPartitions个,在对一个大数据集进行filter操作之后,调用一下减少分区数量可以提高效率。Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
repartition(numPartitions) | 随机打乱RDD内全部分区的数据,并且平衡一下。Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
repartitionAndSortWithinPartitions(partitioner) | repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。 Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling |
2. Actions
Action | Meaning |
---|---|
reduce(func) | reduce(func)是对数据集的所有元素执行聚集(func)函数,该函数必须是可交换的。Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
collect() | collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组。在这个数组上运用scala的函数式操作。Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
count() | 返回数据集中元素的个数。Return the number of elements in the dataset. |
first() | 返回数据集中的第一个元素, 类似于take(1)。Return the first element of the dataset (similar to take(1)). |
take(n) | Take(n)返回一个包含数据集中前n个元素的数组, 当前该操作不能并行。Return an array with the first n elements of the dataset. |
takeSample(withReplacement, num, seed) | akeSample(withReplacement,num, seed)返回包含随机的num个元素的数组,和Sample不同,takeSample 是行动操作,所以返回的是数组而不是RDD , 其中第一个参数withReplacement是抽样时是否放回,第二个参数num会精确指定抽样数,而不是比例。Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. |
takeOrdered(n, ordering) | takeOrdered(n, ordering)是返回包含随机的n个元素的数组,按照顺序输出。Return the first n elements of the RDD using either their natural order or a custom comparator. |
saveAsTextFile(path) | 把数据集中的元素写到一个文本文件,Spark会对每个元素调用toString方法来把每个元素存成文本文件的一行。存储到HDFS的指定目录。Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. |
saveAsSequenceFile(path) (Java and Scala) | 支持Java和Scala),将所有元素写入一个 Hadoop SequenceFile, 支持 本地文件系统 、HDFS 和 Hadoop支持的任何文件系统。只有实现 HadoopWritable 接口的键值对类型的RDD支持此操作。在Scala里, 可以隐式转换到Writable的类型也支持这个操作, (Spark对基本类型Int, Double, String等都写好了隐式转换)。Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). |
saveAsObjectFile(path) (Java and Scala) | saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using |
countByKey() | 对于(K, V)类型的RDD. 返回一个(K, Int)的map, Int为K的个数。Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. |
foreach(func) | foreach(func)是对数据集中的每个元素都执行func函数。不返回RDD和Array,而是返回Uint。Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the |
3. Pair RDD操作
3.1 Transformation 操作
pair RDD可以使用所有标准RDD上的可能的转化操作,还有其他如下
Transformation | Meaning |
---|---|
reduceBykey(func) | 合并具有相同键的值 Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. |
RDD<scala.Tuple2<K,scala.collection.Iterable<V>>> groupByKey(Partitioner partitioner) | 对具有相同键的值进行分组Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. |
<C> RDD<scala.Tuple2<K,C>> combineByKey(scala.Function1<V,C> createCombiner,scala.Function2<C,V,C> mergeValue, scala.Function2<C,C,C> mergeCombiners,int numPartitions) | 使用不同的的返回类型合并具有相同键的值 Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the existing partitioner/parallelism level. This method is here for backward compatibility. It does not provide combiner classtag information to the shuffle. |
<U> RDD<scala.Tuple2<K,U>> mapValues(scala.Function1<V,U> f) | 对pair RDD中的每个值应用一个函数而不改变键 Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning. |
<U> RDD<scala.Tuple2<K,U>> flatMapValues (scala.Function1<V,scala.collection.TraversableOnce<U>> f) | 对pair RDD中的每个值应用一个返回迭代器的函数, 然后对返回的每个元素都生成一个对应原键的键值对记录。 通常用于符号化。Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning. |
keys() | 返回一个仅包含键的RDD |
values() | 返回一个仅包含值的RDD |
sortByKey() | 返回一个根据键排序的RDD |
针对两个pair RDD转化操作
Transformation | Meaning |
---|---|
subtractByKey | 删掉RDD中键与other RDD中的键相同的元素 |
join | 对两个RDD进行内连接 |
rightOuterJoin | 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接) |
leftOuterJoin | 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接) |
cogroup | 将两个RDD中拥有相同键的数据分组到一起 |
3.2 Action操作
Action | Meaning |
---|---|
countByKey() | 对每个键对应的元素分别计数 |
collectAsMap() | 将结果以映射表的形式返回,以便查询 |
lookup(key) | 返回给定键对应的所有值 |
4. reduceByKey、groupByKey、combineBykey 比较
4.1 reduceByKey
当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:
4.2 groupByKey
当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:
在对大数据进行复杂计算时,reduceByKey优于groupByKey,reduceByKey在数据量比较大的时候会远远快于groupByKey。
另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :
- combineByKey 组合数据,但是组合之后的数据类型与输入时值的类型不一样。
- foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。
4.3 combineByKey
combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。
要理解combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的键相同。combineByKey()的处理流程如下:
- 如果是一个新的元素,此时使用createCombiner()来创建那个键对应的累加器的初始值。(!注意:这个过程会在每个分区第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。)
- 如果这是一个在处理当前分区中之前已经遇到键,此时combineByKey()使用mergeValue()将该键的累加器对应的当前值与这个新值进行合并。
- 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()将各个分区的结果进行合并。
5. map与flatmap比较
map()是将函数用于RDD中的每个元素,将返回值构成新的RDD。
代码语言:txt复制val rdd = sc.parallelize(List("coffee panda","happy panda","happiest panda party"))
代码语言:txt复制rdd.map(x=>x).collect
代码语言:txt复制res9: Array[String] = Array(coffee panda, happy panda, happiest panda party)
flatmap()是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD
代码语言:txt复制rdd.flatMap(x=>x.split(" ")).collect
代码语言:txt复制res8: Array[String] = Array(coffee, panda, happy, panda, happiest, panda, party)
6. map与mapPartition比较
现在有10个分区,共1000条数据,假设每个分区的数据=1000/10=100条,分别使用map和mapPartition遍历。
- 使用 map(func()) 遍历 现在,当我们将map(func)方法应用于rdd时,func()操作将应用于每一行,在这种情况下,func()操作将被调用1000次。即在一些时间关键的应用中会耗费时间。
- 使用 mapPartition(func()) 遍历 如果我们在rdd上调用mapPartition(func)方法,则func()操作将在每个分区上而不是在每一行上调用。在这种特殊情况下,它将被称为10次(分区数)。通过这种方式,你可以在涉及时间关键的应用程序时阻止一些处理。
6.1 mapPrtition的优势
- 机器学习应用程序,特别是深度学习应用程序 - 使用矢量化时,执行比简单for循环要好上百倍。mapPartitions将帮助使用矢量化。一般来说,性能提高300倍 (这不是百分比,是300倍)
- 连接创建和清理任务很昂贵,每个元素都会使代码效率低下。这适用于数据库或其他连接。但是使用mapPartitions,你可以只对整个分区执行一次init / cleanup循环。
- 一般来说,JVM带有乱序执行(它将完全使用CPU并使你的代码运行得更快),JVM需要分析你的代码,并且必须重写你的代码。使用mapPartitions,JVM可以更好地进行分析优化(与分析调用函数相比,它可以分析/优化简单代码)
- 对于map(),CPU需要每次调用lambda函数(以arg形式传递以进行映射),这会带来10-15ns的开销,并导致CPU寄存器刷新并再次加载(堆栈指针,基址指针和指令指针)
6.2 与mapPartitions相比,map有什么用处?
- 更简单的API,易于编码和易于理解,可以直接使用为List / Array / Map编写的现有函数
- 功能性编程遗留下来的贡献很小。
Reference
- https://www.cnblogs.com/LuisYao/p/6813228.html
- https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
- https://blog.csdn.net/u012893747/article/details/77074757
- https://www.jianshu.com/p/64aab52fbb21
- https://blog.csdn.net/lovehuangjiaju/article/details/48622757
- https://blog.csdn.net/dream_an/article/details/50524340?utm_source=blogxgwz1
- https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/PairRDDFunctions.html
- https://data-flair.training/blogs/spark-paired-rdd/
- https://www.edureka.co/blog/apache-spark-combinebykey-explained
- https://blog.csdn.net/high2011/article/details/79384159