The Five Key Properties of an RDD
- A list of partitions: an RDD is made up of multiple partitions, which are the units its data is divided into.
- A function for computing each split: the compute logic applied to each split/partition.
- A list of dependencies on other RDDs: an RDD may depend on other RDDs; for example, the output of one RDD is the input processed by the next.
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): key-value RDDs can be hash-partitioned, similar to the Partitioner interface in MapReduce that controls which reducer a key goes to.
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): the preferred locations for computing each partition, assigned automatically by Spark (see the sketch below for how these properties map onto the RDD class).
https://blog.csdn.net/zym1117/article/details/79532458
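These five properties map one-to-one onto members of Spark's abstract RDD class. The snippet below is an abridged sketch of those members only (the constructor and everything else is omitted; the class name SketchRDD is made up for the illustration):
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// Abridged sketch: only the members that realize the five properties are shown.
abstract class SketchRDD[T: ClassTag] {
  // 1. A list of partitions
  protected def getPartitions: Array[Partition]
  // 2. A function for computing each split/partition
  def compute(split: Partition, context: TaskContext): Iterator[T]
  // 3. A list of dependencies on other RDDs
  protected def getDependencies: Seq[Dependency[_]]
  // 4. Optionally, a Partitioner for key-value RDDs (None by default)
  val partitioner: Option[Partitioner] = None
  // 5. Optionally, preferred locations to compute each split on (empty by default)
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}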
How to Create an RDD
- Create from a local collection
makeRDD
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism)
Its underlying implementation simply calls parallelize:
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
seq: the collection to turn into an RDD
numSlices: the number of partitions; if not specified, the default is used
From the source we can see that the default value of numSlices comes from the spark.default.parallelism configuration:
override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)
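As a quick sanity check, spark.default.parallelism can be set explicitly on the SparkConf, in which case it takes precedence over totalCores. A minimal sketch (the object and app names are arbitrary):
import org.apache.spark.{SparkConf, SparkContext}

object DefaultParallelismDemo {
  def main(args: Array[String]): Unit = {
    // Explicitly override the default parallelism; without this line local[4] would give 4.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("default-parallelism-demo")
      .set("spark.default.parallelism", "8")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(1 to 8)
    // Expected to print Partitions=8, since the config value wins over totalCores.
    println(s"Partitions=${rdd.getNumPartitions}")
    sc.stop()
  }
}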
The fallback totalCores in defaultParallelism() is passed in through the LocalSchedulerBackend class. It is LocalSchedulerBackend here because I am running in local mode; in standalone cluster mode it would be StandaloneSchedulerBackend instead.
private[spark] class LocalSchedulerBackend(conf: SparkConf, scheduler: TaskSchedulerImpl, val totalCores: Int)
LocalSchedulerBackend is created from the SparkContext:
val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
val sc=new SparkContext(conf)
master is the value specified in .setMaster("local[4]"); in other words, it is fixed when the SparkContext is initialized.
master match {
case "local" =>
checkResourcesPerTask(clusterMode = false, Some(1))
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
checkResourcesPerTask(clusterMode = false, Some(threadCount))
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
checkResourcesPerTask(clusterMode = false, Some(threadCount))
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
checkResourcesPerTask(clusterMode = true, None)
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
}
// For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
// to false because this mode is intended to be used for testing and in this case all the
// executors are running on the same host. So if host local reading was enabled here then
// testing of the remote fetching would be secondary as setting this config explicitly to
// false would be required in most of the unit test (despite the fact that remote fetching
// is much more frequent in production).
sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)
val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
case masterUrl =>
checkResourcesPerTask(clusterMode = true, None)
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException(s"Could not parse Master URL: '$master'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
local comes in three forms:
- setMaster("local"): uses a single processing thread by default
case "local" =>
checkResourcesPerTask(clusterMode = false, Some(1))
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
From this code we can see that totalCores defaults to 1:
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
- setMaster("local[*]") 或者 setMaster("local[N]");
*
表示服务器cpu总核数N
表示自己指定一个core
数
A regular expression determines whether the value given to .setMaster("local[4]") is of the form local[*] or local[N]:
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
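To see how that pattern behaves, here is a standalone re-creation of the regex outside of Spark (LocalMasterRegexDemo and parseThreads are names made up for this illustration):
object LocalMasterRegexDemo {
  // Same pattern as Spark's LOCAL_N_REGEX
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r

  def parseThreads(master: String): Option[String] = master match {
    case LOCAL_N_REGEX(threads) => Some(threads)
    case _                      => None
  }

  def main(args: Array[String]): Unit = {
    println(parseThreads("local[4]")) // Some(4)
    println(parseThreads("local[*]")) // Some(*)
    println(parseThreads("local"))    // None (handled by the plain "local" case instead)
  }
}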
// We set local[4] here, so threads = 4
case LOCAL_N_REGEX(threads) =>
// Get the number of CPU cores on the current machine
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
// Verify that the thread count is greater than 0
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
checkResourcesPerTask(clusterMode = false, Some(threadCount))
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
// From the code above:
// if * was given, threadCount = the machine's CPU core count
// if a number was given, threadCount is exactly that number
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
Now let's verify that the partition count is indeed 4, as expected:
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
val sc=new SparkContext(conf)
val list=List(1,2,3,4,5,6,7,8)
val value: RDD[Int] = sc.makeRDD(list)
println(s"Partitions=${value.getNumPartitions}")
println(value.collect.toList)
}
Partitions=4
List(1, 2, 3, 4, 5, 6, 7, 8)
With local[*], since my machine has 16 cores:
Partitions=16
List(1, 2, 3, 4, 5, 6, 7, 8)
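To see which elements actually land in which partition, glom can be used: it turns each partition into an array, so the grouping becomes visible. A small sketch, back on local[4] (the object name is just for the illustration):
import org.apache.spark.{SparkConf, SparkContext}

object GlomDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc = new SparkContext(conf)
    val value = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
    // Each inner list is the content of one partition.
    println(value.glom().collect().map(_.toList).toList)
    sc.stop()
  }
}
With 4 partitions this should print List(List(1, 2), List(3, 4), List(5, 6), List(7, 8)).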
Besides the default assigned by Spark, we can also set the partition count ourselves, for example 3 (sc is created the same way as above):
def main(args: Array[String]): Unit = {
val list=List(1,2,3,4,5,6,7,8)
val value: RDD[Int] = sc.makeRDD(list,3)
println(s"Partitions=${value.getNumPartitions}")
println(value.collect.toList)
}
Partitions=3
List(1, 2, 3, 4, 5, 6, 7, 8)
As mentioned earlier, makeRDD is just a wrapper around parallelize, so calling parallelize directly works equally well (again reusing sc):
def main(args: Array[String]): Unit = {
//val lines: RDD[String] = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",3)
val list=List(1,2,3,4,5,6,7,8)
val value: RDD[Int] = sc.parallelize(list,3)
println(s"Partitions=${value.getNumPartitions}")
println(value.collect.toList)
}
Partitions=3
List(1, 2, 3, 4, 5, 6, 7, 8)
The underlying implementation of parallelize:
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
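Inside ParallelCollectionRDD, the collection is cut into numSlices contiguous ranges. The sketch below is simplified from the positions helper in ParallelCollectionRDD.slice (the real code also special-cases Range and NumericRange collections):
object SliceSketch {
  // Simplified: how a collection of `length` elements is cut into numSlices index ranges.
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end   = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4, 5, 6, 7, 8)
    // numSlices = 3 gives the ranges [0,2), [2,5), [5,8):
    // List(1, 2), List(3, 4, 5), List(6, 7, 8)
    positions(list.length, 3).foreach { case (s, e) => println(list.slice(s, e)) }
  }
}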
- Create by reading a file
When creating an RDD by reading a file:
If HADOOP_CONF_DIR is set in the cluster configuration, Spark reads from HDFS by default:
1. Read an HDFS file: sc.textFile("/.../..."), sc.textFile("hdfs:///.../..."), or sc.textFile("hdfs://hadoop102:8020/.../...")
2. Read a local file: sc.textFile("file:///.../...")
If HADOOP_CONF_DIR is not set, Spark reads local files by default:
1. Read an HDFS file: sc.textFile("hdfs://hadoop102:8020/.../...")
2. Read a local file: sc.textFile("/.../...") or sc.textFile("file:///.../...")
Reading an HDFS file (hdfs://):
@Test
def readHdfs():Unit={
val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
val sc=new SparkContext(conf)
val lines=sc.textFile("hdfs://hadoop102:9820/input/wordcount")
println(s"Partitions=${lines.getNumPartitions}")
println(lines.collect.toList)
}
For convenience, these tests use JUnit.
Partitions=2
List(你好 google , python 你好 count word hello, count 你好 google)
Reading a local file (file://):
@Test
def readLocalFile():Unit={
val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
val sc=new SparkContext(conf)
val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt")
println(s"Partitions=${lines.getNumPartitions}")
println(lines.collect.toList)
}
Partitions=2
List(hello java shell, python java java, wahaha java shell)
We specified local[4], so why is Partitions = 2?
Let's look at the textFile source. Its parameter list shows that, besides the file path, it also takes a minPartitions parameter; if it is not supplied, a default value is used:
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
/**
* Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
* The reasons for this are discussed in https://github.com/mesos/spark/pull/718
*/
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
Tracing through the source, defaultParallelism is exactly the value we derived above (shown again below), so here defaultParallelism = 4:
override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)
So when minPartitions is not specified, its default is math.min(defaultParallelism, 2) = math.min(4, 2) = 2, which is why the file was read into 2 partitions.
Let's specify a partition count explicitly, say 5 (reusing sc as before):
@Test
def readLocalFile():Unit={
val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",5)
println(s"Partitions=${lines.getNumPartitions}")
println(lines.collect.toList)
}
Partitions=5
The partition count of an RDD created by reading a file:
1. If minPartitions is set, the RDD partition count >= minPartitions.
2. If minPartitions is not set, the RDD partition count >= Math.min(defaultParallelism, 2).
The final partition count is determined by the number of file splits.
- Derive from other RDDs: finishing the worldCount example
@Test
def readLocalFile():Unit={
val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)
println(s"Partitions=${lines.getNumPartitions}")
val groupList: RDD[(String, Iterable[String])] = lines.flatMap(_.split(" ")).groupBy(x=>x)
val value: RDD[(String, Int)] = groupList.map({ case (k, v) => (k, v.size) })
println(value.collect.toList)
}
An RDD produced from an existing RDD by operators such as flatMap and map is called a derived RDD.
Partitions=4
List((python,2), (wahaha,2), (shell,3), (hello,2), (java,7))
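As a side note, the same word count can be derived with map + reduceByKey instead of groupBy + map, which avoids first collecting all occurrences of a word into one iterable. A sketch using the same file path as above (the counts may come back in a different order):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WordCountReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
    // Each transformation derives a new RDD from the previous one.
    val counts: RDD[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    println(counts.collect().toList)
    sc.stop()
  }
}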
/**
 * RDDs created from a collection:
 * If numSlices is specified when calling parallelize, the RDD partition count = numSlices.
 * If numSlices is not specified, the partition count = defaultParallelism:
 *   If spark.default.parallelism is set, the RDD partition count = its value.
 *   If spark.default.parallelism is not set:
 *     1. master = local[N]:    partition count = N
 *     2. master = local[*]:    partition count = the machine's CPU core count
 *     3. master = local:       partition count = 1
 *     4. master = spark://...: partition count = total executor CPU cores for this job
 */
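A quick check of rule 3 in the summary above (a minimal sketch; the object and app names are arbitrary):
import org.apache.spark.{SparkConf, SparkContext}

object PlainLocalPartitions {
  def main(args: Array[String]): Unit = {
    // master = "local": LocalSchedulerBackend is created with totalCores = 1.
    val conf = new SparkConf().setMaster("local").setAppName("plain-local-check")
    val sc = new SparkContext(conf)
    // Expected output: Partitions=1
    println(s"Partitions=${sc.makeRDD(List(1, 2, 3, 4)).getNumPartitions}")
    sc.stop()
  }
}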
- textFile file splitting
Run the program:
@Test
def readLocalFile():Unit={
val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)
println(s"Partitions=${lines.getNumPartitions}")
val groupList: RDD[(String, Iterable[String])] = lines.flatMap(_.split(" ")).groupBy(x=>x)
val value: RDD[(String, Int)] = groupList.map({ case (k, v) => (k, v.size) })
println(value.collect.toList)
}
The size of worldCount.txt is 267 bytes.
The splitting source (Hadoop's FileInputFormat.getSplits):
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
StopWatch sw = new StopWatch().start();
FileStatus[] files = listStatus(job);
// Compute the total size of all input files: totalSize = 267
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: " + file.getPath());
}
totalSize += file.getLen();
}
// Compute the target size of each split: 267 / 4 = 66 (integer division)
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
// Get the minimum split size, default 1
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// splits holds the resulting file splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
// Only non-empty files are split
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// Check whether the file is splittable
if (isSplitable(fs, path)) {
// Get the block size: 32 MB for the local filesystem, 128 MB on HDFS
long blockSize = file.getBlockSize();
// Compute the split size (as shown below, splitSize = 66 here)
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
// Loop to cut the file into splits
// SPLIT_SLOP = 1.1 allows a slightly oversized final split
// 1st pass: 267/66 = 4.045454545454546 ; bytesRemaining = 267-66 = 201
// 2nd pass: 201/66 = 3.0454545454545454 ; bytesRemaining = 201-66 = 135
// 3rd pass: 135/66 = 2.0454545454545454 ; bytesRemaining = 135-66 = 69
// 4th pass: 69/66 = 1.0454545454545454, which is below SPLIT_SLOP, so the loop exits
long bytesRemaining = length; // bytesRemaining =267
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
// Because the final pass failed the while condition, all the remaining bytes are merged into one last split.
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
// splits.size() = 4 here; the RDD ends up with splits.size() partitions
return splits.toArray(new FileSplit[splits.size()]);
}
computeSplitSize: Hadoop's split-size formula
// goalSize = 66
// minSize = 1
// blockSize = 32 << 20 bytes (32 MB)
protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
// Math.min(goalSize, blockSize) = min(66, 32 MB) = 66
// Math.max(minSize, 66) = max(1, 66) = 66
return Math.max(minSize, Math.min(goalSize, blockSize));
}
Now shrink worldCount.txt so that it is 79 bytes. This time the program reports Partitions = 5:
Partitions=5
Walking through the split computation again:
Total file size:
long totalSize = 79;
Target size per split: goalSize = 79 / 4 = 19 (integer division):
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
Minimum split size: minSize = 1:
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
Split size: splitSize = 19:
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
Splitting:
// 1st pass: 79/19 = 4.157894736842105 ; bytesRemaining = 79-19 = 60
// 2nd pass: 60/19 = 3.1578947368421053 ; bytesRemaining = 60-19 = 41
// 3rd pass: 41/19 = 2.1578947368421053 ; bytesRemaining = 41-19 = 22
// 4th pass: 22/19 = 1.1578947368421053 ; bytesRemaining = 22-19 = 3
// 5th pass: 3/19 = 0.15789473684210525, which fails the while condition
long bytesRemaining = length; // bytesRemaining =79
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
// 5th split: the loop has exited, so the remaining 3 bytes become one final split.
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
This splitting produced 5 splits, so the final partition count is 5.
Although we passed 4 above, that value only sets a lower bound on the number of splits; the actual partition count depends on how many splits the file is finally cut into.
val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
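To tie the two cases together, here is a small sketch that replays the getSplits loop for a single file of a given size, ignoring block boundaries (which never matter for such tiny local files). SplitCountSketch and splitSizes are names made up for this illustration:
object SplitCountSketch {
  // Replays FileInputFormat's loop: goalSize, splitSize, SPLIT_SLOP = 1.1, and the merged tail.
  def splitSizes(totalSize: Long, numSplits: Int, blockSize: Long = 32L << 20): Seq[Long] = {
    val goalSize   = totalSize / (if (numSplits == 0) 1 else numSplits)
    val minSize    = 1L
    val splitSize  = math.max(minSize, math.min(goalSize, blockSize))
    val SPLIT_SLOP = 1.1
    var bytesRemaining = totalSize
    val splits = scala.collection.mutable.ArrayBuffer.empty[Long]
    while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
      splits += splitSize
      bytesRemaining -= splitSize
    }
    if (bytesRemaining != 0) splits += bytesRemaining
    splits.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(splitSizes(267, 4)) // 66, 66, 66, 69     -> 4 splits, so 4 partitions
    println(splitSizes(79, 4))  // 19, 19, 19, 19, 3  -> 5 splits, so 5 partitions
  }
}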