终于弄清怎么在rxpy中使用flat_map调用协程了,直接上代码
代码语言:javascript复制import asyncio
from rx import Observable
def warp_future(func):
def inner(arg):
future = asyncio.ensure_future(func(arg))
return Observable.from_future(future)
return inner
async def main():
async def add(args):
return args[0] args[1]
s1 = Observable.of(1, 3, 5)
s2 = Observable.of(2, 4, 6)
stream = Observable.zip_array(s1, s2)
.flat_map(warp_future(add))
.where(lambda x: x > 3)
.map(lambda x: print(x))
await stream
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()