终结python协程----从yield到actor模型的实现

2023-10-19 19:16:35 浏览数 (1)

把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程

我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。假设现在启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每个线程每隔5ms就让出CPU,让其他线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?

如果我们能够自行调度自己写的程序,让一些代码块遇到IO操作时,切换去执行另外一些需要CPU操作的代码块,是不是节约了很多无畏的上下文切换呢?是的,协程就是针对这一情况而生的。我们把写好的一个应用程序分为很多个代码块,如下图所示:

把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程(通常是遇到IO操作时切换才有意义)。示意图如下:

所以,关于协程可以总结以下两点:

(1)线程的调度是由操作系统负责,协程调度是程序自行负责。

(2)与线程相比,协程减少了无畏的操作系统切换。

实际上当遇到IO操作时做切换才更有意义,(因为IO操作不用占用CPU),如果没遇到IO操作,按照时间片切换,无意义。

python中的yield 关键字用来实现生成器,但是生成器在一定的程度上与协程其实也是差不多。我们来看个例子:

代码语言:javascript复制
def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print('say hello')

    
if __name__ == "__main__":
    sayHello(5)  # 测试1
    # next(sayHello(5))  # 测试2
    
    # 测试3
    # for i in sayHello(5):
    #     pass

挨个测试,你会发现第一个测试是不能通过的,什么都不会输出,这就是我们的生成器特性了,一旦函数内部有yield关键字,此函数就是生成器,只有调用next 或是 for之类的能够迭代的才能够使得生成器执行。那么这与我们的协程有什么关系呢?请看代码:

代码语言:javascript复制
from collections import deque
 
def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print('say hello')
 
def sayHi(n):
    x = 0
    while x < n:
        print('hi~', x)
        yield
        x  = 1
    print("say hi")
 
# 使用yield语句,实现简单任务调度器
class TaskScheduler(object):
    def __init__(self):
        self._task_queue = deque()
 
    def new_task(self, task):
        '''
        向调度队列添加新的任务
        '''
        self._task_queue.append(task)
 
    def run(self):
        '''
        不断运行,直到队列中没有任务
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # 生成器结束
                pass


if __name__ == "__main__":
    sched = TaskScheduler()
    sched.new_task(sayHello(10))
    sched.new_task(sayHi(15))
    sched.run()

代码运行下,你就发现了,这就是我们对协程的定义了。接下来我们说下actor模型。actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面我们通过yield来实现:

代码语言:javascript复制
from collections import deque
 
class ActorScheduler:
    def __init__(self):
        self._actors = {}
        self._msg_queue = deque()
 
    def new_actor(self, name, actor):
        self._msg_queue.append((actor, None))
        self._actors[name] = actor
 
    def send(self, name, msg):
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))
 
    def run(self):
        while self._msg_queue:
            # print("队列:", self._msg_queue)
            actor, msg = self._msg_queue.popleft()
            # print("actor", actor)
            # print("msg", msg)
            try:
                 actor.send(msg)
            except StopIteration:
                 pass
 
 
if __name__ == '__main__':
    def say_hello():
        while True:
            msg = yield
            print("say hello", msg)
 
    def say_hi():
        while True:
            msg = yield
            print("say hi", msg)
 
    def counter(sched):
        while True:
            n = yield
            print("counter:", n)
            if n == 0:
                break
            sched.send('say_hello', n)
            sched.send('say_hi', n)
            sched.send('counter', n-1)
 
    sched = ActorScheduler()
    # 创建初始化 actors
    sched.new_actor('say_hello', say_hello())
    sched.new_actor('say_hi', say_hi())
    sched.new_actor('counter', counter(sched))
 
    sched.send('counter', 10)
    sched.run()

(1) ActorScheduler 负责事件循环 (2) counter() 负责控制终止 (3) say_hello() / say_hi() 相当于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。

所以,当执行时,我们能够看到say_hello() / say_hi()不断交替切换执行,直到counter满足终止条件之后,协程终止。看懂上例可能需要花费一些时间。实际上我们已经实现了一个“操作系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

而这就是廖雪峰的python官网教程里面的协程代码的最好解释,这也是之前一直在思考的问题,请看代码:

代码语言:javascript复制
def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n   1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

我之前一直纳闷send()函数是如何激活生成器的,原来是实现了actor模型的协程!

0 人点赞