在python多线程编程中,Lock是最基础的同步工具,除了Lock之外,python还提供了一些更高级的同步工具,本文简单聊一下Condition。
基本原理
Condition翻译过来是条件,应用场景是如果一个线程需要满足某个条件才继续执行,一个典型的应用场景是“生产者-消费者”模型的编程,当消费者获得锁去访问缓冲区的时候,缓冲区必须有数据,消费者才能工作,这就是“条件”。如果缓冲区没有数据,也就不符合消费者的工作条件,消费者该怎么办呢?我们来对比一下Lock和Condition在处理这一问题上的不同。
如果使用Lock,发现不符合条件后,那么就什么都不做,然后释放Lock,之后线程仍然是活跃的
。线程会不断地再去获得锁,检查缓冲区,无休止的重复,浪费了资源。举个例子,就好比你去小米手机店购买Mi 12S Ultra,去了之后没货,你回家了,第二天再去,再没货,第三天再去,再没货。。。。。周而复始天天去,肯定很累啊。那么你就想了,能不能给店员留个联系方式,等有货了通知你,你就不用天天跑了。当然可以,Condition就是这样做的。
使用了Condition后,仍然要先获得锁,然后检查条件是否满足,如果条件不满足,Condition的骚操作就来了,他可以用wait()方法释放锁并进入阻塞状态
,阻塞以后就啥都不干了,节约了资源,然后等待其他线程用notify()方法通知他,接到通知后,他再变得活跃,去做相应的事情。
主要方法
代码语言:python代码运行次数:0复制acquire() # 获得锁。一般情况下,Condition自己会创建一个锁,因此直接调用这个方法就能获得锁
release() # 释放锁。一般情况下,Condition自己会创建一个锁,因此直接调用这个方法就能释放锁
wait_for(predicate,timeout=None) # 等待条件得到满足。第一个参数是条件,必须是一个返回值为True/False的函数
notify_all() # 通知所有因等待而陷入阻塞的线程,大家都醒来吧,条件发生改变了,看看是否对你合适?
当然还有wait(),notify(n=1)等方法,根据情况使用。在这里要强调一下,notify()的含义是“条件发生改变”,并不严格等价于“条件已满足”,因此等待线程每次被唤醒之后,都要继续判断条件的符合性,所以使用wait_for()是比较方便的。还有,Condition是支持上下文管理的,使用with块可以自动获取和释放锁。
小例子
根据“生产者-消费者”模型,这里构建一个“装苹果-拿苹果”的小例子。水果店老板(生产者)往一个篮子里放苹果,三个人(消费者)从篮子里拿苹果,相关的条件是:1.如果篮子没苹果了,消费者就等待。2.如果篮子空了,老板才能往里放苹果。代码如下:
代码语言:python代码运行次数:0复制from concurrent.futures import ThreadPoolExecutor
import threading
import random
class Basket:
'''篮子类,用于装苹果'''
def __init__(self, n:int) -> None:
'''篮子容量为n,装载数量为0'''
self.capacity = n
self.apple_count = 0
def fill_up(self) -> None:
'''将篮子装满'''
self.apple_count = self.capacity
def get_apples(self, n:int) -> int:
'''从篮子中取走苹果,如果n大于剩余苹果数量,则仅返回剩余苹果数量'''
x = n if n <= self.apple_count else self.apple_count
self.apple_count -= x
return x
def fill_up(b:Basket, cond:threading.Condition, name:str):
'''装满篮子线程的功能函数,只有当篮子空了的时候才装。
只装20次,目的是让程序能够结束,方便观察结果'''
print(f'{name} 开始工作')
for i in range(20):
with cond:
cond.wait_for(lambda : b.apple_count==0)
b.fill_up()
print(f'n{i 1}. {name} 把篮子装满了,共有 {b.capacity} 个苹果。')
cond.notify_all()
print(f'{name} 下班回家了~~~~~~~~~')
def get_apple(b:Basket, cond:threading.Condition, name:str):
'''取走苹果线程的功能函数,每次取走若干个'''
print(f'{name} 进入了水果店')
while True:
n = random.randint(1, 5)
with cond:
flag = cond.wait_for(lambda : b.apple_count>0, 2)
if not flag:
print(f'{name} 迟迟等不到苹果,生气的走了~~~~~~~~~~')
break
x = b.get_apples(n)
print(f'{name} 从篮子中拿走了 {x} 个苹果,还剩 {b.apple_count} 个。')
if b.apple_count == 0:
cond.notify()
names = ['班尼特', '胡桃', '神里凌华']
cond = threading.Condition()
bask = Basket(50)
n = len(names)
with ThreadPoolExecutor(4) as pool:
pool.map(get_apple, [bask]*n, [cond]*n, names)
pool.submit(fill_up, bask, cond, '水果店老板')
print('n程序结束n')