又见dask! 如何使用dask-geopandas处理大型地理数据

2024-06-20 18:10:25 浏览数 (1)

前言

读者来信

我之前是 1、先用arcgis 栅格转点 2、给点添加xy坐标 3、给添加xy坐标后的点通过空间连接的方式添加行政区属性 4、最后计算指定行政区的质心 之前的解决办法是用arcgis 完成第一步和第二步,虽然完成的很慢,但是看起来好像没太大问题 但是第三步用arcgis会卡死,后来用geopandas也会卡死,后来了解到dask-geopandas,但是处理了两百万个点左右好像也报错了,不知道是我写的代码有问题还是我对dask的理解有问题,想要请教一下大佬

读者的问题涉及到地理信息系统(GIS)操作的一系列步骤,具体包括将栅格数据转换为点数据、为这些点数据添加XY坐标、通过空间连接给这些点添加行政区属性、以及计算指定行政区的质心。读者在使用ArcGIS软件完成前两步时未遇到明显问题,但在执行第三步时遇到了性能瓶颈,即使用ArcGIS和GeoPandas进行空间连接操作时系统会卡死。为了解决这个问题,读者尝试使用了dask-geopandas来处理约两百万个点的数据,但似乎遇到了错误。

针对这个情况,我们可以从几个方面进行分析和建议:

性能瓶颈分析:

ArcGIS和GeoPandas在处理大量数据时可能会遇到性能问题,特别是在普通硬件上运行时。这是因为这些操作往往需要大量的内存和CPU资源。 空间连接特别是在点数据量很大时,是一个资源密集型的操作,因为它需要对每个点检查其与其他几何对象(如行政区边界)的空间关系。 dask-geopandas的使用:

dask-geopandas旨在解决类似的性能问题,通过并行计算和延迟执行来提高处理大规模地理空间数据的效率。如果在使用dask-geopandas时遇到错误,可能是由于多种原因导致的,包括但不限于代码问题、内存管理、任务调度等。 为了更好地诊断问题,需要检查错误消息的具体内容。这可能会指示是配置问题、资源不足还是代码逻辑错误。 优化建议:

资源分配:确保有足够的计算资源(CPU和内存)来处理数据。对于dask-geopandas,可以通过调整Dask的工作进程数和内存限制来优化性能。 代码审查:仔细检查实现代码,尤其是dask-geopandas的部分,确认是否正确使用了并行计算和数据分区功能。 批处理:如果可能,尝试将数据分成更小的批次进行处理,而不是一次性处理所有点。这可以帮助减少内存压力。 索引和优化:在进行空间连接之前,为行政区数据建立空间索引可以大大提高查询效率。

注意,运行前需要将input的rar文件解压后再运行程序

dask_geopandas环境部署

花了一番功夫解决环境问题,使用以下步骤即可使用dask_geopandas

In [1]:

代码语言:javascript复制
代码语言:javascript复制
!pip install dask_geopandas -i https://pypi.mirrors.ustc.edu.cn/simple/
!pip install PyGEOS -i https://pypi.mirrors.ustc.edu.cn/simple/
!pip install --upgrade shapely https://pypi.mirrors.ustc.edu.cn/simple/
!pip install pyogrio -i https://pypi.mirrors.ustc.edu.cn/simpl
代码语言:javascript复制

dask_geopandas简单示例

将 GeoPandas DataFrame 转换为 Dask-GeoPandas DataFrame 首先,使用 GeoPandas 读取地理数据文件:

python import geopandas df = geopandas.read_file('...') # 使用你的文件路径替换 '...' 然后,将其转换为 Dask-GeoPandas DataFrame:

python import dask_geopandas

将 GeoPandas DataFrame 分区为 Dask-GeoPandas DataFrame,这里分为4个部分

ddf = dask_geopandas.from_geopandas(df, npartitions=4) 默认情况下,这会根据行来简单地重新分区数据。但是,你也可以提供空间分区,以利用 GeoDataFrame 的空间结构。

python

执行空间重分区

ddf = ddf.spatial_shuffle() GeoPandas 的熟悉的空间属性和方法也可用,并且将并行计算:

python

计算几何对象的面积

ddf.geometry.area.compute()

检查几何对象是否在某个多边形内

ddf.within(polygon) 此外,如果你有一个分布式的 dask.dataframe,你可以将 x-y 点的列传递给 set_geometry 方法来设置几何形状。

python import dask.dataframe as dd import dask_geopandas

从 CSV 文件读取数据

ddf = dd.read_csv('...') # 使用你的文件路径替换 '...'

使用经纬度设置几何形状

ddf = ddf.set_geometry( dask_geopandas.points_from_xy(ddf, 'longitude', 'latitude') ) 目前支持 Parquet 和 Feather 文件格式的写入(以及读回):

python

写入到 Parquet 文件

ddf.to_parquet("path/to/dir/")

从 Parquet 文件读取

