In day-to-day work you inevitably deal with big data, and sometimes you need to read a local file and store its contents in Hive. This article walks through the process in detail.
Process:
- read the .pkl file with the pickle module;
- convert the loaded content into an RDD;
- convert the RDD into a DataFrame and save it to a Hive table (a compact end-to-end sketch follows this list).
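Here is that end-to-end sketch for orientation before the step-by-step walkthrough. The file path, database name, and table name are placeholders, and it assumes the pickle holds a flat list:

import pickle
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("pickle-to-hive").enableHiveSupport().getOrCreate()
with open("data.pkl", "rb") as fp:  # hypothetical path
    data = pickle.load(fp)
# assume data is a flat list such as [1, 2, 3]
df = spark.createDataFrame(spark.sparkContext.parallelize(data).map(lambda x: Row(col=x)))
df.write.saveAsTable("hive_database.hive_table", mode="overwrite")  # the database must already exist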
1. Saving and loading pickle files with pickle
import pickle
data = ""
path = "xxx.pkl"
# save to a pickle file
pickle.dump(data, open(path, 'wb'))
# load the pickle file
data2 = pickle.load(open(path, 'rb'))
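Equivalently, you can manage the file handles with with blocks so they are closed promptly:

with open(path, 'wb') as f:
    pickle.dump(data, f)
with open(path, 'rb') as f:
    data2 = pickle.load(f)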
When Python 3 reads a pickle file saved by Python 2, it can fail with:
UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)
Solution: pass encoding to pickle.load (open takes no encoding argument in binary mode):
data2 = pickle.load(open(path, 'rb'), encoding='latin1')
When Python 2 reads a pickle file saved by Python 3, it fails with:
unsupported pickle protocol: 3
Solution: from Python 3, re-save the data with protocol 2, which Python 2 understands:
import pickle
path = "xxx.pkl"
path2 = 'xxx2.pkl'
data = pickle.load(open(path, 'rb'))
# re-save with protocol 2 so Python 2 can read it
pickle.dump(data, open(path2, 'wb'), protocol=2)
# load the re-saved pickle
data2 = pickle.load(open(path2, 'rb'))
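For background, what matters is the protocol the pickle was written with: Python 2 only understands protocols 0 through 2, while Python 3 defaults to protocol 3 or higher (the exact default depends on the minor version). In Python 3 you can check what your interpreter uses:

import pickle
print(pickle.DEFAULT_PROTOCOL)  # protocol used when none is passed to dump()
print(pickle.HIGHEST_PROTOCOL)  # highest protocol this interpreter supports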
2. Loading the pickle contents and converting them to an RDD
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pickle

# enableHiveSupport is needed so saveAsTable writes to the Hive metastore
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .enableHiveSupport() \
    .getOrCreate()

pickle_path = "xxx.pkl"
with open(pickle_path, "rb") as fp:
    data = pickle.load(fp)
# operate on data according to its actual type;
# here we assume data is a flat list such as [1, 2, 3, 4, 5],
# loaded and converted to an RDD (note parallelize lives on the SparkContext)
pickleRdd = spark.sparkContext.parallelize(data)
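If data is not a flat list, adjust accordingly. For example, a list of tuples can become a two-column DataFrame directly (the names below are hypothetical):

pairs = [(1, "a"), (2, "b")]
pairsDf = spark.createDataFrame(spark.sparkContext.parallelize(pairs), ["id", "value"])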
3. Converting the RDD to a DataFrame and saving it to Hive
# define the column name
column = Row('col')
# convert to a DataFrame
pickleDf = spark.createDataFrame(pickleRdd.map(lambda x: column(x)))
# save to Hive: this creates the table hive_table inside the database hive_database
# (the database itself must already exist); mode='overwrite' replaces any existing
# data, and partitionBy can optionally name the partition column(s)
pickleDf.write.saveAsTable("hive_database.hive_table", mode='overwrite')
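To write a partitioned table, pass partitionBy; 'dt' below is a hypothetical partition column that must exist in the DataFrame:

pickleDf.write.saveAsTable("hive_database.hive_table", mode='overwrite', partitionBy='dt')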
Some additional background on writing to Hive:
(1) Via SQL
data = [
    (1, "3", "145"),
    (1, "4", "146"),
    (1, "5", "25"),
    (1, "6", "26"),
    (2, "32", "32"),
    (2, "8", "134"),
    (2, "8", "134"),
    (2, "9", "137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])
# method one: default is the database name, write_test the name of the table
# to create inside it (registerTempTable is the legacy API; newer code uses
# createOrReplaceTempView)
df.registerTempTable('test_hive')
spark.sql("create table default.write_test as select * from test_hive")
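To confirm the write, you can query the new table back:

spark.sql("select * from default.write_test").show()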
Alternatively, go through a temporary view and INSERT OVERWRITE:
# register df as a temporary view
df.createOrReplaceTempView("df_tmp_view")
# insert into Hive via spark.sql (note the triple-quoted string)
spark.sql("""insert overwrite table
XXXXX -- table name
partition(partition_col=partition_value) -- separate multiple partitions with commas
select
XXXXX -- column names, matching the Hive column order, excluding partition columns
from df_tmp_view""")
(2) Via saveAsTable
代码语言:javascript复制# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
# mode("append")是在原有表的基础上进行添加数据
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
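For example, switching to appending is only a change of mode:

df.write.format("hive").mode("append").saveAsTable('default.write_test')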
The following are several ways to create a DataFrame (from RDDs and otherwise):
(1) From a list of dicts
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]
(2) From an RDD
a = [('Alice', 1)]
sc = spark.sparkContext  # sc is the SparkContext behind the session
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)
# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]
(3) From an RDD plus Row
from pyspark.sql import Row
a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]
(4) From an RDD plus StructType
from pyspark.sql.types import *
a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]
(5) From a pandas DataFrame
df = spark.createDataFrame(rdd, ['name', 'age'])
print(df) # DataFrame[name: string, age: bigint]
print(type(df.toPandas())) # <class 'pandas.core.frame.DataFrame'>
# pass in a pandas DataFrame directly
output = spark.createDataFrame(df.toPandas()).collect()
print(output)
# [Row(name='Alice', age=1)]
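One practical note: toPandas() and createDataFrame(pandas_df) can be slow for large data. Spark can use Apache Arrow to speed the conversion up; the config key below is the Spark 3.x name, so adjust for older versions:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")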