# coding=utf-8
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql import SparkSession
# Initialize Spark and create a SparkContext
sc = SparkContext()
print "======================n========================n======================n"
print "=======firt part======n"
# Use sc to create an RDD -- resilient distributed dataset
lines = sc.textFile("D:/spark-2.1.2-bin-hadoop2.7/bin/readme.txt")
# An RDD supports transformations and actions
# A transformation returns a new RDD
# An action returns a result to the driver program or writes it to storage, and triggers the actual computation
# Transformation example: filter
pyline = lines.filter(lambda line: "a" in line)
# Actions:
c = pyline.first()
count = pyline.count()
print c
print count
sq_only = lines.distinct()
print sq_only.collect()
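# A small illustration (not in the original): transformations are lazy and only record
# a lineage; nothing is read or computed until an action such as count() runs.
lazy_lengths = lines.map(lambda line: len(line))  # transformation: recorded, no job yet
print lazy_lengths.count()                        # action: triggers the actual computation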
print "=======second part======n"
nums = sc.parallelize([1, 2, 3, 4, 4, 4])
sq_rdd = nums.map(lambda x: x * x)
sq = sq_rdd.collect()  # map is a transformation, collect is an action
# Note: collect() pulls the entire RDD back to the driver, so only use it when you are sure the data fits in local memory
for i in sq:
    print i
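# A safer sketch for inspection: take(n) only returns the first n elements to the driver,
# avoiding the memory risk of collect() on a large RDD.
print sq_rdd.take(3)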
nums_2 = sc.parallelize([4, 5, 6, 7])
# union() produces an RDD containing all the elements of both RDDs
number_all = nums.union(nums_2).distinct()
print type(number_all)
for i in number_all.collect():
    print i
# intersection() returns an RDD of the elements the two RDDs have in common
number_in = nums.intersection(nums_2)
print number_in.collect()
number_dis = number_all.subtract(nums_2)  # number_all itself is not modified
print number_dis.collect()
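# Another two-RDD transformation, shown only as a small sketch: cartesian() yields every
# (a, b) combination, which gets expensive quickly on large RDDs.
print nums.cartesian(nums_2).take(5)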
print "=======third part======n"
lin2 = sc.parallelize(["hello message", "hi fank", "one"])
# flatMap applies the function to every element of the RDD and flattens the returned iterators into a single new RDD
words = lin2.flatMap(lambda line: line.split(" "))
# Count the words
print words.count()
print words.collect()
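# For contrast (illustrative only): map() with the same function keeps one output element
# per input element, so it produces a list of lists rather than a flat list of words.
print lin2.map(lambda line: line.split(" ")).collect()  # [['hello', 'message'], ['hi', 'fank'], ['one']]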
# Sum
sum_num = number_all.reduce(lambda x, y: x + y)
print sum_num
# Count how many times each value occurs
value_cnt = nums.countByValue()
print value_cnt
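# A sketch of aggregate(), which generalizes reduce(): compute the sum and the element
# count of nums in one pass (the first lambda folds a value into an accumulator, the
# second merges two accumulators from different partitions).
sum_cnt = nums.aggregate((0, 0),
                         lambda acc, v: (acc[0] + v, acc[1] + 1),
                         lambda a, b: (a[0] + b[0], a[1] + b[1]))
print sum_cnt  # (18, 6) for [1, 2, 3, 4, 4, 4]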
print "=======fourth part======n"
# 键值对操作
# 用map生成一个键值对
pairs = lines.map(lambda x: (x.split(" ")[0], x))
pairs_1 = sc.parallelize([('c', 7), ('b', 1), ('d', 3)])
pairs2 = sc.parallelize([('a', 3), ('b', 4), ('a', 1), ('c', 6)])
# Merge the values that share the same key
pairs_3 = pairs2.reduceByKey(lambda x, y: x + y)
print pairs_3.collect()
# Group the values by key
pairs_4 = pairs2.groupByKey()
print pairs_4.collect()
# Apply a function to every value
pairs_5 = pairs2.mapValues(lambda x: x ** 2)
print pairs_5.collect()
# Get an RDD containing just the keys
pairs_key = pairs2.keys()
print pairs_key.collect()
values = pairs2.values()
print values.collect()
# Aggregations on pair RDDs
pair_animal = sc.parallelize([('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)])
# Sum and count the values for each key in the pair RDD; this can be used to compute a per-key average
animal_a = pair_animal.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
animal_avg = animal_a.mapValues(lambda x: x[0] / float(x[1]))
print animal_avg.collect()
print animal_avg.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x)).collect()
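# An alternative sketch of the same per-key average using combineByKey(), which takes
# separate functions for creating a combiner, merging a value into it, and merging
# combiners across partitions.
animal_cb = pair_animal.combineByKey(lambda v: (v, 1),
                                     lambda acc, v: (acc[0] + v, acc[1] + 1),
                                     lambda a, b: (a[0] + b[0], a[1] + b[1]))
print animal_cb.mapValues(lambda x: x[0] / float(x[1])).collect()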
# Transformations on two pair RDDs
# subtractByKey: remove the elements whose key also appears in the other RDD
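# A small demo of subtractByKey (added as an illustration): keep only the pairs of
# pairs_1 whose key does not appear in pairs2.
print pairs_1.subtractByKey(pairs2).collect()  # expected: [('d', 3)]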
# join
pairs_all = pairs_1.join(pairs2)
for i in pairs_all.collect():
    print i[1]  # ('c',(7,6))
# rightOuterJoin: right outer join
pairs_right = pairs_1.rightOuterJoin(pairs2)
for i in pairs_right.collect():
    print i[1]
# leftOuterJoin: left outer join
pairs_left = pairs_1.leftOuterJoin(pairs2)
for i in pairs_left.collect():
    print i[1]
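# cogroup() is the related operation that groups values from both RDDs by key,
# returning (key, (values_from_self, values_from_other)) with iterable values; a sketch:
pairs_cg = pairs_1.cogroup(pairs2)
print [(k, (list(v1), list(v2))) for k, (v1, v2) in pairs_cg.collect()]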
# Read a file from HDFS
# hdfs_file = sc.textFile(
# "hdfs://sh.hdfs.cr.ied.com:9000/tdw-transfer-data/ihocpro/20171030111201736/train_result/2017112301/model_param.csv")
# print "======test HDFS====="
# print hdfs_file.collect()
# DataFrames and Spark SQL
# Build a DataFrame from a file
# First use sc to create an RDD from the text file
table_rdd = sc.textFile("D:/spark-2.1.2-bin-hadoop2.7/bin/people.txt")
people_sp = table_rdd.map(lambda r: r.split(" "))
people = people_sp.map(lambda p: Row(name=p[0], age=int(p[1]), country=p[2]))
# Ways to create a DataFrame
# First create a SparkSession, otherwise the RDD has no toDF method
print hasattr(table_rdd, "toDF")  # check whether the RDD has a toDF method yet
spark = SparkSession(sc)
print hasattr(table_rdd, "toDF")
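# Aside (not in the original): the more common way to obtain a SparkSession is the
# builder API, e.g.
#   spark = SparkSession.builder.appName("rdd-demo").getOrCreate()
# SparkSession(sc) is used here because a SparkContext already exists.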
# Method 1: toDF()
df_people = people.toDF()
df_people.show()  # show() prints the DataFrame itself and returns None
'''
+---+-----+
|age| name|
+---+-----+
| 33|  jim|
| 34|  tom|
| 23|alice|
| 19|gorge|
| 41|saddy|
| 55|marry|
+---+-----+
'''
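# A quick way to inspect the schema inferred from the Row objects (column names and types):
df_people.printSchema()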
# Method 2 to create a DataFrame: createDataFrame
df_people2 = spark.createDataFrame(people)
# Register a temporary view
df_people2.createOrReplaceTempView("people")
# Run a SQL query
spark.sql("select name,age from people where age > 30").show()
'''
+-----+---+
| name|age|
+-----+---+
|  jim| 33|
|  tom| 34|
|saddy| 41|
|marry| 55|
+-----+---+
'''
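# The same query sketched with the DataFrame API instead of SQL (illustrative only):
df_people2.filter(df_people2.age > 30).select("name", "age").show()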
# groupBy() by itself only returns a GroupedData object; an aggregation still has to be applied to it
df_people2.groupBy("country")
# Running SQL produces a new DataFrame
group_p = spark.sql("select country,count(name) from people group by country")
group_p.show()
'''
+-------+-----------+
|country|count(name)|
+-------+-----------+
| Janpan|          2|
|  Frach|          1|
|America|          1|
|  china|          2|
|England|          1|
+-------+-----------+
'''
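# A sketch of the equivalent aggregation through the DataFrame API; note the result
# column is simply named "count" rather than "count(name)".
df_people2.groupBy("country").count().show()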
# Convert the DataFrame to an RDD
print group_p.rdd.collect()
# Select a single column, renamed to "CON"
print group_p.select(group_p.country.alias("CON")).collect()
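# Housekeeping (an addition, not in the original): release the resources when done.
spark.stop()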