PySpark on hpc 续: 合理分区处理及合并输出单一文件

2022-01-12 16:12:28 浏览数 (1)

在HPC上启动任务以local模式运行自定义spark,可以自由选择spark、python版本组合来处理数据;起多个任务并行处理独立分区数据,只要处理资源足够,限制速度的只是磁盘io。本地集群处理需要2周的数据,2个小时就处理好了。HPC通常没有数据库,进一步BI展示或者处理需要拉回本地集群,这时候需要把数据块(比如一天)的数据保存为tsv.gz拉回本地集群。pyspark dataframe 提供write的save方法,可以写tsv.gz,spark默认是并行写,所以在提供outpath目录下写多个文件。这个时候,需要顺序拼接多个tsv文件并压缩为gz格式。

1. process_to_tsv_path

代码语言:python代码运行次数:0复制
  from pyspark.sql import SparkSession
  def process_to_tsv_path(spark, in_file, out_csv_path,tasks=8):
    result = (
        spark.read.csv(in_file, sep="t", quote=None, header=True)
        .repartition(tasks)
        .where(...)
        .select(...)
        .write.format("com.databricks.spark.csv").save(out_csv_path)
    )
    return result

repartition的需要在读取输入文件后,并根据文件大小和申请cpu、MEM数适当设定;这样就会在out_csv_path生成对应tasks个csv文件。如果把repartition放在处理之后输出write之前,那么前面处理就只有一个分区,只能调用一个cpu核(和输入文件数对应),浪费算力。做个对比试验,笔者的处理数据情况大概差距5倍。

2. tsv_path_to_gz

代码语言:python代码运行次数:0复制
  import glob, gzip
  def tsv_path_to_gz(out_csv_path, tar_file):
    interesting_files = sorted(glob.glob(f'{out_csv_path}/*.csv'))
    with gzip.open(tar_file, 'wb') as f_out:
        for file_name in interesting_files:
            with open(file_name, 'rb') as f_in:
                f_out.writelines(f_in)

0 人点赞