文章目录
- 1. 线程与协程对比
- 2. 使用 asyncio 和 aiohttp 下载
- 3. 避免阻塞型调用
- 4. 使用 asyncio.as_completed
- 5. 使用Executor对象,防止阻塞事件循环
- 6. 从回调到期物和协程
learn from 《流畅的python》
1. 线程与协程对比
threading
代码语言:javascript复制import threading
import itertools
import time
import sys
class Signal:
    """Mutable stop flag shared with the spinner thread.

    The spinner keeps running while ``go`` is true; set ``go = False`` on the
    instance to ask it to stop.
    """

    go = True  # class-level default; assigning on an instance overrides it
def spin(msg, signal):
    """Animate a spinner followed by *msg* until ``signal.go`` becomes false.

    Intended to run in a secondary thread; another thread stops it by setting
    ``signal.go = False``, which is checked once per frame (about every 0.1 s).

    Args:
        msg: text shown after the spinning character.
        signal: any object with a boolean ``go`` attribute.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # endless cycle of spinner frames
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # \x08 is backspace: move the cursor back
        time.sleep(0.1)
        if not signal.go:
            break
    # Overwrite the status message with spaces, then move the cursor back.
    write(' ' * len(status) + '\x08' * len(status))
    flush()
def slow_function(delay=10):  # stands in for a slow computation / blocking call
    """Block the calling thread for *delay* seconds, then return 42.

    Args:
        delay: seconds to sleep (default 10).
    """
    # time.sleep() blocks the calling (main) thread but releases the GIL,
    # so the spinner thread can keep running in the meantime.
    time.sleep(delay)
    return 42
def supervisor():
    """Run slow_function() while a spinner thread animates the console.

    Starts ``spin`` in a secondary thread, runs ``slow_function()`` in the
    calling thread, then signals the spinner to stop and waits for it.

    Returns:
        The value returned by ``slow_function()``.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=("thinking!", signal))
    print("spinner object:", spinner)  # show the secondary thread object
    spinner.start()
    try:
        # Blocks this thread; the spinner keeps animating meanwhile.
        result = slow_function()
    finally:
        # Threads cannot be killed from outside: ask spin() to leave its loop.
        # Doing this in `finally` matters: if slow_function() raised, the
        # non-daemon spinner thread would otherwise keep the process alive.
        signal.go = False
        spinner.join()  # wait for the spinner thread to finish
    return result
def main():
    """Script entry point: run supervisor() and print its result."""
    print("Answer:", supervisor())


if __name__ == '__main__':
    main()
