python 并行编程 多线程 锁 信号 条件 事件

2022-05-13 08:44:03 浏览数 (1)

共享内存

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})

import threading

def function(i):
    """Print a line identifying the calling thread by its index ``i``."""
    # The "\n" escape had degraded to a bare "n" in the collapsed text; with it
    # restored, each message is followed by a blank line.
    print("function called by thread %i\n" % i)
    return

# Every Thread object created below is also recorded here.
threads = []

for i in range(5):
    t = threading.Thread(target=function, args=(i,))
    threads.append(t)
    t.start()
    # Joining inside the loop waits for each thread to finish before the next
    # one is created, so the five calls run one after another.
    t.join()

2、线程继承

import threading
import time

# Checked by print_time() at the start of every iteration; a truthy value makes
# it exit the calling thread.
exitFlag = 0

class myThread(threading.Thread):
    """Thread subclass that announces its start and exit around ``print_time``.

    Args:
        threadID: Identifier stored on the instance as ``threadID``.
        name: Thread name; also used in the printed messages.
        counter: Passed to ``print_time`` as its ``delay`` argument.
    """

    def __init__(self, threadID, name, counter):
        # The base initialiser must run before the thread can be started.
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        """Print a start banner, call ``print_time`` for 5 rounds, print an exit banner."""
        print("Starting " + self.name)
        print_time(self.name, self.counter, 5)
        print("Exiting " + self.name)

def print_time(threadName, delay, counter):
    """Print the current time ``counter`` times, sleeping ``delay`` seconds before each.

    Args:
        threadName: Label printed in front of each timestamp.
        delay: Seconds passed to ``time.sleep`` before each print.
        counter: Number of timestamps to print.

    Raises:
        SystemExit: Via ``_thread.exit()`` when the module-level ``exitFlag``
            is truthy at the start of an iteration.
    """
    # Translator's note: the original book used the ``thread`` module, which
    # Python 3 replaced with ``_thread``.
    import _thread

    while counter:
        if exitFlag:
            _thread.exit()
        time.sleep(delay)
        print("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()

# Translator's note: the two join() calls below were added by the translator.
# They are required to get the same output as the book's figure; their absence
# looks like an oversight by the original author.
thread1.join()
thread2.join()
print("Exiting Main Thread")

3、threading.Lock() l.acquire() l.release()

# -*- coding: utf-8 -*-

import threading

# Counter updated only while shared_resource_lock is held.
shared_resource_with_lock = 0
# Counter updated without any synchronisation.
shared_resource_with_no_lock = 0
# Number of updates each worker function performs.
COUNT = 100000
# Lock guarding every update of shared_resource_with_lock.
shared_resource_lock = threading.Lock()

# 有锁的情况

def increment_with_lock():
    """Add 1 to ``shared_resource_with_lock`` ``COUNT`` times, locking each update."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        shared_resource_lock.acquire()
        # Must be "+=": a plain "= 1" would just overwrite the counter.
        shared_resource_with_lock += 1
        shared_resource_lock.release()

def decrement_with_lock():
    """Subtract 1 from ``shared_resource_with_lock`` ``COUNT`` times, locking each update."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock -= 1
        shared_resource_lock.release()

# 没有锁的情况

def increment_without_lock():
    """Add 1 to ``shared_resource_with_no_lock`` ``COUNT`` times with no locking."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        # Must be "+=": a plain "= 1" would just overwrite the counter.
        shared_resource_with_no_lock += 1

def decrement_without_lock():
    """Subtract 1 from ``shared_resource_with_no_lock`` ``COUNT`` times with no locking."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        shared_resource_with_no_lock -= 1

if __name__ == "__main__":
    # One thread per worker: two use the lock, two do not.
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_without_lock)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    # Wait for all four workers before reading the counters.
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    print ("the value of shared variable with lock management is %s" % shared_resource_with_lock)
    print ("the value of shared variable with race condition is %s" % shared_resource_with_no_lock)

///rlock() 4| threading.Semaphore(0)

# -*- coding: utf-8 -*-

"""Using a Semaphore to synchronize threads"""
import threading
import time
import random

# The optional argument gives the initial value for the internal
# counter;
# it defaults to 1.
# If the value given is less than 0, ValueError is raised.
semaphore = threading.Semaphore(0)

def consumer():
    """Block on the module-level ``semaphore`` and then print the shared ``item``."""
    print("consumer is waiting.")
    # Acquire a semaphore
    semaphore.acquire()
    # The consumer have access to the shared resource
    print("Consumer notify : consumed item number %s " % item)

def producer():
    """Sleep 10 seconds, store a random integer in the global ``item``, release the semaphore."""
    global item
    time.sleep(10)
    # create a random item
    item = random.randint(0, 1000)
    print("producer notify : produced item number %s" % item)
    # Release a semaphore, incrementing the internal counter by one.
    # When it is zero on entry and another thread is waiting for it
    # to become larger than zero again, wake up that thread.
    semaphore.release()

if __name__ == '__main__':
    # Run five producer/consumer rounds, one pair of threads per round.
    for i in range (0,5) :
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        # Wait for both threads before starting the next round.
        t1.join()
        t2.join()
    print("program terminated")

