RDD创建
从文件系统中加载数据生成RDD
spark
的sparkcontext
通过textfile()
读取数据生成内存中的RDD
,文件来源:
- 本地文件系统
- 分布式文件系统
HDFS
Amazon S3
等云端的文件
# 本地加载
lines = sc.textFile("file:///usr/local/spark/rdd/word.txt")
lines.foreach(print) # 查看具体信息
# 分布式文件系统进行加载数据
lines = sc.textFile("hdfs://localhost:9000/usr/hadoop/word.txt")
lines = sc.textFile("/usr/hadoop/word.txt") # 去掉绝对路径
lines = sc.textFile("word.txt") # 文件放在当前主用户的目录下
通过并行集合(数组)创建RDD的实例
并行parallelize()
方法创建
array = [1,3,4,5,2]
rdd = sc.parallelize(array)
rdd.foreach(print)
RDD操作
转换操作
RDD是只读的,只能在修改的过程进行修改。转换过程是惰性机制。整个转换过程只是记录转换的轨迹,并不会发生真正的计算。只有遇到行动操作action
时候,才会发生真正的计算。三种操作:
- filter
- map
- flatmap
- groupbykey
filter(func)
筛选满足函数func的元素,并且返回一个新的数据集
代码语言:javascript复制lines = sc.textFile("word.txt")
linesWithSpark = lines.filter(lambda line: "Spark" in line)
lineWithSpark.foreach(print)
map(func)
将RDD
对象中的元素放入func
函数中进行操作
data = [1,2,3,4]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.map(lambda x: x 10)
rdd2.foreach(print)
代码语言:javascript复制lines = sc.textFile("word.txt")
words = lines.map(lambda line:line.split(" ")) # 函数功能是用空格进行分割
words.foreach(print)
flatmap(func)
与map比较类似,但是每个输入元素都可以映射到0个或者多个输出结果(可乐罐栗子)
代码语言:javascript复制lines = sc.textFile("file:///usr/local/spark/code/radd/word.txt") # 加载本地文件
words = lines.flatMap(lambda line:line.split(" "))
groupbykey()
通过key进行分组;相同的值放到一个元组中,是以iterable的数据形式存放。
代码语言:javascript复制words = sc.paralelize(obj)
words1 = words.groupByKey()
wordsq.foreach(print)
reduceByKey(func)
将返回的值value通过func函数进行计算
行动操作action
执行行动类型操作,发生真正的计算
函数 | 说明 |
---|---|
count() | 返回数据集中的总个数 |
collect() | 以列表或数组的形式返回数据集中的所有元素 |
first() | 返回第一个元素 |
take(n) | 以列表的形式返回前n个元素 |
reduce(func) | 通过func函数聚合数据集中的所有元素 |
foreach(func) | 将数据集中的元素传递给函数func进行运行 |
惰性机制
在RDD的操作中,只有遇到行动类型的操作才是开始计算操作
代码语言:javascript复制lines = sc.textFile("word.txt")
linelength = lines.map(lambda s: len(s)) # 记录轨迹
totallength = linelength.reduce(lambda a,b: a b) # 遇到reduce函数才开始执行计算过程
print(totallength)