阅读(4904) (0)

Tornado 协程队列

2022-03-10 11:57:46 更新

协程的异步队列。 这些类与标准库的 ​asyncio包中提供的类非常相似。

注意:

与标准库的 ​queue模块不同,这里定义的类不是线程安全的。 要从另一个线程使用这些队列,请在调用任何队列方法之前使用 ​IOLoop.add_callback​ 将控制权转移到 ​IOLoop线程。

队列

class tornado.queues.Queue(maxsize: int = 0)

协调生产者和消费者协程。

如果 ​maxsize为 0(默认值),则队列大小是无限的。

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    for item in range(5):
        await q.put(item)
        print('Put %s' % item)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)

结果如下:

Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在没有原生协程的 Python 版本中(3.5 之前),​consumer()​ 可以写成:

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

maxsize

队列中允许的项目数。

qsize() → int

队列中的项目数。

put(item: _T, timeout: Union[float, datetime.timedelta, None] = None) → Future[None]

将一个项目放入队列中,也许等到有空间。

返回一个 ​Future​,它会在超时后引发 ​tornado.util.TimeoutError​。

timeout可以是一个表示时间的数字(与 ​tornado.ioloop.IOLoop.time​ 的比例相同,通常是 ​time.time​),或者是相对于当前时间的截止日期的 ​datetime.timedelta​ 对象。

put_nowait(item: _T) → None

将一个项目放入队列而不阻塞。

如果没有立即可用的空闲槽,则提高 ​QueueFull​。

get(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[_T]

从队列中移除并返回一个项目。

返回一个等待项目,一旦项目可用就解决,或在超时后引发 ​tornado.util.TimeoutError​。

timeout可以是一个表示时间的数字(与 ​tornado.ioloop.IOLoop.time​ 的比例相同,通常是​time.time​),或者是相对于当前时间的截止日期的 ​datetime.timedelta​ 对象。

注意:

该方法的 ​timeout参数与标准库的 ​queue.Queue.get​ 不同。 该方法将数值解释为相对超时; 这将它们解释为绝对截止日期,并且需要 ​timedelta对象用于相对超时(与 Tornado 中的其他超时一致)。

get_nowait() → _T

从队列中移除并返回一个项目而不阻塞。

如果一个项目立即可用,则返回一个项目,否则引发 ​QueueEmpty​。

task_done() → None

指示以前排队的任务已完成。

由队列消费者使用。 对用于获取任务的每个 ​get ​,对 ​task_done的后续调用会告诉队列该任务的处理已完成。

如果一个连接被阻塞,它会在所有项目都被处理后恢复; 也就是说,当每个 ​put​都与 ​task_done​匹配时。

如果调用次数多于 ​put​,则引发 ​ValueError​。

join(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[None]

阻塞直到队列中的所有项目都处理完毕。

返回一个 ​awaitable​,它在超时后引发 ​tornado.util.TimeoutError​。

优先队列

class tornado.queues.PriorityQueue(maxsize: int = 0)

按优先级顺序检索条目的队列,最低优先。

条目通常是元组,如(​priority number​, ​data​)。

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

结果如下:

(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

队列

class tornado.queues.LifoQueue(maxsize: int = 0)

队列首先检索最近放置的项目。

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

结果如下:

1
2
3

队列空

exception tornado.queues.QueueEmpty

当队列没有项目时由 ​Queue.get_nowait​ 引发。

队列满

exception tornado.queues.QueueFull

当队列达到最大大小时由 ​Queue.put_nowait​ 引发。