PySpark on HPC系列记录了我独自探索在HPC利用PySpark处理大数据业务数据的过程,由于这方面资料少或者搜索能力不足,没有找到需求匹配的框架,不得不手搓一个工具链,容我虚荣点,叫“框架”。框架的实现功能如下:
- generate job file(生成批量任务描述文件):读取raw data folder,生成带读取raw file list,根据输入job参数(batch size)等输出系列job file(描述输入raw文件路径,生成文件路径);
- job script -- single job file(任务脚本:输入一个job file,执行单批次的任务);
- job script- array job file(任务脚本:输入array job,执行系列化任务):根据job file folder和array id并行处理多批次raw data file。
1 Framework overview
如上图所示,另外有几个注意点:
- PySpark Env详见 pyspark on hpc
- HPC处理,处理环境(singularity镜像,或者conda环境)和输入输出数据、任务描述(job file)需要存放于HPC各个节点都可以访问的存储上;
2 Process script & job file generate
具体任务处理脚本有几点注意事项:
- 初始化HPC PySpark环境;
- 入口函数接受一个job file路径,该文件是一个表格文件(如csv),有3列,in_file,out_file,tmp_folder(用于Spark输出,后面gzip压缩成单个文件后删除);
- 日志文件要每个job(task)一个,典型的是日期加一个随机值或者job_id;
...
os.environ["PYSPARK_PYTHON"] = "<env_path>/python"
os.environ["SPARK_HOME"] = "<env_path>/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] "/py4j-10.9-src.zip")
sys.path.insert(0, os.environ["PYLIB"] "/pyspark.zip")
sys.path.insert(0, '<work_path>')
...
def process_raw(spark, in_file, file_output, out_csv_path):
raw_to_csv(spark, in_file, out_csv_path)
csv_to_zip(out_csv_path, file_output)
shutil.rmtree(out_csv_path)
def process_job_file(in_file,spark):
df = pd.read_csv(in_file)
for index, row in df.iterrows():
in_file, out_file, tmp_path = row['in_file'],row['out_file'],row['tmp_path']
process_raw(spark, in_file, out_file, tmp_path)
def get_parser():
parser = argparse.ArgumentParser(description='...')
parser.add_argument("-j", help="job type", dest="job_type",default='process_raw')
# process_raw:
parser.add_argument("-i", help="input job file", dest="input_file")
# generate job file
parser.add_argument("-b", help="one job batch size", dest="batch_size",default=8)
parser.add_argument("-g", help="generate job file root", dest="gen_file_root", default='./jobs')
parser.add_argument("-r", help="raw data root", dest="raw_data_root")
parser.add_argument("-t", help="target data root", dest="tar_data_root")
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
if args.job_type == 'process_raw' and args.input_file is not None:
spark = get_spark()
process_job_file(args.input_file,spark)
elif args.job_type == 'gen_job_file':
generate_jobfile_from_folder(args.raw_data_root, args.tar_data_root, batch_size=args.batch_size, job_file_folder=args.gen_file_root)
else:
parser.print_help()
3 Jobsript
1) 处理单个任务文件: spark-hpc-batch.sh
代码语言:shell复制#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err
<path_to_env>/python <process_file_path>.py -i $1
调用方法
代码语言:shell复制sbatch spark-hpc-batch.sh <job_file_path>
2) 处理队列任务文件: spark-hpc-batch-array.sh
代码语言:shell复制#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err
JOB_FILE_ROOT=$1
<path_to_env>/python <process_file_path>.py -i $1 "$JOB_FILE_ROOT/$SLURM_ARRAY_TASK_ID.csv"
调用方法
代码语言:shell复制sbatch --array=0-29 spark-hpc-batch-array.sh <job_file_root>