python parallel

2022-05-13 17:00:24 浏览数 (1)

1、线程

代码语言:javascript复制
 """
 thread basics: start 5 copies of a function running in parallel;
 uses time.sleep so that the main thread doesn't die too early--
 this kills all other threads on some platforms; stdout is shared:
 thread outputs may be intermixed in this version arbitrarily.
 """
import _thread as thread, time
def counter(myId, count):                        # function run in threads
 for i in range(count):
 time.sleep(1)                            # simulate real work
 print('[%s] => %s' % (myId, i))
for i in range(5):                               # spawn 5 threads
 thread.start_new_thread(counter, (i, 5))     # each thread loops 5 times
time.sleep(6)
 print('Main thread exiting.')                    # don't exit too early

2、 mutex.accquire()/release() thread.allocate_lock()

代码语言:javascript复制
 """
 synchronize access to stdout: because it is shared global,
 thread outputs may be intermixed if not synchronized
 """
import _thread as thread, time
def counter(myId, count):                        # function run in threads
 for i in range(count):
 time.sleep(1)                            # simulate real work
 mutex.acquire()
 print('[%s] => %s' % (myId, i))          # print isn't interrupted now
 mutex.release()
mutex = thread.allocate_lock()                   # make a global lock object
 for i in range(5):                               # spawn 5 threads
 thread.start_new_thread(counter, (i, 5))     # each thread loops 5 times
time.sleep(6)
 print('Main thread exiting.')                    # don't exit too early

3、等待所有线程结束

代码语言:javascript复制
 """
 uses simple shared global data (not mutexes) to know when threads
 are done in parent/main thread; threads share list but not its items,
 assumes list won't move in memory once it has been created initially
 """
import _thread as thread
 stdoutmutex = thread.allocate_lock()
 exitmutexes = [False] * 10
def counter(myId, count):
 for i in range(count):
 stdoutmutex.acquire()
 print('[%s] => %s' % (myId, i))
 stdoutmutex.release()
 exitmutexes[myId] = True  # signal main thread
for i in range(10):
 thread.start_new_thread(counter, (i, 100))
while False in exitmutexes: pass
 print('Main thread exiting.')

4、等待所有线程结束:

代码语言:javascript复制
 """
 passed in mutex object shared by all threads instead of globals;
 use with context manager statement for auto acquire/release;
 sleep calls added to avoid busy loops and simulate real work
 """
import _thread as thread, time
 stdoutmutex = thread.allocate_lock()
 numthreads  = 5
 exitmutexes = [thread.allocate_lock() for i in range(numthreads)]
def counter(myId, count, mutex):                     # shared object passed in
 for i in range(count):
 time.sleep(1 / (myId 1))                     # diff fractions of second
 with mutex:                                  # auto acquire/release: with
 print('[%s] => %s' % (myId, i))
 exitmutexes[myId].acquire()                      # global: signal main thread
for i in range(numthreads):
 thread.start_new_thread(counter, (i, 5, stdoutmutex))
while not all(mutex.locked() for mutex in exitmutexes): time.sleep(0.25)
 print('Main thread exiting.')

5、多线程用法:常规

代码语言:javascript复制
 """
 thread class instances with state and run() for thread's action;
 uses higher-level Java-like threading module object join method (not
 mutexes or shared global vars) to know when threads are done in main
 parent thread; see library manual for more details on threading;
 """
import threading
class Mythread(threading.Thread):              # subclass Thread object
 def init(self, myId, count, mutex):
 self.myId  = myId
 self.count = count                     # per-thread state information
 self.mutex = mutex                     # shared objects, not globals
 threading.Thread.init(self)
def run(self):                             # run provides thread logic
    for i in range(self.count):            # still sync stdout access
        with self.mutex:
            print('[%s] => %s' % (self.myId, i))
stdoutmutex = threading.Lock()                 # same as thread.allocate_lock()
 threads = []
 for i in range(10):
 thread = Mythread(i, 100, stdoutmutex)     # make/start 10 threads
 thread.start()                             # starts run method in a thread
 threads.append(thread)
for thread in threads:
 thread.join()                              # wait for thread exits
 print('Main thread exiting.')

6、

代码语言:javascript复制
//threading.Thread(target=func,args=(x,)).start()
 7  共享对象需要加锁:
 "prints 200 each time, because shared resource access synchronized"
import threading, time
 count = 0
def adder(addlock):                 # shared lock object passed in
 global count
 with addlock:
 count = count   1           # auto acquire/release around stmt
 time.sleep(0.5)
 with addlock:
 count = count   1           # only 1 thread updating at once
addlock = threading.Lock()
 threads = []
 for i in range(100):
 thread = threading.Thread(target=adder, args=(addlock,))
 thread.start()
 threads.append(thread)
for thread in threads: thread.join()
 print(count)

8、Timer 5秒后运行程序

9、sys.exit

10

pipe.read()

pipe.close()

11 bufsize ~

12、subprocess

pipe.stdout.read()

pipe.wait()

pipe.communicate()

pipe.returncode

13 threadss.daemon=True

14 time.ctime(time.time()) ///now

15 import signal,捕获信号

16 signal.alarm几秒后发送信号

17、multiprocess 使用

代码语言:javascript复制
 """
 multiprocess basics: Process works like threading.Thread, but
 runs function call in parallel in a process instead of a thread;
 locks can be used to synchronize, e.g. prints on some platforms;
 starts new interpreter on windows, forks a new process on unix;
 """
import os
 from multiprocessing import Process, Lock
def whoami(label, lock):
 msg = '%s: name:%s, pid:%s'
 with lock:
 print(msg % (label, name, os.getpid()))
if name == 'main':
 lock = Lock()
 whoami('function call', lock)
代码语言:javascript复制
p = Process(target=whoami, args=('spawned child', lock))
p.start()
p.join()

for i in range(5):
    Process(target=whoami, args=(('run process %s' % i), lock)).start()

with lock:
    print('Main process exit.')

18、os.execlp

代码语言:javascript复制
 "Use multiprocessing to start independent programs, os.fork or not"
import os
 from multiprocessing import Process
def runprogram(arg):
 os.execlp('python', 'python', 'child.py', str(arg))
if name == 'main':
 for i in range(5):
 Process(target=runprogram, args=(i,)).start()
 print('parent exit')
 19| pool使用
 "Plus much more: process pools, managers, locks, condition,..."
import os
 from multiprocessing import Pool
def powers(x):
 #print(os.getpid())                  # enable to watch children
 return 2 ** x
if name == 'main':
 workers = Pool(processes=5)
代码语言:javascript复制
results = workers.map(powers, [2]*100)
print(results[:16])
print(results[-2:])

results = workers.map(powers, range(100))
print(results[:16])
print(results[-2:])

20、process不能使用lambda

21 os.spawnv使用

22 os.startfile

0 人点赞