手把手带你科研入门系列 | PyAOS基础教程十:大数据文件

2021-07-27 15:19:58 浏览数 (1)


1、前言

文章解答以下疑问:

第一:如何在多CMIP6文件的场景下避免内存泄漏。

文章的目标

第一:了解netCDF数据块chunk的概念;

第二:导入dask库,并启动并行处理机制;

第三:计算并绘制高分辨率模型的最大日降雨量。

2、数据处理

首先看一下测试nc文件,总计7个文件,每个文件大约6.7G,是CNRM-CM6-1-HR模式按照25年的时间分开存储的。

由于模式数据非常巨大,一般pc的内存不够大,无法一次性处理如此大的文件,因此这里不再使用xarray库直接读取数据,而是先用glob库,通过glob库提供的方法将上述7个文件导入系统,但这个时候数据还未读取到系统内存

代码语言:javascript复制
import glob

pr_files = glob.glob('data/pr*.nc')
pr_files.sort()
print(pr_files)

输出:

代码语言:javascript复制
['/Users/irving/Desktop/data-carpentry/data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_18500101-18741231.nc',
 '/Users/irving/Desktop/data-carpentry/data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_18750101-18991231.nc',
 '/Users/irving/Desktop/data-carpentry/data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19000101-19241231.nc',
 '/Users/irving/Desktop/data-carpentry/data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19250101-19491231.nc',
 '/Users/irving/Desktop/data-carpentry/data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19500101-19741231.nc',
 '/Users/irving/Desktop/data-carpentry/data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19750101-19991231.nc',
 '/Users/irving/Desktop/data-carpentry/data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_20000101-20141231.nc']

然后,用xarray读取数据,但是这里读取数据的方法,与前面的课程有非常明显的不同(前面用的是xarray.open_dataset来一次性读取nc文件到内存中),这里用到的是xarray.open_mfdataset函数分批读取数据,我们具体来看看它是如何读取数据的。

代码语言:javascript复制
import xarray as xr

dset = xr.open_mfdataset(pr_files, chunks={'time': '500MB'})
print(dset)

pr_files即为上面的glob抓取的文件,虽说glob一次性抓取了7个nc文件,但是这里xarray读取依然类似于一个文件,参数chunks(数据块)是一个关键,这里的意思是在time维度上一次性读取500MB的数据块,实现按需读取数据。

输出:

代码语言:javascript复制
<xarray.Dataset>
Dimensions:      (axis_nbounds: 2, lat: 360, lon: 720, time: 60265)
Coordinates:
  * lat          (lat) float64 -89.62 -89.12 -88.62 -88.13 ... 88.62 89.12 89.62
  * lon          (lon) float64 0.0 0.5 1.0 1.5 2.0 ... 358.0 358.5 359.0 359.5
  * time         (time) datetime64[ns] 1850-01-01T12:00:00 ... 2014-12-31T12:...
Dimensions without coordinates: axis_nbounds
Data variables:
    time_bounds  (time, axis_nbounds) datetime64[ns] dask.array<chunksize=(9131, 2), meta=np.ndarray>
    pr           (time, lat, lon) float32 dask.array<chunksize=(397, 360, 720), meta=np.ndarray>
