Python小案例(十)利用PySpark循环写入数据
在做数据分析的时候,往往需要回溯历史数据。但有时候构建历史数据时需要变更参数重复跑数,公司的数仓调度系统往往只支持日期这一个参数,而且为临时数据生产调度脚本显得有点浪费。这个时候就可以结合python的字符串格式化和PySpark的Hive写入,就可以完成循环写入临时数据。
⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接企业hive集群的
案例一:多参数循环写入临时表
案例背景:写入每天的热搜数据,热搜类型分为当日、近1日、近2日、近3日。这里为了方便,简化了循环的力度。
代码语言:javascript复制from pyspark.sql import *
# spark配置
spark = SparkSession
.builder
.appName("Python Spark SQL basic example")
.config("spark.executor.instances", "20")
.config("spark.executor.cores", "2")
.config("spark.executor.memory", "8g")
.config("spark.driver.memory", "8g")
.enableHiveSupport()
.getOrCreate()
# 导入其他相关库
import pandas as pd
from datetime import datetime
代码语言:javascript复制# sql创建临时表
sql_create = '''
CREATE TABLE temp.loop_write_example
(
cnt string comment "近n日cnt"
)
PARTITIONED BY (`point_date` string, `dtype` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='t',
'serialization.format'='t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
spark.sql(sql_create)
代码语言:javascript复制DataFrame[]
构造日期'{dt}'
和热搜类型{num}
两个参数
# sql写入临时表
sql_insert = '''
insert overwrite table temp.loop_write_example partition (point_date = '{dt}',dtype={num})
select
sum(if(dt between date_add('{dt}',-{num}) and '{dt}',cnt,null)) as cnt
from
temp.loop_write_example_fake_data
where
dt between date_add('{dt}',-4) and '{dt}'
'''
代码语言:javascript复制dates = pd.date_range('2021-01-01','2021-01-10').strftime("%Y-%m-%d").to_list() # 日期范围
代码语言:javascript复制# 循环写入临时表
for point_date in dates:
if point_date>='2021-01-01' and point_date<'2021-01-03':
for dtype in range(0,4):
start_time = datetime.now()
spark.sql(sql_insert.format(dt=point_date, num=dtype))
end_time=datetime.now()
print (point_date, dtype, "succeed", '耗时' str((end_time-start_time).seconds) '秒')
代码语言:javascript复制2021-01-01 0 succeed 耗时8秒
2021-01-01 1 succeed 耗时7秒
2021-01-01 2 succeed 耗时8秒
2021-01-01 3 succeed 耗时8秒
2021-01-02 0 succeed 耗时8秒
2021-01-02 1 succeed 耗时8秒
2021-01-02 2 succeed 耗时8秒
2021-01-02 3 succeed 耗时8秒
案例二:并发批量写入hdfs
案例背景:将2亿 题目按规则分批写入hdfs,供研发通过接口查询,每个hdfs要求最大1000w。
代码语言:javascript复制from pyspark.sql import *
spark = SparkSession
.builder
.appName("Python Spark SQL basic example")
.config("spark.executor.instances", "20")
.config("spark.executor.cores", "2")
.config("spark.executor.memory", "8g")
.config("spark.driver.memory", "8g")
.enableHiveSupport()
.getOrCreate()
import math
import pandas as pd
from datetime import datetime
import time
import os
代码语言:javascript复制# 为了方便,通过规则生成的数据存入临时表temp.hh_qids中,规则细节无需了解
# 查看数据量级
df_cnt = spark.sql('select count(1) as cnt from temp.hh_qids').toPandas()
N = df_cnt['cnt'].loc[0] # 获取数据量级
print(N)
代码语言:javascript复制273230858
代码语言:javascript复制# 创建表,通过参数i生成表后缀
creat_sql = '''
CREATE TABLE IF NOT EXISTS temp.hh_mult_write_{i}
(
questionid string comment "题目ID"
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='t',
'serialization.format'='t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
代码语言:javascript复制# 写入表,写入上述创建的临时表
insert_sql = '''
insert overwrite table temp.hh_mult_write_{i}
select
questionid
from
temp.hh_qids
where
ceil(rn/10000000)={i}
order by
questionid
limit 100000000
'''
- 循环写入
%%time
# 通过循环创建多个临时表并写入
for i in range(1,math.ceil(N/10000000) 1):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i)) # 创建表
spark.sql(insert_sql.format(i=i)) # 写入表
end_time=datetime.now()
print(f"成功写入hh_mult_write_{i}," '耗时' str((end_time-start_time).seconds) '秒')
代码语言:javascript复制成功写入hh_mult_write_1,耗时38秒
成功写入hh_mult_write_2,耗时59秒
成功写入hh_mult_write_3,耗时36秒
成功写入hh_mult_write_4,耗时34秒
成功写入hh_mult_write_5,耗时29秒
成功写入hh_mult_write_6,耗时26秒
成功写入hh_mult_write_7,耗时44秒
成功写入hh_mult_write_8,耗时43秒
成功写入hh_mult_write_9,耗时32秒
成功写入hh_mult_write_10,耗时49秒
成功写入hh_mult_write_11,耗时33秒
成功写入hh_mult_write_12,耗时34秒
成功写入hh_mult_write_13,耗时38秒
成功写入hh_mult_write_14,耗时24秒
成功写入hh_mult_write_15,耗时40秒
成功写入hh_mult_write_16,耗时34秒
成功写入hh_mult_write_17,耗时39秒
成功写入hh_mult_write_18,耗时45秒
成功写入hh_mult_write_19,耗时50秒
成功写入hh_mult_write_20,耗时35秒
成功写入hh_mult_write_21,耗时46秒
成功写入hh_mult_write_22,耗时38秒
成功写入hh_mult_write_23,耗时29秒
成功写入hh_mult_write_24,耗时31秒
成功写入hh_mult_write_25,耗时28秒
成功写入hh_mult_write_26,耗时36秒
成功写入hh_mult_write_27,耗时32秒
成功写入hh_mult_write_28,耗时17秒
CPU times: user 124 ms, sys: 31.8 ms, total: 156 ms
Wall time: 17min 15s
这次通过大量级数据实战演示,可以发现效率还可以,写入28个文件仅需17min 15s。但日常业务中可能存在更复杂的写入或者更大的量级,那有没有办法提高效率呢? 大家都知道python的循环是单线程的,在一次循环结束前是不会调起下次循环的。而调度系统一般也可以支持并发,那python是不是也能通过并发实现多线程呢?当然可以了,方法有不少,但我实验后发现还是
joblib
好用。
这里通过一个简单的小case演示joblib
的效果
# 查看集群服务器cpu数量
print(os.cpu_count())
48
代码语言:javascript复制%%time
# 查看简单循环的执行时间:15s
for i in range(5):
for j in range(3):
time.sleep(1)
print(i*j)
0 0 0 0 1 2 0 2 4 0 3 6 0 4 8 CPU times: user 12.2 ms, sys: 6.18 ms, total: 18.3 ms Wall time: 15 s
代码语言:javascript复制%%time
# 查看多线程下的执行时间:1.35s(好家伙,快了10倍多!)
from joblib import Parallel, delayed
def product2(x,y):
time.sleep(1)
return x*y
# n_jobs=-1表示使用全部cpu
Parallel(n_jobs=-1)(delayed(product2)(i,j) for i in range(5) for j in range(3))
CPU times: user 111 ms, sys: 233 ms, total: 344 ms Wall time: 1.35 s
[0, 0, 0, 0, 1, 2, 0, 2, 4, 0, 3, 6, 0, 4, 8]
大家可以看到,提速效果还是杠杠滴,那实际应用会不会也如此优秀呢?
- 并发写入
# 构造函数-将单次循环的主要过程包装成函数以便Parallel调用
def creat_insert(i):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i)) # 创建表
spark.sql(insert_sql.format(i=i)) # 写入表
end_time=datetime.now()
print_str = f"成功写入hh_mult_test_{i}," '耗时' str((end_time-start_time).seconds) '秒'
return print_str
代码语言:javascript复制%%time
# 并发写入
from joblib import Parallel, delayed
# 集群服务器大家都在用,在做大任务处理时,不建议使用全部cpu,这里使用一半足矣
Parallel(n_jobs=24, prefer="threads")(delayed(creat_insert)(i) for i in range(1,math.ceil(N/10000000) 1))
代码语言:javascript复制CPU times: user 87.6 ms, sys: 18.8 ms, total: 106 ms
Wall time: 1min 49s
['成功写入hh_mult_test_1,耗时44秒',
'成功写入hh_mult_test_2,耗时41秒',
'成功写入hh_mult_test_3,耗时83秒',
'成功写入hh_mult_test_4,耗时49秒',
'成功写入hh_mult_test_5,耗时89秒',
'成功写入hh_mult_test_6,耗时71秒',
'成功写入hh_mult_test_7,耗时89秒',
'成功写入hh_mult_test_8,耗时72秒',
'成功写入hh_mult_test_9,耗时83秒',
'成功写入hh_mult_test_10,耗时77秒',
'成功写入hh_mult_test_11,耗时80秒',
'成功写入hh_mult_test_12,耗时65秒',
'成功写入hh_mult_test_13,耗时53秒',
'成功写入hh_mult_test_14,耗时109秒',
'成功写入hh_mult_test_15,耗时81秒',
'成功写入hh_mult_test_16,耗时73秒',
'成功写入hh_mult_test_17,耗时41秒',
'成功写入hh_mult_test_18,耗时78秒',
'成功写入hh_mult_test_19,耗时84秒',
'成功写入hh_mult_test_20,耗时93秒',
'成功写入hh_mult_test_21,耗时68秒',
'成功写入hh_mult_test_22,耗时78秒',
'成功写入hh_mult_test_23,耗时48秒',
'成功写入hh_mult_test_24,耗时88秒',
'成功写入hh_mult_test_25,耗时54秒',
'成功写入hh_mult_test_26,耗时59秒',
'成功写入hh_mult_test_27,耗时62秒',
'成功写入hh_mult_test_28,耗时37秒']
可以看到,每个文件的写入时间与循环差不多,都是在60秒左右。但整体只花了1min 49s,快了10倍以上。
- 删除测试数据
%%time
# 测试数据量较大,无端占用公司资源是不对的,所以需要删除下。
# 但要我手动一个个删除那也是不可能的,做个简单的for循环即可
for i in range(1,29):
drop_sql='''
DROP TABLE IF EXISTS temp.hh_mult_test_1{i};
'''
spark.sql(drop_sql.format(i=i)) # 删除表
代码语言:javascript复制CPU times: user 3.94 ms, sys: 1.96 ms, total: 5.91 ms
Wall time: 148 ms
总结
至此,python小案例系列也结束了,案例基本来源于我的日常业务。在处理复杂需求,提升工作效率方面,Python还是有一席之地的。不知道大家有没有什么实用的python处理日常需求的小案例呢?
共勉~