适合 asyncio 的协程要由调用方驱动：要么由调用方通过 yield from 调用（该语法已过时，Python 3.5 及以后改用 async / await），要么把协程传给 asyncio 包中的某个函数（例如下面用到的 asyncio.ensure_future 或 loop.run_until_complete）。
一篇博文参考:https://www.cnblogs.com/dhcn/p/9032461.html
代码语言:javascript复制import asyncio
import itertools
import sys
# https://docs.python.org/3.8/library/asyncio.html
async def spin(msg):  # coroutine function (async / await syntax, Python 3.5+)
    """Animate a spinner followed by *msg* until this coroutine's task is cancelled.

    Cancellation arrives as ``asyncio.CancelledError`` at the
    ``await asyncio.sleep(...)`` below. The coroutine then prints ``cancel``,
    clears the status line, prints ``end spin`` and returns normally.

    Args:
        msg: text shown after the spinning character.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # endless cycle of spinner frames
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # \x08 is backspace: move the cursor back
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:  # task.cancel() was called: leave the loop
            # NOTE: the cancellation is deliberately swallowed so the demo can
            # print its clean-up messages; production code usually re-raises.
            print("cancel")
            break
    # Overwrite the status message with spaces, then move the cursor back.
    write(' ' * len(status) + '\x08' * len(status))
    flush()
    print("end spin")
async def slow_function(delay=3):  # coroutine function
    """Pretend to do *delay* seconds of I/O, then return 42.

    Args:
        delay: seconds to wait (default 3).
    """
    print("start IO")
    # Unlike time.sleep(), this yields control to the event loop while waiting,
    # so other tasks (e.g. a spinner) keep running.
    await asyncio.sleep(delay)
    print("end IO ")
    return 42
async def supervisor():  # coroutine function
    """Run slow_function() while a spin() task animates the console.

    Returns:
        The value returned by ``slow_function()``.
    """
    spinner = asyncio.ensure_future(spin("thinking!"))  # schedule spin() as a Task
    print("spinner object:", spinner)  # show the Task object
    # e.g. spinner object: <Task pending coro=<spin() running at ...>>
    print("start slow")
    try:
        result = await slow_function()
        print("end slow")
    finally:
        # A Task can be cancelled: asyncio raises CancelledError inside spin()
        # at its current await. Done in `finally` so the spinner also stops
        # when slow_function() raises.
        spinner.cancel()
        # Wait until spin() has handled the cancellation, so its clean-up runs
        # before supervisor() returns. return_exceptions=True keeps a
        # CancelledError from the spinner task from propagating from here.
        await asyncio.gather(spinner, return_exceptions=True)
    return result
def main():
    """Run the supervisor coroutine to completion and print its answer."""
    # asyncio.run() creates a fresh event loop and drives supervisor() until it
    # finishes. It always closes the loop, even if supervisor() raises.
    # This replaces the get_event_loop() / run_until_complete() / close()
    # sequence; calling get_event_loop() with no running loop is deprecated in
    # recent Python versions.
    result = asyncio.run(supervisor())
    print("answer:", result)


if __name__ == '__main__':
    main()
输出:
代码语言:javascript复制spinner object: <Task pending coro=<spin() running at D:gitcode >
start slow
start IO
end IO ng!（行尾的 “ng!” 是旋转指针输出的 “thinking!” 的残留：等待期间 thinking! 一直在输出，随后被覆盖）
end slow
cancel
end spin
answer: 42
请按任意键继续. . .
2. 使用 asyncio 和 aiohttp 下载
代码语言:javascript复制import time
import sys
import os
import asyncio
import aiohttp
# Two-letter country codes whose flags are downloaded.
POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()
BASE_URL = 'http://flupy.org/data/flags'  # images live at BASE_URL/<cc>/<cc>.gif
DEST_DIR = './'  # directory the downloaded images are written to
def save_flag(img, filename):
    """Write the bytes *img* to ``DEST_DIR/filename``, replacing any existing file."""
    target_path = os.path.join(DEST_DIR, filename)
    with open(target_path, mode='wb') as out_file:
        out_file.write(img)
def show(text):
    """Print *text* followed by a space (no newline) and flush stdout immediately."""
    print(text, end=' ', flush=True)
async def get_flag(cc):
    """Download the GIF flag for country code *cc* and return its bytes.

    Raises:
        aiohttp.ClientResponseError: when the server answers with an HTTP
            error status (4xx/5xx). Without this check an error page would be
            returned as if it were image data and saved as a .gif.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", url) as resp:
        resp.raise_for_status()
        return await resp.read()
async def download_one(cc):
    """Download the flag for *cc*, echo the code, save it as ``<cc>.gif``; return *cc*.

    NOTE: save_flag() performs blocking file I/O directly on the event-loop
    thread.
    """
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc
def download_many_(cc_list):
    """Download all flags in *cc_list* concurrently and return how many succeeded.

    Downloads that raise an exception are not counted. An empty *cc_list*
    returns 0.
    """
    async def _download_all():
        # gather() wraps each coroutine in a Task and runs them concurrently.
        # return_exceptions=True collects failures as results instead of
        # aborting on the first error, so no exception is left unretrieved.
        coros = [download_one(cc) for cc in sorted(cc_list)]
        results = await asyncio.gather(*coros, return_exceptions=True)
        return sum(1 for res in results if not isinstance(res, BaseException))

    # asyncio.run() creates an event loop, runs the downloads and closes the
    # loop afterwards. The `async with aiohttp.request(...)` in get_flag only
    # releases HTTP resources; it does not close the event loop.
    return asyncio.run(_download_all())
def main(download_many):
    """Time ``download_many(POP20_CC)`` and report how many flags it fetched.

    Args:
        download_many: callable taking a list of country codes and returning
            the number of flags downloaded.
    """
    t0 = time.perf_counter()  # monotonic high-resolution clock for intervals
    count = download_many(POP20_CC)
    elapsed = time.perf_counter() - t0
    # Leading newline ends the line of country codes printed by show().
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))  # timing information


if __name__ == '__main__':
    main(download_many_)
    # Sample output:
    # US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN
    # 20 flags downloaded in 3.88s