ddf = dask_geopandas.read_parquet("path/to/dir/") 传统的 GIS 文件格式可以读入到分区的 GeoDataFrame 中(需要 pyogrio),但不支持写入:

python

读取文件,这里以 GeoPackage 文件为例,同时指定分区数为4

ddf = dask_geopandas.read_file("file.gpkg", npartitions=4) 以上就是如何使用 Dask-GeoPandas 对大型地理空间数据进行高效处理的简单示例。

原程序

In [2]:

代码语言:javascript复制
代码语言:javascript复制
import geopandas as gpd
import time # 添加时间模块

# 添加dask模块
import dask_geopandas

def process_row():
    outwen = '/home/mw/input/dask6250/201105.shp'
    bianjie = '/home/mw/input/dask6250/xian/2023xian.shp'
    jiabianjie = './'
    
    start_time3 = time.time()
    
    # 读取输入和裁剪边界的 shapefile
    target_gdf = gpd.read_file(outwen)
    join_gdf = gpd.read_file(bianjie)
    
    # 改成dask方式
    target_gdfnew = dask_geopandas.from_geopandas(target_gdf, npartitions=4)
       
    # 重新投影参与连接的边界以匹配目标几何图形的 CRS
    join_gdf = join_gdf.to_crs(target_gdf.crs)
    
    # 改成dask方式
    join_gdfnew = dask_geopandas.from_geopandas(join_gdf, npartitions=4)
    
    # 使用空间连接找到相交的部分
    joined = gpd.sjoin(target_gdfnew, join_gdfnew, how='inner', predicate='intersects')
    
    # 将 'bianjie' 中的属性添加到 'outwen' 中
    joined = joined.drop(columns='index_right')  # 移除多余的索引列
    result = target_gdfnew.merge(joined, how='left', on=target_gdfnew.columns.to_list())
    
    # 保存结果到输出边界
    result.to_file(jiabianjie,  encoding='utf-8-sig')  # 确保使用正确的编码
    
    end_time3 = time.time()
    execution_time3 = end_time3 - start_time3
    
    print(f"'{jiabianjie}'已添加边界,开始时间为:{start_time3:.2f},结束时间为:{end_time3:.2f},执行时间为:{execution_time3:.2f}秒")

process_row()

print('finish')
代码语言:javascript复制

好,运行一段时间爆内存了,应该考虑以下优化策略:

  1. 直接在Dask中读取Shapefiles

你的代码先用geopandas读取Shapefile,然后转换为dask_geopandas对象。这个过程中,原始数据会完全加载到内存中,这可能是导致内存溢出的原因之一。相反,你应该直接使用dask_geopandas.read_file来避免将整个数据集一次性加载到内存:

代码语言:javascript复制
python    
target_dgdf = dask_geopandas.read_file(outwen, npartitions=4)    
join_dgdf = dask_geopandas.read_file(bianjie, npartitions=4)
  1. 避免不必要的数据复制

在数据处理过程中,尽量减少不必要的数据复制。例如,在合并或连接操作之前,仔细考虑是否所有列都需要参与操作。

  1. 使用更高效的空间连接

在使用dask_geopandas进行空间连接时,确保操作是高效的。你的代码尝试使用geopandas.sjoin,但是应该使用dask_geopandas.sjoin。此外,确保在执行空间连接之前,两个数据集已经有了匹配的坐标参考系统(CRS)。这样可以避免在每个分区上重复昂贵的CRS转换操作。

  1. 调整npartitions

npartitions的选择对性能和内存使用有重大影响。太少的分区可能会导致单个分区过大,而太多的分区则会增加调度开销。你可能需要实验不同的npartitions值来找到最佳平衡。

  1. 检查最终保存步骤

在保存结果时,如果尝试将整个处理后的数据集写入单个文件,这可能也会导致内存问题。dask_geopandas目前可能不支持直接写入文件格式如Shapefile,因为这通常涉及将数据集合并到单个分区。你可能需要先将数据写入Parquet等格式,或者手动分批写入。

2.0

对自己的资源修改npartitions参数

In [1]:

代码语言:javascript复制
代码语言:javascript复制
import dask_geopandas as dgd
import time

input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './output/'

def process_row(target_gdf, join_gdf, jiabianjie_pat):


    start_time = time.time()
    
    # 直接使用dask-geopandas读取shapefiles
    target_dgdf = dgd.read_file(input_shapefile, npartitions=16)
    join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16)
    
    # 确保边界shapefile与目标shapefile的CRS一致
    join_dgdf = join_dgdf.to_crs(target_dgdf.crs)
    
    # 使用空间连接找到相交的部分
    joined = dgd.sjoin(target_dgdf, join_dgdf, how='inner', predicate='intersects')
    
    # 移除多余的索引列
    joined = joined.drop(columns='index_right')
    

    joined.compute().to_file(output_directory   'result.shp', driver='ESRI Shapefile', encoding='utf-8')

    # end_time = time.time()
    # print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")

