python多线程编程之Condition工具

2022-08-09 18:09:36 浏览数 (1)

在python多线程编程中,Lock是最基础的同步工具,除了Lock之外,python还提供了一些更高级的同步工具,本文简单聊一下Condition。

基本原理

Condition翻译过来是条件,应用场景是如果一个线程需要满足某个条件才继续执行,一个典型的应用场景是“生产者-消费者”模型的编程,当消费者获得锁去访问缓冲区的时候,缓冲区必须有数据,消费者才能工作,这就是“条件”。如果缓冲区没有数据,也就不符合消费者的工作条件,消费者该怎么办呢?我们来对比一下Lock和Condition在处理这一问题上的不同。

如果使用Lock,发现不符合条件后,那么就什么都不做,然后释放Lock,之后线程仍然是活跃的。线程会不断地再去获得锁,检查缓冲区,无休止的重复,浪费了资源。举个例子,就好比你去小米手机店购买Mi 12S Ultra,去了之后没货,你回家了,第二天再去,再没货,第三天再去,再没货。。。。。周而复始天天去,肯定很累啊。那么你就想了,能不能给店员留个联系方式,等有货了通知你,你就不用天天跑了。当然可以,Condition就是这样做的。

使用了Condition后,仍然要先获得锁,然后检查条件是否满足,如果条件不满足,Condition的骚操作就来了,他可以用wait()方法释放锁并进入阻塞状态,阻塞以后就啥都不干了,节约了资源,然后等待其他线程用notify()方法通知他,接到通知后,他再变得活跃,去做相应的事情。

主要方法

代码语言:python代码运行次数:0复制
acquire()  # 获得锁。一般情况下,Condition自己会创建一个锁,因此直接调用这个方法就能获得锁
release()  # 释放锁。一般情况下,Condition自己会创建一个锁,因此直接调用这个方法就能释放锁
wait_for(predicate,timeout=None)  # 等待条件得到满足。第一个参数是条件,必须是一个返回值为True/False的函数
notify_all()  # 通知所有因等待而陷入阻塞的线程,大家都醒来吧,条件发生改变了,看看是否对你合适?

当然还有wait(),notify(n=1)等方法,根据情况使用。在这里要强调一下,notify()的含义是“条件发生改变”,并不严格等价于“条件已满足”,因此等待线程每次被唤醒之后,都要继续判断条件的符合性,所以使用wait_for()是比较方便的。还有,Condition是支持上下文管理的,使用with块可以自动获取和释放锁。

小例子

根据“生产者-消费者”模型,这里构建一个“装苹果-拿苹果”的小例子。水果店老板(生产者)往一个篮子里放苹果,三个人(消费者)从篮子里拿苹果,相关的条件是:1.如果篮子没苹果了,消费者就等待。2.如果篮子空了,老板才能往里放苹果。代码如下:

代码语言:python代码运行次数:0复制
from concurrent.futures import ThreadPoolExecutor
import threading
import random

class Basket:
    '''篮子类,用于装苹果'''
    def __init__(self, n:int) -> None:
        '''篮子容量为n,装载数量为0'''
        self.capacity = n
        self.apple_count = 0

    def fill_up(self) -> None:
        '''将篮子装满'''
        self.apple_count = self.capacity

    def get_apples(self, n:int) -> int:
        '''从篮子中取走苹果,如果n大于剩余苹果数量,则仅返回剩余苹果数量'''
        x = n if n <= self.apple_count else self.apple_count
        self.apple_count -= x
        return x


def fill_up(b:Basket, cond:threading.Condition, name:str):
    '''装满篮子线程的功能函数,只有当篮子空了的时候才装。
       只装20次,目的是让程序能够结束,方便观察结果'''
    print(f'{name} 开始工作')
    for i in range(20):
        with cond:
            cond.wait_for(lambda : b.apple_count==0)
            b.fill_up()
            print(f'n{i 1}. {name} 把篮子装满了,共有 {b.capacity} 个苹果。')
            cond.notify_all()
    print(f'{name} 下班回家了~~~~~~~~~')

def get_apple(b:Basket, cond:threading.Condition, name:str):
    '''取走苹果线程的功能函数,每次取走若干个'''
    print(f'{name} 进入了水果店')
    while True:
        n = random.randint(1, 5)
        with cond:
            flag = cond.wait_for(lambda : b.apple_count>0, 2)
            if not flag:
                print(f'{name} 迟迟等不到苹果,生气的走了~~~~~~~~~~')
                break
            x = b.get_apples(n)
            print(f'{name} 从篮子中拿走了 {x} 个苹果,还剩 {b.apple_count} 个。')
            if b.apple_count == 0:
                cond.notify()


names = ['班尼特', '胡桃', '神里凌华']
cond = threading.Condition()
bask = Basket(50)

n = len(names)
with ThreadPoolExecutor(4) as pool:
    pool.map(get_apple, [bask]*n, [cond]*n, names)
    pool.submit(fill_up, bask, cond, '水果店老板')
print('n程序结束n')

0 人点赞