在HPC上启动任务以local模式运行自定义spark,可以自由选择spark、python版本组合来处理数据;起多个任务并行处理独立分区数据,只要处理资源足够,限制速度的只是磁盘io。本地集群处理需要2周的数据,2个小时就处理好了。HPC通常没有数据库,进一步BI展示或者处理需要拉回本地集群,这时候需要把数据块(比如一天)的数据保存为tsv.gz拉回本地集群。pyspark dataframe 提供write的save方法,可以写tsv.gz,spark默认是并行写,所以在提供outpath目录下写多个文件。这个时候,需要顺序拼接多个tsv文件并压缩为gz格式。
1. process_to_tsv_path
代码语言:python代码运行次数:0复制 from pyspark.sql import SparkSession
def process_to_tsv_path(spark, in_file, out_csv_path,tasks=8):
result = (
spark.read.csv(in_file, sep="t", quote=None, header=True)
.repartition(tasks)
.where(...)
.select(...)
.write.format("com.databricks.spark.csv").save(out_csv_path)
)
return result
repartition的需要在读取输入文件后,并根据文件大小和申请cpu、MEM数适当设定;这样就会在out_csv_path生成对应tasks个csv文件。如果把repartition放在处理之后输出write之前,那么前面处理就只有一个分区,只能调用一个cpu核(和输入文件数对应),浪费算力。做个对比试验,笔者的处理数据情况大概差距5倍。
2. tsv_path_to_gz
代码语言:python代码运行次数:0复制 import glob, gzip
def tsv_path_to_gz(out_csv_path, tar_file):
interesting_files = sorted(glob.glob(f'{out_csv_path}/*.csv'))
with gzip.open(tar_file, 'wb') as f_out:
for file_name in interesting_files:
with open(file_name, 'rb') as f_in:
f_out.writelines(f_in)