案例
根据几个实际的应用案例来学会 Spark 中 map、filter、take 等函数的使用
案例1
找出TOP5的值
- filter(func):筛选出符合条件的数据
- map(func):对传入数据执行func操作
- sortByKey():只能对键值对进行操作,默认是升序
from pyspark import SparkConf, SparkContext
# Case 1: print the five largest values found in the third comma-separated
# field of the input file.
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)

# Each RDD element is one line of the text file (blank lines may occur).
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")

# Keep only non-blank lines that split into exactly four comma-separated fields.
valid_rows = lines.filter(
    lambda row: len(row.split(",")) == 4 if len(row.strip()) > 0 else False
)

# sortByKey only works on key-value pairs, so pair the third field (as an int)
# with an empty-string placeholder value.
keyed = valid_rows.map(lambda row: (int(row.split(",")[2]), ""))

# Collapse to a single partition and sort by key in descending order.
ranked = keyed.repartition(1).sortByKey(ascending=False)

# Drop the placeholder values and take the first five keys.
res7 = ranked.keys().take(5)
for a in res7:
    print(a)
文件全局排序
from pyspark import SparkConf, SparkContext
index = 0  # module-level counter: the most recently issued sequence number


def getindex():
    """Return the next sequence number, starting at 1.

    Every call increments the module-level ``index`` counter by one and
    returns the new value, so successive calls yield 1, 2, 3, ...
    """
    global index
    # Increment rather than assign: assigning 1 here would hand out the same
    # number on every call.
    index += 1
    return index
def main():
    """Sort the integers of a text file in ascending order and number them.

    Reads the input file line by line, drops blank lines, parses each
    remaining line as an integer, sorts the values in ascending order within
    a single partition, pairs every value with a sequence number obtained
    from ``getindex()``, prints the pairs, and writes them to the result
    directory.
    """
    conf = SparkConf().setMaster("local").setAppName("ReadHBase")
    sc = SparkContext(conf=conf)
    lines = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")

    # Drop blank lines.
    res1 = lines.filter(lambda line: len(line.strip()) > 0)
    # sortByKey needs key-value pairs: use the integer as key, "" as value.
    res2 = res1.map(lambda x: (int(x.strip()), ""))
    # Single partition so the sort below is a global one.
    res3 = res2.repartition(1)
    res4 = res3.sortByKey(True)  # ascending order
    res5 = res4.map(lambda x: x[0])
    # Attach a sequence number to every sorted value.
    # NOTE(review): res6 is evaluated by both actions below; presumably the
    # getindex() counter restarts for each evaluation -- confirm that the
    # saved numbering starts at 1.
    res6 = res5.map(lambda x: (getindex(), x))
    res6.foreach(print)
    # Write the result into the output directory (RDD API: saveAsTextFile).
    res6.saveAsTextFile("file:///usr/local/spark/code/rdd/filesort/result")
二次排序
from operator import gt
from pyspark import SparkContext, SparkConf
class SecondarySortKey():
    """Sort key that orders by a first column, then by a second column on ties."""

    def __init__(self, k):
        """Store the two sort columns taken from ``k[0]`` and ``k[1]``."""
        self.column1 = k[0]
        self.column2 = k[1]

    def __gt__(self, other):
        """Return True when this key is greater than ``other``.

        The first columns decide the order; the second columns are compared
        only when the first columns are equal.
        """
        if other.column1 == self.column1:
            # First columns tie: fall back to the second column.
            return gt(self.column2, other.column2)
        return gt(self.column1, other.column1)
def main():
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
rdd1 = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")
rdd2 = rdd1.filter(lambda x:(len(x.strip()) > 0 )) # 空行消掉
rdd3 = rdd2.map(lambda x: (int(x.split(" ")[0]), int(x.split(" ")[1])), x)
rdd4 = rdd3.map(lambda x:(SecondarySortKey(x[0]), x[1]))
rdd5 = rdd4.sortByKey(False)
rdd6 = rdd5.map(lambda x: x[1])