3. 避免阻塞型调用
执行硬盘或网络 I/O 操作的函数称为阻塞型函数。有两种方法能避免阻塞型调用中止整个应用程序的进程：
- 在单独的线程中运行各个阻塞型操作
- 把每个阻塞型操作转换成非阻塞的异步调用（例如使用 asyncio 协程配合 aiohttp）
4. 使用 asyncio.as_completed
代码语言:javascript复制import collections
import time
import sys
import os
import asyncio
from http import HTTPStatus
import aiohttp
from aiohttp import web
import tqdm
# Two-letter country codes whose flags are downloaded.
POP20_CC = [
    'CN', 'IN', 'US', 'ID', 'BR', 'PK', 'NG', 'BD', 'RU', 'JP',
    'MX', 'PH', 'VN', 'ET', 'EG', 'DE', 'IR', 'TR', 'CD', 'FR',
]
BASE_URL = 'http://flupy.org/data/flags'  # images live at BASE_URL/<cc>/<cc>.gif
DEST_DIR = './'  # directory the downloaded images are written to
DEFAULT_CONCUR_REQ = 5  # presumably the default concurrency limit (unused here)
MAX_CONCUR_REQ = 1000  # concurrency limit passed as the semaphore size
class FetchError(Exception):
    """Signals that fetching the flag for one country failed.

    Raised with ``raise FetchError(cc) from exc``, so the underlying error is
    available as ``__cause__``.
    """

    def __init__(self, country_code):
        # Pass the code to Exception so .args, str() and repr() are informative.
        super().__init__(country_code)
        self.country_code = country_code
def save_flag(img, filename):
    """Store the raw image bytes *img* as ``DEST_DIR/filename`` (overwrites)."""
    destination = os.path.join(DEST_DIR, filename)
    with open(destination, 'wb') as handle:
        handle.write(img)
def show(text):
    """Echo *text* plus a trailing space on stdout, flushing so it appears at once."""
    out = sys.stdout
    print(text, end=' ', file=out)
    out.flush()
async def get_flag(cc):
    """Download the GIF flag for country code *cc* and return its bytes.

    Raises:
        web.HTTPNotFound: if the server answers 404.
        aiohttp.WebSocketError: for any other non-200 status.
    """
    flag_url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", flag_url) as response:
        http_status = response.status
        if http_status == 404:
            raise web.HTTPNotFound()
        if http_status != 200:
            # NOTE(review): WebSocketError is aiohttp's websocket error type;
            # an HTTP failure would conventionally be ClientResponseError.
            # Kept because callers format this exception's args[0].
            raise aiohttp.WebSocketError(code=http_status, message=response.reason)
        return await response.read()
async def download_one(cc, semaphore, verbose):
    """Download and save the flag for *cc*, limiting concurrency with *semaphore*.

    Args:
        cc: two-letter country code.
        semaphore: asyncio.Semaphore bounding simultaneous downloads.
        verbose: when true, print the country code and outcome.

    Returns:
        ``(status, cc)`` where status is ``HTTPStatus.OK`` or
        ``HTTPStatus.NOT_FOUND``.

    Raises:
        FetchError: for any other failure, chained to the original exception.
    """
    try:
        async with semaphore:  # only a limited number of downloads at a time
            image = await get_flag(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.NOT_FOUND
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # NOTE: save_flag() performs blocking file I/O on the event-loop thread.
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.OK
        msg = "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)
async def downloader_coro(cc_list, verbose, concur_req):  # coroutine function
    """Download all flags in *cc_list*, at most *concur_req* at a time.

    Args:
        cc_list: country codes to download.
        verbose: print per-country results; when false, show a tqdm progress
            bar instead.
        concur_req: maximum number of simultaneous downloads.

    Returns:
        collections.Counter mapping each outcome (the status returned by
        download_one, or the string ``'error'`` for a FetchError) to the
        number of countries with that outcome.
    """
    counter = collections.Counter()
    # At most `concur_req` coroutines may hold the semaphore at once.
    semaphore = asyncio.Semaphore(value=concur_req)
    # Pass `verbose` through so per-country lines do not garble the progress bar.
    todo = [download_one(cc, semaphore, verbose) for cc in sorted(cc_list)]
    todo_iter = asyncio.as_completed(todo)  # yields awaitables as they finish
    if not verbose:
        todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list))  # progress bar
    for future in todo_iter:
        try:
            res = await future  # result of one finished download
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
            except IndexError:  # underlying exception created without arguments
                error_msg = exc.__cause__.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            # http.HTTPStatus has no generic "error" member, so failures are
            # counted under a plain string key.
            status = 'error'
        else:
            status = res[0]
        counter[status] += 1  # record the outcome
    return counter