5、condition() 生产者,消费者

from threading import Thread, Condition
import time

# Shared list that the producer appends to and the consumer pops from.
items = []
# Condition variable both threads acquire around their access to ``items``.
condition = Condition()

class consumer(Thread):
    """Thread that pops items from the module-level ``items`` list."""

    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        """Pop one item from ``items``, waiting on ``condition`` while the list is empty."""
        condition.acquire()
        try:
            # Re-check the predicate after every wake-up: being notified does
            # not by itself guarantee that an item is available.
            while len(items) == 0:
                print("Consumer notify : no item to consume")
                # wait() releases the lock while blocked and re-acquires it
                # before returning.
                condition.wait()
            items.pop()
            print("Consumer notify : consumed 1 item")
            print("Consumer notify : items to consume are " + str(len(items)))

            # notify() does not release the lock; the explicit release() in
            # the finally clause does.
            condition.notify()
        finally:
            condition.release()

    def run(self):
        """Call ``consume`` 20 times, sleeping 2 seconds before each call."""
        for i in range(0, 20):
            time.sleep(2)
            self.consume()

class producer(Thread):
    """Thread that appends items to the module-level ``items`` list."""

    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        """Append one item to ``items``, waiting on ``condition`` while it holds 10 items."""
        condition.acquire()
        try:
            # Re-check the predicate after every wake-up: being notified does
            # not by itself guarantee that there is room in the list.
            while len(items) == 10:
                print("Producer notify : items producted are " + str(len(items)))
                print("Producer notify : stop the production!!")
                # wait() releases the lock while blocked and re-acquires it
                # before returning.
                condition.wait()
            items.append(1)
            print("Producer notify : total items producted " + str(len(items)))
            # notify() does not release the lock; the explicit release() in
            # the finally clause does.
            condition.notify()
        finally:
            condition.release()

    def run(self):
        """Call ``produce`` 20 times, sleeping 1 second before each call."""
        for i in range(0, 20):
            time.sleep(1)
            self.produce()

if __name__ == "__main__":
    # Separate variable names keep the ``producer`` and ``consumer`` classes
    # from being rebound to their own instances.
    producer_thread = producer()
    consumer_thread = consumer()
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()

(译者在这里添加一段。乍一看这段代码好像会死锁,因为 condition.acquire() 之后就在 .wait() 了,好像会一直持有锁。其实 .wait() 会将锁释放,然后等待其他线程 .notify() 之后会重新尝试获得锁。但是要注意 .notify() 并不会自动释放锁,所以代码中有两行,先 .notify() 然后再 .release() 。)

6、event事件使用

# -*- coding: utf-8 -*-

import time
from threading import Thread, Event
import random

# Shared list passed to both the producer and the consumer thread.
items = []
# Event the producer sets and the consumer waits on.
event = Event()

class consumer(Thread):
    """Thread that pops an item from ``items`` each time ``event`` is set.

    Args:
        items: List to pop from.
        event: Event waited on before each pop.
    """

    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        """Loop forever: sleep 2 seconds, wait for the event, pop and report one item."""
        while True:
            time.sleep(2)
            self.event.wait()
            item = self.items.pop()
            print('Consumer notify : %d popped from list by %s' % (item, self.name))

class producer(Thread):
    """Thread that appends a random integer to its list and pulses ``event``.

    Args:
        integers: List the produced item is appended to.
        event: Event set, then cleared, after the item is appended.
    """

    def __init__(self, integers, event):
        Thread.__init__(self)
        # Use the list that was passed in rather than the module-level
        # ``items`` global.
        self.items = integers
        self.event = event

    def run(self):
        """Sleep 2 seconds, append one random item, then set and clear the event."""
        global item
        for i in range(1):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
            print('Producer notify : event set by %s' % self.name)
            self.event.set()
            print('Produce notify : event cleared by %s '% self.name)
            self.event.clear()

if __name__ == '__main__':
    # Both threads share the same list and the same event.
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

7、with

import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def threading_with(statement):
    """Acquire ``statement`` through a ``with`` block and log that it was acquired.

    Args:
        statement: Object used as a context manager (the demo passes locks,
            a condition and a semaphore).
    """
    with statement:
        logging.debug('%s acquired via with' % statement)

def threading_not_with(statement):
    """Acquire ``statement`` with explicit acquire()/release() calls and log it.

    Args:
        statement: Object exposing ``acquire()`` and ``release()``.
    """
    statement.acquire()
    try:
        logging.debug('%s acquired directly' % statement )
    finally:
        # Release even if logging raises.
        statement.release()

if __name__ == '__main__':
    # let's create a test battery
    lock = threading.Lock()
    rlock = threading.RLock()
    condition = threading.Condition()
    mutex = threading.Semaphore(1)
    threading_synchronization_list = [lock, rlock, condition, mutex]
    # in the for cycle we call the threading_with e threading_no_with function
    for statement in threading_synchronization_list :
        t1 = threading.Thread(target=threading_with, args=(statement,))
        t2 = threading.Thread(target=threading_not_with, args=(statement,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()

0 人点赞