multiprocess
-- coding: utf-8 --
import multiprocessing
def foo(i): print ('called function in process: %s' %i) return
if name == 'main': Process_jobs = [] for i in range(5): p = multiprocessing.Process(target=foo, args=(i,)) Process_jobs.append(p) p.start() p.join()
2 命名
命名一个进程
import multiprocessing import time
def foo(): name = multiprocessing.current_process().name print("Starting %s n" % name) time.sleep(3) print("Exiting %s n" % name)
if name == 'main': process_with_name = multiprocessing.Process(name='foo_process', target=foo) process_with_name.daemon = True # 注意原代码有这一行,但是译者发现删掉这一行才能得到正确输出 process_with_default_name = multiprocessing.Process(target=foo) process_with_name.start() process_with_default_name.start()
后台进程不能创建子进程
3、p.terminate()
杀死一个进程
import multiprocessing import time
def foo(): print('Starting function') time.sleep(0.1) print('Finished function')
if name == 'main': p = multiprocessing.Process(target=foo) print('Process before execution:', p, p.is_alive()) p.start() print('Process running:', p, p.is_alive()) p.terminate() print('Process terminated:', p, p.is_alive()) p.join() print('Process joined:', p, p.is_alive()) print('Process exit code:', p.exitcode)
4、使用queue通信 import multiprocessing import random import time
class Producer(multiprocessing.Process): def init(self, queue): multiprocessing.Process.init(self) self.queue = queue
代码语言:javascript复制def run(self):
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
print("Process Producer : item %d appended to queue %s" % (item, self.name))
time.sleep(1)
print("The size of queue is %s" % self.queue.qsize())
class Consumer(multiprocessing.Process): def init(self, queue): multiprocessing.Process.init(self) self.queue = queue
代码语言:javascript复制def run(self):
while True:
if self.queue.empty():
print("the queue is empty")
break
else:
time.sleep(2)
item = self.queue.get()
print('Process Consumer : item %d popped from by %s n' % (item, self.name))
time.sleep(1)
if name == 'main': queue = multiprocessing.Queue() process_producer = Producer(queue) process_consumer = Consumer(queue) process_producer.start() process_consumer.start() process_producer.join() process_consumer.join()
5、进程间通信管道pipe import multiprocessing
def create_items(pipe): output_pipe, _ = pipe for item in range(10): output_pipe.send(item) output_pipe.close()
def multiply_items(pipe_1, pipe_2): close, input_pipe = pipe_1 close.close() output_pipe, _ = pipe_2 try: while True: item = input_pipe.recv() output_pipe.send(item * item) except EOFError: output_pipe.close()
if name== 'main': # 第一个进程管道发出数字 pipe_1 = multiprocessing.Pipe(True) process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,)) process_pipe_1.start() # 第二个进程管道接收数字并计算 pipe_2 = multiprocessing.Pipe(True) process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,)) process_pipe_2.start() pipe_1[0].close() pipe_2[0].close() try: while True: print(pipe_2[1].recv()) except EOFError: print("End")
6、barrier: Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。 import multiprocessing from multiprocessing import Barrier, Lock, Process from time import time from datetime import datetime
def test_with_barrier(synchronizer, serializer): name = multiprocessing.current_process().name synchronizer.wait() now = time() with serializer: print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
def test_without_barrier(): name = multiprocessing.current_process().name now = time() print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
if name == 'main': synchronizer = Barrier(2) serializer = Lock() Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start() Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start() Process(name='p3 - test_without_barrier', target=test_without_barrier).start() Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
image.png
barrier 1,2,3,4