在本节中,我们使用 Dask 和 dask.delayed
并行化简单的 for 循环样例代码。通常,这是将函数转换为与 Dask 一起使用所需的唯一函数。
这是使用 dask
并行化现有代码库或构建复杂系统的一种简单方法。这也将有助于我们对后面的部分进行理解。
相关文档
- Delayed documentation
- Delayed screencast
- Delayed API
- Delayed examples
- Delayed best practices
正如我们将在分布式调度器笔记本中看到的,Dask 有多种并行执行代码的方法。我们将通过创建 dask.distributed.Client
来使用分布式调度器。现在,这将为我们提供一些不错的诊断。稍后我们将深入讨论调度器。
from dask.distributed import Client
client = Client(n_workers=4)
基础
首先让我们创建一些玩具函数,inc
和 add
,它们会休眠一段时间来模拟工作。然后我们将正常运行这些函数。
在下一节中,我们将并行化此代码。
代码语言:javascript复制from time import sleep
def inc(x):
sleep(1)
return x 1
def add(x, y):
sleep(1)
return x y
我们使用 %%time
magic 指令来计时这段普通代码的执行时间,这是 Jupyter Notebook 的一个特殊功能。
%%time
# 这需要三秒钟才能运行,因为我们依次调用每个函数,一个接一个
x = inc(1)
y = inc(2)
z = add(x, y)
代码语言:javascript复制Wall time: 3.02 s
使用 dask.delayed
装饰器并行化
两个 inc
调用可以并行调用,因为它们完全相互独立。
我们将使用 dask.delayed
函数转换 inc
和 add
函数。当我们通过传递参数调用延迟版本时,与以前完全一样,原始函数实际上还没有被调用 —— 这就是单元执行很快完成的原因。相反,会生成一个延迟对象,它会跟踪要调用的函数和要传递给它的参数。
from dask import delayed
代码语言:javascript复制%%time
# 这会立即运行,它所做的只是构建一个图
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
代码语言:javascript复制Wall time: 1e 03 µs
上述代码立即运行,因为还没有真正发生任何事情。
要获得结果,请调用 compute
。请注意,这比原始代码运行得更快。
%%time
# 实际上使用本地线程池运行我们的计算
z.compute()
代码语言:javascript复制Wall time: 2.05 s
5
刚才发生了什么?
z
对象是一个惰性 Delayed
对象。这个对象包含我们计算最终结果所需的一切,包括对所有所需函数的引用,以及它们的输入和相互之间的关系。我们可以使用上面的 .compute()
评估结果,或者我们可以使用 .visualize()
可视化此值的任务图。
z
代码语言:javascript复制Delayed('add-25aea027-2aa1-4253-9eb7-962a7d804914')
查看 z
的任务图
z.visualize()
请注意,这包括之前的函数名称,以及 inc
函数输出到 add
输入的逻辑流。
一些需要考虑的问题
为什么我们从 3s 变成了 2s?为什么我们不能并行化到 1s?
如果 inc
和 add
函数不包括 sleep(1)
会发生什么?Dask 还能加速这段代码吗?
不会加速
代码语言:javascript复制def inc_v2(x):
return x 1
def add_v2(x, y):
return x y
代码语言:javascript复制%%time
x = inc_v2(1)
y = inc_v2(2)
z = add_v2(x, y)
z
代码语言:javascript复制Wall time: 0 ns
5
代码语言:javascript复制x = delayed(inc_v2)(1)
y = delayed(inc_v2)(2)
z = delayed(add_v2)(x, y)
代码语言:javascript复制%%time
z.compute()
代码语言:javascript复制Wall time: 24 ms
5
如果我们有多个输出或者还想访问 x
或 y
怎么办?
练习:并行化 for
循环
for
循环是我们想要并行化的最常见的事情之一。在 inc
和 sum
上使用 dask.delayed
并行化以下计算。
串行代码
代码语言:javascript复制data = [1, 2, 3, 4, 5, 6, 7, 8]
代码语言:javascript复制%%time
# 串行代码
results = []
for x in data:
y = inc(x)
results.append(y)
total = sum(results)
代码语言:javascript复制Wall time: 8.05 s
代码语言:javascript复制total
代码语言:javascript复制44
并行代码
代码语言:javascript复制%%time
for x in data:
y = delayed(inc)(x)
results.append(y)
total = delayed(sum)(results)
print("Before computing:", total) # 查看 total 的类型
result = total.compute()
print("After computing:", result) # 计算之后
代码语言:javascript复制Before computing: Delayed('sum-492662c6-3934-408a-beea-763b4f421a40')
After computing: 88
Wall time: 1.04 s
与直接使用 sum
函数而不是延迟包装的版本相比,图形可视化与给定的解决方案相比如何?你能解释一下后面的版本吗?您可能会发现以下表达式的结果很有启发性
delayed(inc)(1) delayed(inc)(2)
代码语言:javascript复制z = delayed(inc)(1) delayed(inc)(2) delayed(inc)(3)
z.visualize()
代码语言:javascript复制z = delayed(sum)(delayed(inc)(1), delayed(inc)(2), delayed(inc)(3))
z.visualize()
练习:并行化带有流程控制的 for 循环代码
通常我们只想延迟一些函数,立即运行其中的几个。当这些函数速度很快时,这尤其有用,并帮助我们确定应该调用哪些其他较慢的函数。这个决定,延迟还是不延迟,通常是我们在使用 dask.delayed
时需要深思熟虑的地方。
在下面的示例中,我们遍历输入列表。如果输入是偶数,那么我们想调用 inc
。如果输入是奇数,那么我们要调用 double
。必须立即(而不是懒惰地)做出调用 inc
或 double
的 is_even
决定,以便我们的图形构建 Python 代码继续进行。
def double(x):
sleep(1)
return 2 * x
def is_even(x):
return not x % 2
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
串行代码
代码语言:javascript复制%%time
results = []
for x in data:
if is_even(x):
y = double(x)
else:
y = inc(x)
results.append(y)
total = sum(results)
print(total)
代码语言:javascript复制90
Wall time: 10.1 s
并行版本
代码语言:javascript复制%%time
results = []
for x in data:
if is_even(x):
y = delayed(double)(x)
else:
y = delayed(inc)(x)
results.append(y)
total = delayed(sum)(results)
print(total)
代码语言:javascript复制Delayed('sum-f5af7db2-ff32-4186-af6c-2106e51a7341')
Wall time: 999 µs
代码语言:javascript复制%time total.compute()
代码语言:javascript复制Wall time: 2.04 s
90
代码语言:javascript复制total.visualize()
一些需要考虑的问题
我们不能使用延迟的其他控制流示例是什么?
如果我们在上面的例子中延迟了 is_even(x)
的计算会发生什么?
你对延迟 sum()
有什么看法?这个函数既是计算又运行快速。
创建数据
运行此代码以准备一些数据。
这将下载并提取 1990 年至 2000 年间从纽约出发的航班的一些历史航班数据。数据最初来自此处。
代码语言:javascript复制%run prep.py -d flights
查看数据
代码语言:javascript复制import pathlib
sorted(pathlib.Path("data", "nycflights").iterdir())
代码语言:javascript复制[WindowsPath('data/nycflights/1990.csv'),
WindowsPath('data/nycflights/1991.csv'),
WindowsPath('data/nycflights/1992.csv'),
WindowsPath('data/nycflights/1993.csv'),
WindowsPath('data/nycflights/1994.csv'),
WindowsPath('data/nycflights/1995.csv'),
WindowsPath('data/nycflights/1996.csv'),
WindowsPath('data/nycflights/1997.csv'),
WindowsPath('data/nycflights/1998.csv'),
WindowsPath('data/nycflights/1999.csv')]
使用 pandas.read_csv
读取一个文件,并计算平均起飞延误
import pandas as pd
df = pd.read_csv(pathlib.Path("data", "nycflights", "1990.csv"))
df.head()
数据模式
代码语言:javascript复制df.dtypes
代码语言:javascript复制Year int64
Month int64
DayofMonth int64
DayOfWeek int64
DepTime float64
CRSDepTime int64
ArrTime float64
CRSArrTime int64
UniqueCarrier object
FlightNum int64
TailNum float64
ActualElapsedTime float64
CRSElapsedTime int64
AirTime float64
ArrDelay float64
DepDelay float64
Origin object
Dest object
Distance float64
TaxiIn float64
TaxiOut float64
Cancelled int64
Diverted int64
dtype: object
数据中有哪些始发机场
代码语言:javascript复制df.Origin.unique()
代码语言:javascript复制array(['EWR', 'LGA', 'JFK'], dtype=object)
每个机场平均起飞延误
代码语言:javascript复制df.groupby("Origin").DepDelay.mean()
代码语言:javascript复制Origin
EWR 9.168411
JFK 11.857274
LGA 8.560045
Name: DepDelay, dtype: float64
串行代码:每个机场平均起飞延误
上述单元格计算每个机场一年的平均起飞延误。在这里,我们使用顺序 for 循环将其扩展到所有年份。
代码语言:javascript复制import pathlib
filenames = sorted(pathlib.Path("data", "nycflights").glob("*.csv"))
filenames
代码语言:javascript复制[WindowsPath('data/nycflights/1990.csv'),
WindowsPath('data/nycflights/1991.csv'),
WindowsPath('data/nycflights/1992.csv'),
WindowsPath('data/nycflights/1993.csv'),
WindowsPath('data/nycflights/1994.csv'),
WindowsPath('data/nycflights/1995.csv'),
WindowsPath('data/nycflights/1996.csv'),
WindowsPath('data/nycflights/1997.csv'),
WindowsPath('data/nycflights/1998.csv'),
WindowsPath('data/nycflights/1999.csv')]
代码语言:javascript复制%%time
sums = []
counts = []
for fn in filenames:
# 读取文件
df = pd.read_csv(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
代码语言:javascript复制Wall time: 9.92 s
代码语言:javascript复制mean
代码语言:javascript复制Origin
EWR 10.295469
JFK 10.351299
LGA 7.431142
Name: DepDelay, dtype: float64
并行化
使用 dask.delayed 并行化上面的代码。需要知道一些额外的事情。
- 延迟对象上的方法和属性访问会自动工作,因此如果您有一个延迟对象,您可以对其执行正常的算术、切片和方法调用,它将产生正确的延迟调用。
x = delayed(np.arange)(10)
y = (x 1)[::2].sum() # 所有计算都被延迟
- 当您只有一个输出时,调用
.compute()
方法效果很好。当您有多个输出时,您可能需要使用dask.compute
函数:
>>> from dask import compute
>>> x = delayed(np.arange)(10)
>>> y = x ** 2
>>> min_, max_ = compute(y.min(), y.max())
>>> min_, max_
(0, 81)
这样 Dask 就可以共享中间值 (比如 y = x**2
)
因此,您的目标是使用 dask.delayed
并行化上面的代码 (已在下面复制)。您可能还想对一些计算进行可视化,看看您是否正确地进行了计算。
from dask import compute
代码语言:javascript复制%%time
sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
sums, counts = compute(sums, counts)
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
代码语言:javascript复制Wall time: 2.55 s
代码语言:javascript复制mean
代码语言:javascript复制Origin
EWR 10.295469
JFK 10.351299
LGA 7.431142
Name: DepDelay, dtype: float64
一些需要考虑的问题
你得到了多少加速?这是您期望的加速程度吗?
尝试在何处调用 compute
。当你在 sum
和 counts
上使用时会发生什么?如果你等待并在 mean
上调用会发生什么?
mean
上使用 compute
sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = delayed(sum)(sums)
n_flights = delayed(sum)(counts)
mean = delayed(lambda a, b: a/b)(total_delays, n_flights)
代码语言:javascript复制mean.visualize()
代码语言:javascript复制%%time
mean = mean.compute()
代码语言:javascript复制Wall time: 1.98 s
尝试延迟 sum
调用。如果 sum
延迟,图形会是什么样子?如果不是,图表会是什么样子?
sum
上使用 compute
sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = delayed(sum)(sums)
n_flights = delayed(sum)(counts)
代码语言:javascript复制from dask import visualize
visualize(total_delays, n_flights)
代码语言:javascript复制%%time
total_delays, n_flights = compute(total_delays, n_flights)
mean = total_delays / n_flights
代码语言:javascript复制Wall time: 2.12 s
原始版本
代码语言:javascript复制sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
代码语言:javascript复制visualize(sums, counts)
你能想出你想要以一种方式减少另一种方式的任何原因吗?
学习更多
访问 Delayed documentation。特别是,(delayed screencast 将强化您在此处学到的概念,delayed best practices 文档收集了有关如何使用 dask.delayed
的建议。
关闭客户端
在继续下一个练习之前,请确保关闭您的客户端或停止此内核。
代码语言:javascript复制client.close()
参考
dask-tutorial
https://github.com/dask/dask-tutorial
Dask 教程
- 简介
- 延迟执行
相关文章
使用 Dask 并行抽取站点数据
题图由 Jan-Christoph Horn 在 Pixabay 上发布。