文章目录
- 背景
- 安装 PySpark
- 使用
- 连接 Spark Cluster
- Spark DataFrame
- Spark Config 条目
- DataFrame 结构使用说明
- 读取本地文件
- 查看 DataFrame 结构
- 自定义 schema
- 选择过滤数据
- 提取数据
- Row & Column
- 原始 sql 查询语句
- pyspark.sql.function 示例
背景
PySpark 通过 RPC server 来和底层的 Spark 做交互,通过 Py4j 来实现利用 API 调用 Spark 核心。 Spark (written in Scala) 速度比 Hadoop 快很多。Spark 配置可以各种参数,包括并行数目、资源占用以及数据存储的方式等等 Resilient Distributed Dataset (RDD) 可以被并行运算的 Spark 单元。它是 immutable, partitioned collection of elements
安装 PySpark
代码语言:javascript复制pip install pyspark
使用
连接 Spark Cluster
代码语言:javascript复制from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("sparkAppExample")
sc = SparkContext(conf=conf)
Spark DataFrame
代码语言:javascript复制from pyspark.sql import SparkSession
spark = SparkSession.builder
.master("local")
.appName("Word Count")
.config("spark.some.config.option", "some-value")
.getOrCreate()
# getOrCreate表明可以视情况新建session或利用已有的session
# 如果使用 hive table 则加上 .enableHiveSupport()
Spark Config 条目
- 配置大全网址
Spark Configuration
DataFrame 结构使用说明
PySpark 的 DataFrame 很像 pandas 里的 DataFrame 结构
读取本地文件
代码语言:javascript复制# Define the Data
import json
people = [
{'name': 'Li', 'age': 12, 'address': {'country': 'China', 'city': 'Nanjing'}},
{'name': 'Richard', 'age': 14, 'address': {'country': 'USA', 'city': 'Los Angeles'}},
{'name': 'Jacob', 'age': 12, 'address': {'country': 'France', 'city': 'Paris'}},
{'name': 'Manuel', 'age': 12, 'address': {'country': 'UK', 'city': 'London'}},
{'name': 'Kio', 'age': 16, 'address': {'country': 'Japan', 'city': 'Tokyo'}},
]
json.dump(people, open('people.json', 'w'))
# Load Data into PySpark automatically
df = spark.read.load('people.json', format='json')
查看 DataFrame 结构
代码语言:javascript复制# Peek into dataframe
df
# DataFrame[address: struct<city:string,country:string>, age: bigint, name: string]
df.show(2)
"""
------------------ --- -------
| address|age| name|
------------------ --- -------
| [Nanjing, China]| 12| Li|
|[Los Angeles, USA]| 14|Richard|
------------------ --- -------
only showing top 2 rows
"""
df.columns
# ['address', 'age', 'name']
df.printSchema()
"""
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- country: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
"""
自定义 schema
代码语言:javascript复制from pyspark.sql.types import StructField, MapType, StringType, IntegerType, StructType
# 常用的还包括 DateType 等
people_schema= StructType([
StructField('address', MapType(StringType(), StringType()), True),
StructField('age', LongType(), True),
StructField('name', StringType(), True),
])
df = spark.read.json('people.json', schema=people_schema)
df.show(1)
"""
-------------------- --- ----
| address|age|name|
-------------------- --- ----
|[country -> China...| 12| Li|
-------------------- --- ----
only showing top 1 row
"""
df.dtypes
# [('address', 'map<string,string>'), ('age', 'bigint'), ('name', 'string')]
选择过滤数据
代码语言:javascript复制# Select column
address_df = df.select(['address.city'])
# DataFrame[city: string]
# Filter column with value
df.filter(df.age == 12).show()
"""
---------------- --- ------
| address|age| name|
---------------- --- ------
|[Nanjing, China]| 12| Li|
| [Paris, France]| 12| Jacob|
| [London, UK]| 12|Manuel|
---------------- --- ------
"""
nj_df = df.filter('address.city == "Nanjing"')
nj_df.show()
"""
-------------------- --- ----
| address|age|name|
-------------------- --- ----
|[country -> China...| 12| Li|
-------------------- --- ----
"""
# 选择数据头
df.head(2)
"""
[
Row(address={'country': 'China', 'city': 'Nanjing'}, age=12, name='Li'),
Row(address={'country': 'USA', 'city': 'Los Angeles'}, age=14, name='Richard')
]
"""
提取数据
代码语言:javascript复制people = df.collect()
# return list of all Row class
len(people)
# 5
df.select('age').distinct().collect()
# [Row(age=12), Row(age=14), Row(age=16)]
Row & Column
代码语言:javascript复制# ---------------- row -----------------------
first_row = df.head()
# Row(address=Row(city='Nanjing', country='China'), age=12, name='Li')
# 读取行内某一列的属性值
first_row['age'] # 12
first_row.age # 12
getattr(first_row, 'age') # 12
first_row.address
# Row(city='Nanjing', country='China')
# -------------- column -----------------------
first_col = df[0]
first_col = df['adress']
# Column<b'address'>
# copy column[s]
address_copy = first_col.alias('address_copy')
# rename column / create new column
df.withColumnRenamed('age', 'birth_age')
df.withColumn('age_copy', df['age']).show(1)
"""
---------------- --- ---- --------
| address|age|name|age_copy|
---------------- --- ---- --------
|[Nanjing, China]| 12| Li| 12|
---------------- --- ---- --------
only showing top 1 row
"""
df.withColumn('age_over_18',df['age'] > 18).show(1)
"""
---------------- --- ---- -----------
| address|age|name|age_over_18|
---------------- --- ---- -----------
|[Nanjing, China]| 12| Li| false|
---------------- --- ---- -----------
only showing top 1 row
"""
原始 sql 查询语句
代码语言:javascript复制df.createOrReplaceTempView("people")
sql_results = spark.sql("SELECT count(*) FROM people")
sql_results.show()
"""
--------
|count(1)|
--------
| 5|
--------
"""
pyspark.sql.function 示例
代码语言:javascript复制from pyspark.sql import functions as F
import datetime as dt
# 装饰器使用
@F.udf()
def calculate_birth_year(age):
this_year = dt.datetime.today().year
birth_year = this_year - age
return birth_year
calculated_df = df.select("*", calculate_birth_year('age').alias('birth_year'))
calculated_df .show(2)
"""
------------------ --- ------- ----------
| address|age| name|birth_year|
------------------ --- ------- ----------
| [Nanjing, China]| 12| Li| 2008|
|[Los Angeles, USA]| 14|Richard| 2006|
------------------ --- ------- ----------
only showing top 2 rows
"""
# pyspark.sql.function 下很多函保活 udf(用户自定义函数)可以很好的并行处理大数据
# 这就是传说中的函数式编程,进度条显示可能如下:
# [Stage 41: >>>>>>>>>>>>>>>>> (0 1) / 1]
来源:https://zhuanlan.zhihu.com/p/171813899 https://blog.csdn.net/cymy001/article/details/78483723
- 其它阅读:
pyspark 自定义聚合函数 UDAF:https://www.cnblogs.com/wdmx/p/10156500.html