PySpark on HPC 续:批量处理的框架的工程实现

2022-01-21 19:09:17 浏览数 (1)

PySpark on HPC系列记录了我独自探索在HPC利用PySpark处理大数据业务数据的过程,由于这方面资料少或者搜索能力不足,没有找到需求匹配的框架,不得不手搓一个工具链,容我虚荣点,叫“框架”。框架的实现功能如下:

  1. generate job file(生成批量任务描述文件):读取raw data folder,生成带读取raw file list,根据输入job参数(batch size)等输出系列job file(描述输入raw文件路径,生成文件路径);
  2. job script -- single job file(任务脚本:输入一个job file,执行单批次的任务);
  3. job script- array job file(任务脚本:输入array job,执行系列化任务):根据job file folder和array id并行处理多批次raw data file。

1 Framework overview

frameworkframework

如上图所示,另外有几个注意点:

  • PySpark Env详见 pyspark on hpc
  • HPC处理,处理环境(singularity镜像,或者conda环境)和输入输出数据、任务描述(job file)需要存放于HPC各个节点都可以访问的存储上;

2 Process script & job file generate

具体任务处理脚本有几点注意事项:

  1. 初始化HPC PySpark环境;
  2. 入口函数接受一个job file路径,该文件是一个表格文件(如csv),有3列,in_file,out_file,tmp_folder(用于Spark输出,后面gzip压缩成单个文件后删除);
  3. 日志文件要每个job(task)一个,典型的是日期加一个随机值或者job_id;
代码语言:python代码运行次数:0复制
...
os.environ["PYSPARK_PYTHON"] = "<env_path>/python"
os.environ["SPARK_HOME"] = "<env_path>/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"]   "/python/lib"
sys.path.insert(0, os.environ["PYLIB"]  "/py4j-10.9-src.zip")
sys.path.insert(0, os.environ["PYLIB"]  "/pyspark.zip")
sys.path.insert(0, '<work_path>')
...

def process_raw(spark, in_file, file_output, out_csv_path):
    raw_to_csv(spark, in_file, out_csv_path)
    csv_to_zip(out_csv_path, file_output)
    shutil.rmtree(out_csv_path)

def process_job_file(in_file,spark):
    df = pd.read_csv(in_file)
    for index, row in df.iterrows():
        in_file, out_file, tmp_path = row['in_file'],row['out_file'],row['tmp_path']
        process_raw(spark, in_file, out_file, tmp_path)

def get_parser():
    parser = argparse.ArgumentParser(description='...')
    parser.add_argument("-j", help="job type", dest="job_type",default='process_raw') 
    # process_raw:
    parser.add_argument("-i", help="input job file", dest="input_file")
    # generate job file
    parser.add_argument("-b", help="one job batch size", dest="batch_size",default=8)
    parser.add_argument("-g", help="generate job file root", dest="gen_file_root", default='./jobs')
    parser.add_argument("-r", help="raw data root", dest="raw_data_root")
    parser.add_argument("-t", help="target data root", dest="tar_data_root")

if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args() 
    if args.job_type == 'process_raw' and args.input_file is not None:
       spark = get_spark()
       process_job_file(args.input_file,spark)
    elif args.job_type == 'gen_job_file':
        generate_jobfile_from_folder(args.raw_data_root, args.tar_data_root, batch_size=args.batch_size, job_file_folder=args.gen_file_root)
    else:
        parser.print_help()

3 Jobsript

1) 处理单个任务文件: spark-hpc-batch.sh

代码语言:shell复制
#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err

<path_to_env>/python <process_file_path>.py -i $1

调用方法

代码语言:shell复制
sbatch spark-hpc-batch.sh <job_file_path>

2) 处理队列任务文件: spark-hpc-batch-array.sh

代码语言:shell复制
#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err

JOB_FILE_ROOT=$1
<path_to_env>/python <process_file_path>.py -i $1 "$JOB_FILE_ROOT/$SLURM_ARRAY_TASK_ID.csv"

调用方法

代码语言:shell复制
sbatch --array=0-29 spark-hpc-batch-array.sh <job_file_root>

0 人点赞