def download_many_(cc_list, verbose, concur_req):
    """Synchronously run downloader_coro() and return its Counter of outcomes."""
    coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)
    # asyncio.run() creates a new event loop, drives the coroutine to completion
    # and closes the loop. `async with aiohttp.request(...)` only releases the
    # HTTP resources; it never closes the event loop.
    return asyncio.run(coro)
def main(download_many):
    """Time ``download_many(POP20_CC, True, MAX_CONCUR_REQ)`` and print a summary.

    Args:
        download_many: callable ``(cc_list, verbose, concur_req)`` returning a
            collections.Counter of outcomes keyed by status.
    """
    t0 = time.perf_counter()  # monotonic high-resolution clock for intervals
    counter = download_many(POP20_CC, True, MAX_CONCUR_REQ)
    elapsed = time.perf_counter() - t0
    # The result is a Counter: report the successful downloads rather than
    # printing the Counter's repr where a number is expected.
    ok_count = counter[HTTPStatus.OK]
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(ok_count, elapsed))  # timing information
    failed = sum(counter.values()) - ok_count
    if failed:
        print('{} flags not downloaded (not found or error)'.format(failed))


if __name__ == '__main__':
    main(download_many_)
5. 使用Executor对象,防止阻塞事件循环
`loop.run_in_executor` 方法把阻塞的作业（例如保存文件）委托给线程池执行，并返回一个期物，可以 await 它来等待作业完成。
async def download_one(cc, semaphore, verbose):
    """Download the flag for *cc* and save it from a worker thread.

    Args:
        cc: two-letter country code.
        semaphore: asyncio.Semaphore bounding simultaneous downloads.
        verbose: when true, print the country code and outcome.

    Returns:
        ``(status, cc)`` where status is ``HTTPStatus.OK`` or
        ``HTTPStatus.NOT_FOUND``.

    Raises:
        FetchError: if downloading or saving fails, chained to the original
            exception.
    """
    try:
        async with semaphore:
            image = await get_flag(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.NOT_FOUND
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # save_flag() does blocking disk I/O. Called directly, it would freeze
        # the whole event loop (all other downloads) while the file is written,
        # so it is delegated to a thread pool instead.
        loop = asyncio.get_running_loop()  # the loop running this coroutine
        try:
            # First argument is the Executor; None selects the loop's default
            # ThreadPoolExecutor. The rest are the callable and its positional
            # arguments. Awaiting the returned future ensures "OK" is reported
            # only after the file was actually written.
            await loop.run_in_executor(None, save_flag, image, cc.lower() + ".gif")
        except OSError as exc:
            raise FetchError(cc) from exc
        status = HTTPStatus.OK
        msg = "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)
6. 从回调到期物和协程
- 如果一个操作需要依赖之前操作的结果，使用回调时就得把回调层层嵌套，例如：
def stage1(response1):
    """Callback for api_call1: build the second request and chain to stage2."""
    api_call2(step1(response1), stage2)  # stage2 is the callback for api_call2
def stage2(response2):
    """Callback for api_call2: build the third request and chain to stage3."""
    api_call3(step2(response2), stage3)  # stage3 is the callback for api_call3
def stage3(response3):
    """Final callback: hand the third API response to step3()."""
    step3(response3)


# Start the chain: stage1 is the callback for api_call1.
api_call1(request1, stage1)
使用协程的更好写法（依次 await，不再嵌套回调）：
async def three_stages(request1):
    """Chain three dependent API calls sequentially with await instead of callbacks."""
    # Stage 1: first call, then derive the next request from its response.
    response1 = await api_call1(request1)
    request2 = step1(response1)
    # Stage 2
    response2 = await api_call2(request2)
    request3 = step2(response2)
    # Stage 3
    response3 = await api_call3(request3)
    step3(response3)


# The coroutine must be scheduled explicitly on the event loop to run.
loop.create_task(three_stages(request1))
协程必须使用事件循环显式排定执行时间（例如上面的 loop.create_task）。
异步系统能避免用户级线程的开销，这是它能比多线程系统管理更多并发连接的主要原因。