if __name__ == "__main__":
    process_row(input_shapefile,boundary_shapefile,output_directory)
    print('finish')
代码语言:javascript复制

In [ ]:

代码语言:javascript复制
代码语言:javascript复制
import dask_geopandas as dgd
import time
import gc  # 导入垃圾收集器

input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './'

def process_row(target_gdf, join_gdf, jiabianjie_pat):
    start_time = time.time()
    
    # 根据你的硬件配置调整npartitions,减少分区数以减少内存开销
    target_dgdf = dgd.read_file(input_shapefile, npartitions=16)  # 适当调整npartitions
    join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16)  # 适当调整npartitions
    
    join_dgdf = join_dgdf.to_crs(target_dgdf.crs)
    
    joined = dgd.sjoin(target_dgdf, join_dgdf, how='inner', predicate='intersects')
    
    joined = joined.drop(columns='index_right')
    
    # 计算结果前启动垃圾收集
    gc.collect()
    
    joined.compute().to_file(output_directory   'result.shp', driver='ESRI Shapefile', encoding='utf-8')
    
    # 手动启动垃圾收集释放内存
    gc.collect()

    end_time = time.time()
    print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")

if __name__ == "__main__":
    process_row(input_shapefile, boundary_shapefile, output_directory)
    print('finish')
代码语言:javascript复制
代码语言:javascript复制
/opt/conda/lib/python3.9/site-packages/geopandas/_compat.py:111: UserWarning: The Shapely GEOS version (3.10.0-CAPI-1.16.0) is incompatible with the GEOS version PyGEOS was compiled with (3.10.4-CAPI-1.16.2). Conversions between both will be slow.
  warnings.warn(
/opt/conda/lib/python3.9/site-packages/geopandas/io/file.py:362: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
  pd.Int64Index,

3.0

分批运行与采用gpkg方式存储

In [3]:

代码语言:javascript复制
代码语言:javascript复制
import dask_geopandas as dgd
import time
import gc
from dask import delayed, compute  # 从dask中导入compute函数

input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './'

@delayed
def process_batch(batch, join_gdf, output_path):
    # 将边界数据转换为目标数据的坐标参考系统
    join_gdf = join_gdf.to_crs(batch.crs)
    
    # 执行空间连接
    joined = dgd.sjoin(batch, join_gdf, how='inner', predicate='intersects')
    
    # 删除不必要的列
    joined = joined.drop(columns='index_right')
    
    # 计算并保存结果
    joined.compute().to_file(output_path, driver='GPKG', layer='result', encoding='utf-8')  # 使用GeoPackage格式

def main():
    start_time = time.time()
    
    # 明确指定npartitions
    target_dgdf = dgd.read_file(input_shapefile, npartitions=16)  # 明确设置npartitions
    join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16)  # 明确设置npartitions
    
    # 将目标数据集分批处理
    batches = target_dgdf.to_delayed()
    
    tasks = [process_batch(batch, join_dgdf, f"{output_directory}result_{i}.gpkg") for i, batch in enumerate(batches)]
    
    # 使用dask的compute函数来执行所有延迟任务
    compute(*tasks)
    
    gc.collect()  # 手动启动垃圾收集释放内存
    
    end_time = time.time()
    print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")

if __name__ == "__main__":
    main()
    print('finish')
代码语言:javascript复制
代码语言:javascript复制
/opt/conda/lib/python3.9/site-packages/geopandas/_compat.py:111: UserWarning: The Shapely GEOS version (3.10.0-CAPI-1.16.0) is incompatible with the GEOS version PyGEOS was compiled with (3.10.4-CAPI-1.16.2). Conversions between both will be slow.
  warnings.warn(

注意,由于资源限制,以上最终的result并没有运行完全,可以看到project目录下还有一部分gpkg 因为输出文件大于1g的限制,还请有兴趣的在自己的电脑运行,根据相应资源修改参数

另外gpkg可以使用geopandas转为为需要的shp

In [ ]:

代码语言:javascript复制
代码语言:javascript复制
import geopandas as gpd
import pandas as pd

# GeoPackage文件列表
gpkg_files = ['path/to/your/first_file.gpkg', 'path/to/your/second_file.gpkg']

# 读取所有GeoPackage文件到GeoDataFrame列表中
gdf_list = [gpd.read_file(gpkg) for gpkg in gpkg_files]

# 合并GeoDataFrame对象
merged_gdf = pd.concat(gdf_list, ignore_index=True)

# 指定输出Shapefile的路径
output_shp_path = 'path/to/your/output_file.shp'

# 将合并后的GeoDataFrame保存为Shapefile
merged_gdf.to_file(output_shp_path, driver='ESRI Shapefile')

print(f"合并后的Shapefile已保存至:{output_shp_path}")
代码语言:javascript复制

点击链接可查看完整代码与在线运行代码

0 人点赞