python:rxpy 使用async协程

2019-11-22 00:15:44 浏览数 (1)

终于弄清怎么在rxpy中使用flat_map调用协程了,直接上代码

代码语言:javascript复制
import asyncio

from rx import Observable


def warp_future(func):
    def inner(arg):
        future = asyncio.ensure_future(func(arg))
        return Observable.from_future(future)

    return inner


async def main():
    async def add(args):
        return args[0]   args[1]

    s1 = Observable.of(1, 3, 5)
    s2 = Observable.of(2, 4, 6)

    stream = Observable.zip_array(s1, s2) 
        .flat_map(warp_future(add)) 
        .where(lambda x: x > 3) 
        .map(lambda x: print(x))
    await stream


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

0 人点赞