Python 自带异步队列的大坑

2020-06-04 16:55:25 浏览数 (1)

我们在使用 Python 的 asyncio 写异步程序的时候,可能会使用asyncio.Queue来实现一个异步队列,通过它来让生产者和消费者进行通信。

但如果你的异步队列没有填写maxsize参数,那么可能会产生让你意料之外的结果。我们来看一段代码:

代码语言:javascript复制
import asyncio
import random
import aiohttp


async def producer(queue):
    for _ in range(10):
        sleep_time = random.randint(1, 2)
        await queue.put(sleep_time)


async def consumer(queue):
    while True:
        sleep_time = await queue.get()
        size = queue.qsize()
        print(f'当前队列有:{size} 个元素')
        url = f'http://httpbin.org/delay/{sleep_time}'
        async with aiohttp.ClientSession() as client:
            resp = await client.get(url)
            print(await resp.json())

async def main():
    queue = asyncio.Queue()
    asyncio.create_task(producer(queue))
    con = asyncio.create_task(consumer(queue))
    await con


asyncio.run(main())

这段代码把 producerconsumer分别创建成异步任务,期望实现的效果是生产者不停生产数据放进异步队列,消费者不停从队列读取数据,然后发起网络请求。生产者与消费者利用 IO 等待时间实现并行。

但如果你运行一下这段代码,你会发现一件很奇怪的事情,如下图所示:

当我们的消费者开始消费的时候,队列里面实际上已经有10条数据了!由于图中代码第19行是先读取了一条数据再打印剩余的数量,所以打印的是当前队列有:9 个元素

所以,生产者与消费者根本没有并行。是生产者里面的循环完全运行完成了,才开始运行的消费者!

如果在实际代码中,你的生产者生产了几百万条数据,那么此时所有数据全部都堆放在异步队列里面,很容易就把你的内存撑爆了!

那么这个问题要如何解决呢?实际上非常简单,使用maxsize参数指定异步队列的大小:

代码语言:javascript复制
queue = asyncio.Queue(maxsize=3)

我们这里设定为3,再运行看看效果:

看到这里,可能有人会说,这仅仅是生产者先把异步队列堆满,才能进行消费,并没有什么本质区别啊,本质上还是先只有生产者运行,等他跑不动了(队列满了),消费者才能运行,还是没有实现并行啊。

这是由于,在上面的例子中,生产者的速度远远超过消费者的速度,所以才会出现生产者总是堆满队列的问题。

为了说明生产者和消费者能真正利用 IO 等待时间进行并行,我们改一下代码:

代码语言:javascript复制
import asyncio
import random
import aiohttp


async def producer(queue):
    for i in range(10):
        await queue.put(i)
        await asyncio.sleep(random.randint(1, 3))


async def consumer(queue):
    while True:
        sleep_time = await queue.get()
        size = queue.qsize()
        print(f'当前队列有:{size} 个元素')
        url = 'http://httpbin.org/delay/2'
        async with aiohttp.ClientSession() as client:
            resp = await client.get(url)
            print(await resp.json())

async def main():
    queue = asyncio.Queue(maxsize=3)
    asyncio.create_task(producer(queue))
    con = asyncio.create_task(consumer(queue))
    await con


asyncio.run(main())

生产者生产数据后,随机休眠1-3秒。而消费者请求的网址总是2秒返回数据。这样一来,有时候生产者快,有时候消费者快。我们来看看运行效果:

可以看到,当生产者快的时候,异步队列里面的数据就会堆积,当消费者快的时候,异步队列里面的数据就会变少。说明生产者与消费者实现了利用 IO 等待时间进行并行操作。

0 人点赞