Attributes:
    Conventions:            CF-1.7 CMIP-6.2
    creation_date:          2019-05-23T12:33:53Z
    description:            CMIP6 historical
    title:                  CNRM-CM6-1-HR model output prepared for CMIP6 and...
    activity_id:            CMIP
    contact:                contact.cmip@meteo.fr
    data_specs_version:     01.00.21
    dr2xml_version:         1.16
    experiment_id:          historical
    experiment:             all-forcing simulation of the recent past
    external_variables:     areacella
    forcing_index:          2
    frequency:              day
    further_info_url:       https://furtherinfo.es-doc.org/CMIP6.CNRM-CERFACS...
    grid:                   data regridded to a 359 gaussian grid (360x720 la...
    grid_label:             gr
    nominal_resolution:     50 km
    initialization_index:   1
    institution_id:         CNRM-CERFACS
    institution:            CNRM (Centre National de Recherches Meteorologiqu...
    license:                CMIP6 model data produced by CNRM-CERFACS is lice...
    mip_era:                CMIP6
    parent_experiment_id:   p i C o n t r o l
    parent_mip_era:         CMIP6
    parent_activity_id:     C M I P
    parent_source_id:       CNRM-CM6-1-HR
    parent_time_units:      days since 1850-01-01 00:00:00
    parent_variant_label:   r1i1p1f2
    branch_method:          standard
    branch_time_in_parent:  0.0
    branch_time_in_child:   0.0
    physics_index:          1
    product:                model-output
    realization_index:      1
    realm:                  atmos
    references:             http://www.umr-cnrm.fr/cmip6/references
    source:                 CNRM-CM6-1-HR (2017):  aerosol: prescribed monthl...
    source_id:              CNRM-CM6-1-HR
    source_type:            AOGCM
    sub_experiment_id:      none
    sub_experiment:         none
    table_id:               day
    variable_id:            pr
    variant_label:          r1i1p1f2
    EXPID:                  CNRM-CM6-1-HR_historical_r1i1p1f2
    CMIP6_CV_version:       cv=6.2.3.0-7-g2019642
    dr2xml_md5sum:          45d4369d889ddfb8149d771d8625e9ec
    xios_commit:            1442-shuffle
    nemo_gelato_commit:     84a9e3f161dade7_8250e198106a168
    arpege_minor_version:   6.3.3
    history:                none
    tracking_id:            hdl:21.14100/223fa794-73fe-4bb5-9209-8ff910f7dc40

从第1行输出信息来看,dset依然是xarray.Dataset类型的变量,请注意看第9和10行的变量中新增的dask.array对象下的chunksize属性,这是由于我们在读取dset数据时指定chunk参数的原因。按照chunk参数指定的500MB的大小,dask并非将7个nc文件的数据一次性读取到系统内存中,而是遵从一块一块数据读取的原则。当然dask也可以把这些chunks分发到不同的cpu核上进行处理。

那么多大的chunk比较合适呢?如果chunk太小,频繁的调度数据并处理数据将导致效率低下,整体耗时可能依然比较高;如果chunk太大,可能会导致系统运行缓慢,甚至内存泄漏。因此chunk既不能太大,也不能太小,dask的官方文档中给的推荐值是10MB-1GB,比如上面的例子中就是选用的中间值500MB的chunk。

接下来,看一下上面读取的数据大小

代码语言:javascript复制
dset['pr'].data

输出:

代码语言:javascript复制
           Array                 Chunk
Bytes     62.48 GB              499.74 MB
Shape     (60265, 360, 720)     (482, 360, 720)
Count     307 Tasks             150 Chunks
Type      float32               numpy.ndarray

最后,按照时间顺序计算日最大降雨量

代码语言:javascript复制
pr_max = dset['pr'].max('time', keep_attrs=True)
print(pr_max)

输出:

代码语言:javascript复制
<xarray.DataArray 'pr' (lat: 360, lon: 720)>
dask.array<nanmax-aggregate, shape=(360, 720), dtype=float32, chunksize=(360, 720), chunktype=numpy.ndarray>
Coordinates:
  * lat      (lat) float64 -89.62 -89.12 -88.62 -88.13 ... 88.62 89.12 89.62
  * lon      (lon) float64 0.0 0.5 1.0 1.5 2.0 ... 357.5 358.0 358.5 359.0 359.5
Attributes:
    long_name:           Precipitation
    units:               kg m-2 s-1
    online_operation:    average
    cell_methods:        area: time: mean
    interval_operation:  900 s
    interval_write:      1 d
    standard_name:       precipitation_flux
    description:         at surface; includes both liquid and solid phases fr...
    history:             none
    cell_measures:       area: areacella

上面的计算过程看上去是在很短的时间里就完成了,但实际上它依然是xarray懒人模式的一种,一般来说,xarray非必要的情况下不会计算,但是绘图或者写入netCDF文件则会发生计算操作。那么有没有办法强制xarray进行数据计算呢?办法当然是有的,computer函数就可以实现此目的。

代码语言:javascript复制
%%time
pr_max.compute()

第一行代码的作用是打印当前cell的运行时间。

输出:

代码语言:javascript复制
CPU times: user 4min 1s, sys: 54.2 s, total: 4min 55s
Wall time: 3min 44s

3、并行化

上面的例子中,所有的计算处理都是运行在单核上,而dask client可以把任务分发至不同的cpu核上,实现并行化处理。使用方法如下:

代码语言:javascript复制
from dask.distributed import Client

client = Client()
client

输出:

代码语言:javascript复制
Client                                    Cluster
Scheduler: tcp://127.0.0.1:59152          Workers: 4
Dashboard: http://127.0.0.1:8787/status   Cores: 4
                                          Memory: 17.18 GB

然后,我们再来调用一下computer函数,来看看数据处理花了多少时间,跟前面的单核场景进行对比:

代码语言:javascript复制
%%time
pr_max.compute()

输出:

代码语言:javascript复制
CPU times: user 10.2 s, sys: 1.12 s, total: 11.3 s
Wall time: 2min 33s

从这个结果中,可以看到,虽然是4个cpu核参加数据处理,整个cell的运行时间是2min33s,但跟前面单核处理时间3min44s,并没有减少75%的运行时间。说明在多核cpu之间进行系统调度也是耗费时间的,因此,多核cpu并行处理化场景可能不是最优解决方案,需要根据实际情况选择方案。

4、绘图

在完成了日最大降雨量的数据计算后,即可以完成画图工作。

代码语言:javascript复制
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import numpy as np
import cmocean

pr_max.data = pr_max.data * 86400
pr_max.attrs['units'] = 'mm/day'

fig = plt.figure(figsize=[12,5])
ax = fig.add_subplot(111, projection=ccrs.PlateCarree(central_longitude=180))
pr_max.plot.contourf(ax=ax,
                     levels=np.arange(0, 450, 50),
                     extend='max',
                     transform=ccrs.PlateCarree(),
                     cbar_kwargs={'label': pr_max.units},
                     cmap=cmocean.cm.haline_r)
ax.coastlines()

model = dset.attrs['source_id']
title = f'Daily maximum precipitation, 1850-2014 ({model})'
plt.title(title)

plt.show()

5、总结

本文的主要知识点:

  • 学会用dask和xarray库让netCDF数据加载、处理和可视化等操作更加简单;
  • Dask可以通过并行加速数据处理,但需要特别注意数据分块大小。

0 人点赞