Spark笔记10-demo

2021-03-02 15:55:15 浏览数 (1)

案例

根据几个实际的应用案例来学会sparkmap、filter、take等函数的使用

案例1

找出TOP5的值

  • filter(func):筛选出符合条件的数据
  • map(func):对传入数据执行func操作
  • sortByKey():只能对键值对进行操作,默认是升序
代码语言:javascript复制
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")  # 得到RDD元素,每个RDD元素都是文本文件中的一行数据(可能存在空行)

res1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(",")) == 4))   # 字符串后面的空格去掉,并且保证长度是4
res2 = res1.map(lambda x:x.split(",")[2])  # 将列表中的元素分割,取出第3个元素,仍是字符串
res3 = res2.map(lambda x:(int(x), ""))  # 将字符串转成int类型,并且变成key-value形式(50, ""),value都是空格
res4 = res3.repartition(1)
res5 = res4.sortByKey(False)   # sortByKey的对象必须是键值对;按照key进行降序排列,value不动
res6 = res5.map(lambda x:x[0]) # 取出第一个元素并通过take取出前5个
res7 = res6.take(5)
for a in res7:
  print(a)
文件全局排序
代码语言:javascript复制
from pyspark import SparkConf, SparkContext

index = 0

def getindex():
  global index
  index  = 1
  return index

def main():
  conf = SparkConf().setMaster("local").setAppName("ReadHBase")
  sc = SparkContext(conf=conf)
  lines = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")
  index = 0
  res1 = lines.filter(lambda line:(len(line.strip()) > 0 ))
  res2 = res1.map(lambda x: (int(x.strip()),""))
  res3 = res2.repartition(1)
  res4 = res3.sortByKey(True)  # 升序排列
  res5 = res4.map(lambda x:x[0])
  res6 = res5.map(lambda x:(getindex(),x))
  res6.foreach(print)
  res6.saveAsFile("file:///usr/local/spark/code/rdd/filesort/result") # 结果写进目录中-

二次排序
代码语言:javascript复制
from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey():
  def __int__(self,k):   # 构造函数
    self.column1 = k[0]
    self.column2 = k[1]

  def __gt__(self,other):  # 重写比较函数
    if other.column1 = self.column1:   # 如果第一个元素相等,表第二个
      return gt(self.column2, other.column2)
    else:
      return gt(self.column1, other.column1)  # 否则直接比较第一个


def main():
  conf = SparkConf().setMaster("local").setAppName("ReadHBase")
  sc = SparkContext(conf=conf)
  rdd1 = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")
  rdd2 = rdd1.filter(lambda x:(len(x.strip()) > 0 ))   # 空行消掉
  rdd3 = rdd2.map(lambda x: (int(x.split(" ")[0]), int(x.split(" ")[1])), x)
  rdd4 = rdd3.map(lambda x:(SecondarySortKey(x[0]), x[1]))
  rdd5 = rdd4.sortByKey(False)
  rdd6 = rdd5.map(lambda x: x[1])

0 人点赞