PySpark Usage Notes

2021-07-14 14:09:58

Table of Contents

    • Background
    • Installing PySpark
    • Usage
      • Connecting to a Spark Cluster
      • Spark DataFrame
      • Spark Config Entries
    • DataFrame Usage Notes
      • Reading Local Files
      • Inspecting the DataFrame Structure
      • Custom Schema
      • Selecting and Filtering Data
      • Extracting Data
      • Row & Column
      • Raw SQL Queries
      • pyspark.sql.functions Examples

Background

PySpark talks to the underlying Spark engine through an RPC server, using Py4J to call the Spark core API. Spark (written in Scala) is much faster than Hadoop. Spark can be configured with many parameters, including the degree of parallelism, resource usage, and how data is stored. A Resilient Distributed Dataset (RDD) is the Spark unit that can be operated on in parallel; it is an immutable, partitioned collection of elements.

Installing PySpark

pip install pyspark
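
A quick way to check that the install worked (a minimal sanity check; the exact version string depends on what pip resolved) is to import the package and print its version:

import pyspark
print(pyspark.__version__)   # e.g. 3.1.2, depending on the installed release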

Usage

Connecting to a Spark Cluster

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("sparkAppExample")
sc = SparkContext(conf=conf)
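
As a quick sanity check of the connection (a minimal sketch reusing the sc created above), an RDD can be built from a local list and reduced in parallel:

# Build an RDD from a local Python list and run simple parallel operations
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.count())                      # 5
print(rdd.map(lambda x: x * 2).sum())   # 30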

Spark DataFrame

from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .master("local") \
          .appName("Word Count") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()
# getOrCreate() reuses an existing session when one is available, otherwise it creates a new one
# add .enableHiveSupport() if you need to access Hive tables

Spark Config Entries

  • Full configuration reference:

Spark Configuration
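
As an illustration (the keys below are standard Spark options, but the values are placeholders that depend on your cluster), entries from that reference can be set on a SparkConf or on the SparkSession builder:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf() \
    .set("spark.executor.memory", "2g") \
    .set("spark.executor.cores", "2") \
    .set("spark.sql.shuffle.partitions", "200")

# The same entries can be passed when building a SparkSession
spark = SparkSession.builder.config(conf=conf).getOrCreate()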

DataFrame Usage Notes

PySpark's DataFrame is very similar to the DataFrame structure in pandas.

Reading Local Files

# Define the Data
import json
people = [
    {'name': 'Li', 'age': 12, 'address': {'country': 'China', 'city': 'Nanjing'}},
    {'name': 'Richard', 'age': 14, 'address': {'country': 'USA', 'city': 'Los Angeles'}},
    {'name': 'Jacob', 'age': 12, 'address': {'country': 'France', 'city': 'Paris'}},
    {'name': 'Manuel', 'age': 12, 'address': {'country': 'UK', 'city': 'London'}},
    {'name': 'Kio', 'age': 16, 'address': {'country': 'Japan', 'city': 'Tokyo'}},
]
json.dump(people, open('people.json', 'w'))

# Load Data into PySpark automatically
df = spark.read.load('people.json', format='json')
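
The same reader API covers other formats too; a small sketch (the CSV and Parquet paths here are hypothetical):

# Explicit JSON reader, equivalent to load(..., format='json')
json_df = spark.read.json('people.json')

# CSV with a header row and schema inference (hypothetical file)
csv_df = spark.read.csv('people.csv', header=True, inferSchema=True)

# Parquet stores the schema together with the data (hypothetical file)
parquet_df = spark.read.parquet('people.parquet')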

Inspecting the DataFrame Structure

# Peek into dataframe
df
# DataFrame[address: struct<city:string,country:string>, age: bigint, name: string]

df.show(2)
"""
+------------------+---+-------+
|           address|age|   name|
+------------------+---+-------+
|  [Nanjing, China]| 12|     Li|
|[Los Angeles, USA]| 14|Richard|
+------------------+---+-------+
only showing top 2 rows
"""

df.columns
# ['address', 'age', 'name']

df.printSchema()
"""
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
"""

Custom Schema

from pyspark.sql.types import StructField, MapType, StringType, LongType, StructType
# other commonly used types include IntegerType, DateType, etc.

people_schema= StructType([
    StructField('address', MapType(StringType(), StringType()), True),
    StructField('age', LongType(), True),
    StructField('name', StringType(), True),
])

df = spark.read.json('people.json', schema=people_schema)

df.show(1)
"""
+--------------------+---+----+
|             address|age|name|
+--------------------+---+----+
|[country -> China...| 12|  Li|
+--------------------+---+----+
only showing top 1 row
"""

df.dtypes
# [('address', 'map<string,string>'), ('age', 'bigint'), ('name', 'string')]
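
If the nested structure should be kept instead of flattening it into a map, address can also be declared as a nested StructType; a sketch matching the JSON above:

nested_schema = StructType([
    StructField('address', StructType([
        StructField('city', StringType(), True),
        StructField('country', StringType(), True),
    ]), True),
    StructField('age', LongType(), True),
    StructField('name', StringType(), True),
])

nested_df = spark.read.json('people.json', schema=nested_schema)
nested_df.dtypes
# [('address', 'struct<city:string,country:string>'), ('age', 'bigint'), ('name', 'string')]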

Selecting and Filtering Data

# Select column
address_df = df.select(['address.city'])
# DataFrame[city: string]

# Filter column with value
df.filter(df.age == 12).show()
"""
+----------------+---+------+
|         address|age|  name|
+----------------+---+------+
|[Nanjing, China]| 12|    Li|
| [Paris, France]| 12| Jacob|
|    [London, UK]| 12|Manuel|
+----------------+---+------+
"""

nj_df = df.filter('address.city == "Nanjing"')
nj_df.show()
"""
+--------------------+---+----+
|             address|age|name|
+--------------------+---+----+
|[country -> China...| 12|  Li|
+--------------------+---+----+
"""

# Take the first rows as Row objects
df.head(2)
"""
[ 
  Row(address={'country': 'China', 'city': 'Nanjing'}, age=12, name='Li'), 
  Row(address={'country': 'USA', 'city': 'Los Angeles'}, age=14, name='Richard')
]
"""

Extracting Data

people = df.collect()
# return list of all Row class

len(people)
# 5

df.select('age').distinct().collect()
# [Row(age=12), Row(age=14), Row(age=16)]
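
collect() pulls the whole dataset onto the driver; for a bounded sample or pandas interop, a small sketch (toPandas() requires pandas to be installed locally):

df.take(2)
# first 2 Row objects, same shape as head(2)

pandas_df = df.select('name', 'age').toPandas()
pandas_df['age'].mean()
# 13.2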

Row & Column

# ---------------- row -----------------------

first_row = df.head()
# Row(address=Row(city='Nanjing', country='China'), age=12, name='Li')

# Read a column value from within the row
first_row['age']           # 12
first_row.age              # 12
getattr(first_row, 'age')  # 12
first_row.address
# Row(city='Nanjing', country='China')

# -------------- column -----------------------

first_col = df[0]
first_col = df['address']
# Column<b'address'>

# copy column[s]
address_copy = first_col.alias('address_copy')

# rename column / create new column
df.withColumnRenamed('age', 'birth_age')
df.withColumn('age_copy', df['age']).show(1)
"""
+----------------+---+----+--------+
|         address|age|name|age_copy|
+----------------+---+----+--------+
|[Nanjing, China]| 12|  Li|      12|
+----------------+---+----+--------+
only showing top 1 row
"""

df.withColumn('age_over_18',df['age'] > 18).show(1)
"""
+----------------+---+----+-----------+
|         address|age|name|age_over_18|
+----------------+---+----+-----------+
|[Nanjing, China]| 12|  Li|      false|
+----------------+---+----+-----------+
only showing top 1 row
"""

Raw SQL Queries

df.createOrReplaceTempView("people")
sql_results = spark.sql("SELECT count(*) FROM people")
sql_results.show()
"""
+--------+
|count(1)|
+--------+
|       5|
+--------+
"""

pyspark.sql.functions Examples

from pyspark.sql import functions as F
import datetime as dt

# Use udf as a decorator
@F.udf()
def calculate_birth_year(age):
    this_year = dt.datetime.today().year
    birth_year = this_year - age
    return birth_year 

calculated_df = df.select("*", calculate_birth_year('age').alias('birth_year'))
calculated_df.show(2)
"""
+------------------+---+-------+----------+
|           address|age|   name|birth_year|
+------------------+---+-------+----------+
|  [Nanjing, China]| 12|     Li|      2008|
|[Los Angeles, USA]| 14|Richard|      2006|
+------------------+---+-------+----------+
only showing top 2 rows
"""

# Many functions in pyspark.sql.functions, including udf (user-defined functions), parallelize nicely over large datasets
# This is the "functional programming" style at work; the progress bar may look like:
# [Stage 41: >>>>>>>>>>>>>>>>>                    (0 + 1) / 1]
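
By default F.udf() returns StringType, so the computed column above is actually a string; the return type can be declared explicitly to keep it numeric (a small sketch):

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
import datetime as dt

@F.udf(returnType=IntegerType())
def birth_year_int(age):
    return dt.datetime.today().year - age

df.select('name', birth_year_int('age').alias('birth_year')).dtypes
# [('name', 'string'), ('birth_year', 'int')]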

Sources: https://zhuanlan.zhihu.com/p/171813899 https://blog.csdn.net/cymy001/article/details/78483723

  • Further reading:

Custom aggregate functions (UDAF) in PySpark: https://www.cnblogs.com/wdmx/p/10156500.html
