函数(function)
Java中,函数需要作为实现了Spark的org.apache.spark.api.java.function包中的任一函数接口的对象来传递。(Java1.8支持了lamda表达式)
根据Spark-1.6整理如下:
Function:
CoGroupFunction
DoubleFlatMapFunction
DoubleFunction
FilterFunction
FlatMapFunction
FlatMapFunction2
FlatMapGroupsFunction
ForeachFunction
ForeachPartitionFunction
Function
Function0
Function2
Function3
Function4
MapFunction
MapGroupsFunction
MapPartitionsFunction
PairFlatMapFunction
PairFunction
ReduceFunction
VoidFunction
VoidFunction2
算子(operator)
对算子进行分类:
1. 根据固有性质划分为:创建算子(惰性)、Transformation转换算子(惰性)、Action行动算子。
2. 根据RDD的元素是Value还是Key-Value,划分为RDD或者是PairRDD。注意:PairRDD也还是RDD,本质就是元素类型为Tuple2的RDD,所以同样支持RDD所支持的算子。除此之外,介于PairRDD的键值特性,PairRDD有一些特有的算子,这些算子是针对Tuple2中的键或值作为主要区分属性进行操作!在下面的解析中,单RDD或者多RDD的操作同样适用于PairRDD!
3. 根据是对单个RDD单集合操作,还是对多个RDD的多集合操作。
1. 创建 – Value - RDD
(1) parallelize:从驱动程序中对一个集合进行并行化,每个集合元素对应RDD一个元素
(2) textFile:读取外部数据集,每行生成一个RDD元素
2. 转换 - Value - 单RDD
(1) map:将函数应用于RDD中的每个元素,返回值作为新的RDD中的对应一个元素。
(2) flatMap:将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词。
(3) filter:返回一个由通过传给filter()的函数的元素组成的RDD。
(4) distinct:去重。开销很大,需要将所有数据通过网络进行混洗(shuffle)。
(5) mapPartitions:将函数应用于RDD中的每个分区,将返回值构成新的RDD。
3. 转换 - Value – 多RDD
(1) union:生成一个包含两个RDD中所有元素的RDD。不会去重,不进行混洗。
(2) intersection:求两个RDD共同的元素的RDD。会去掉所有重复元素(包含单集合内的原来的重复元素),进行混洗。
(3) subtract:返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。不会去除重复元素,需要混洗。
(4) cartesian:RDD与另一个RDD的笛卡尔积。
4. 行动 - Value - 单RDD
(1) foreach:将函数应用于RDD中的每个元素,无返回。
(2) foreachPartition:将函数应用于RDD中的每个分区,无返回。
(3) first:返回第一个元素
(4) collect:返回RDD中的所有元素,要求所有数据都放入一个机器中
(5) count:返回RDD中的元素数目
(6) countByValue:返回RDD中每个元素的出现次数,返回Map,键是元素,值是次数。
(7) take:返回RDD中num个数量的元素,返回的顺序可能和预期的不一样
(8) top:返回RDD中最大的num个元素,但也可以根据我们提供的比较函数进行选择
(9) takeOrdered:根据你给的排序方法返回一个元素序列,注意Comparator要序列化
5. 行动 – Value1-Value2 - 单RDD
(1) reduce:在一次遍历中按指定函数合并RDD中所有的元素(例如,求和)
(2) fold:和reduce功能一样,但是提供一个初值
(3) aggregate:和reduce函数类似,但是通常返回不同类型的函数
1. 创建 - KeyValue - PairRDD
(1) parallelizePairs:parallelizePairs 从驱动程序中对一个Tuple2集合进行并行化,每个Tuple2元素对应RDD一个元素。
2. 转换 - KeyValue - 单PairRDD
(1) mapToPair:将函数应用于RDD中的每个元素,转换成键值对形式的RDD。
(2) reduceByKey:分别规约每个键对应的值
(3) groupByKey:对具有相同键的值进行分组(也可以根据除键相同以外的条件进行分组)
(4) combineByKey:使用不同的返回类型聚合具有相同键的值
(5) mapValues:对pairRDD中的每个值应用一个函数而不改变键
(6) flatMapValues:对pair RDD 中的每个值应用
(7) flatMapValues:一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化
(8) keys:返回一个仅包含键的RDD
(9) values:返回一个仅包含值的RDD
(10) sortByKey:返回一个根据键排序的RDD
3. 转换 - KeyValue - 多PairRDD
(1) subtractByKey:删掉RDD中键与other RDD中的键相同的元素
(2) join:对两个RDD进行内连接
(3) rightOuterJoin:对两个RDD 进行连接操作,确保第二个RDD的键必须存在
(4) leftOuterJoin:对两个RDD 进行连接操作,确保第一个RDD的键必须存在
(5) cogroup:将两个RDD 中拥有相同键的数据分组到一起
(6) partitionBy:按照给定的方式进行分区,原生有Hash分区和范围分区
4. 行动 - KeyValue - 单PairRDD
(1) countByKey:对每个键对应的元素分别计数
(2) collectAsMap:将结果以映射表的形式返回,以便查询
(3) lookup:返回给定键对应的所有值
补充:
1. reduceByKey、foldByKey、combineByKey:reduceByKey、foldByKey会在为每个键计算全局的总结果之前先自动在每台机器上进行本地合并,用户不需要指定合并器。这和MapReduce中的合并器(combiner)作用类似。更泛化的combineByKey接口可以让你自定义合并的行为。
2. reduceByKey:在reduceByKey类似的行动算子时会进行一个shuffle过程,shuffle需要网络I/O,如果是在MapReduce中还会有磁盘I/O。
日志
如果觉得shell中输出的日志信息过多而使人分心,可以调整日志的级别来控制输出的信息量。你需要在conf 目录下创建一个名为log4j.properties 的文件来管理日志设置。Spark开发者们已经在Spark 中加入了一个日志设置文件的模版,叫作log4j.properties.template。要让日志看起来不那么啰嗦,可以先把这个日志设置模版文件复制一份到conf/log4j.properties 来作为日志设置文件。
例如:/etc/spark/conf.cloudera.spark_on_yarn/log4j.properties
共享变量
向集群传递函数操作时,可以使用驱动器程序中定义的变量,但集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器对应的变量,也就是说,本质上这种值的影响是单向的。Spark的两个共享变量,累加器(accumulator)与广播变量(broadcast variable),分别为结果聚合、广播这两种常见的通信模式突破了这一限制。
1. 累加器(accumulator)
对于工作节点上的任务来说,不能访问累加器的值,只可写入累加器。在这种模式下累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。累加器的值只有在驱动器程序中可以访问。
Spark会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。Spark可以抢占式地在另一个节点上启动一个“投机”(speculative)型的任务副本,如果该任务更早结束就可以直接获取结果。这种情况下可能造成累加器重复执行,所以,Spark只会把每个行动操作任务对累加器的修改只应用一次。但是1.3及其以前的版本中,在转换操作任务时并没有这种保证。
2. 广播变量(broadcast variable)
可以让程序高效的向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如发送一个较大的只读查询表,甚至是机器学习的一个较大的特征向量。
3. 基于分区的编程
基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作。
Spark UI
默认Spark UI在驱动程序所在机器的4040端口。但对于YARN,驱动程序会运行在集群内部,你应该通过YARN的资源管理器来访问用户界面。YARN的资源管理器会把请求直接转发给驱动程序。
(1) 作业页面:步骤与任务的进度和指标
Spark作业详细执行情况。正在运行的作业、步骤、任务的进度情况。关于物理执行过程的一些指标,例如任务在生命周期中各个阶段的时间消耗。数据倾斜是导致性能问题的常见原因之一。当看到少量任务相对于其他任务需要花费大量时间时,一般就是发生了数据倾斜。
(2) 存储页面:已缓存的RDD的信息
这个页面告诉我们到底各个RDD的哪些部分被缓存了,以及在各种不同的存储媒介(磁盘、内存等)中所缓存的数据量。
(3) 执行器页面:应用中的执行器进程列表
可以确认应用在真实环境下是否可以使用你所预期使用的全部资源量;使用线程转存(Thread Dump)按钮收集执行器进程的栈跟踪信息。可以精确的即时显示出当前执行的代码。
(4) 环境页面:调式Spark配置项
这里的配置项是应用的真实的配置项。可以检查我们的配置是否生效。
配置项
设置Spark的配置有几种方式,优先级从高到低分别为:
(1) 在用户代码中显示调用sparkConf.set()设置的配置项
(2) 其次是通过spark-submit传递的参数
(3) 再次是写在配置文件中的配置值,默认在conf/spark-defaults.conf文件中,也可以通过spark-submit的- -properties自定义该文件的路径
(4) 最后是系统默认
其中,spark-submit的一般格式:
代码语言:javascript复制bin/spark-submit
--class com.hisense.hicon.MyMainClass
--master yarn-cluster
--name "AppNme"
--files data.txt
--jars dep1.jar,dep2.jar,dep3.jar
--num-executors 25
--executor-cores 50
--executor-memory 512m
--driver-memory 2g
--total-executor-cores 50
MyApp.jar
args...
并行度调优
每个RDD都有固定数目的分区,分区数决定了在RDD上执行操作时的并行度。
在聚合、分组操作时,可以指定分区数(不指定会根据集群推算一个默认分区数),例如PairRDD的大多数聚合、分组操作,用第二个参数指定分区数。
除了聚合、分组操作如果希望指定分区数,提供了repartition函数,它会把数据通过网络进行shuffle,并创建出新的分区后的RDD。切记,分区的代价相对较大。还有一个优化版的分区操作:coalesce。除此之外,还可以使用Java中的rdd.partitions().size()查看RDD的分区数。
当Spark调度并运行任务时,Spark会为每个分区中的数据创建出一个任务。该任务在默认情况下会需要集群中的一个计算核心来执行。
从HDFS上读取输入RDD会为数据在HDFS上的每个文件区块创建一个分区。从数据混洗后的RDD派生下来的RDD则会采用与其父RDD相同的并行度。注意并行度过高时,每个分区产生的间接开销累计起来就会更大。
Spark提供了两种方法对操作的并行度进行调优:
(1) 在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度;
(2) 对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数。
序列化调优
序列化在数据混洗时发生,此时有可能需要通过网络传输大量的数据。默认使用Java内建的序列化库。Spark也会使用第三方序列化库:Kryo。需要设置spark.serializer为org.apache.spark.serializer.KryoSerializer。为了获得最佳的性能,你还可以向Kryo注册你想要序列化的类:
代码语言:javascript复制val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 严格要求注册类
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))