python 多进程

2022-05-13 08:47:05 浏览数 (1)

multiprocess

# -*- coding: utf-8 -*-

import multiprocessing

def foo(i):
    """Print a message that includes the value handed to this worker.

    Args:
        i: Value supplied by the caller; it is rendered with ``%s``.
    """
    # Wrap the argument in a one-element tuple so that a tuple-valued ``i``
    # is printed as a single value instead of being unpacked by ``%``.
    print('called function in process: %s' % (i,))

if __name__ == '__main__':
    # Spawn five workers, each calling foo() with its own index.
    Process_jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(i,))
        Process_jobs.append(p)
        p.start()

    # Join only after every worker has been started: joining inside the loop
    # above would block until each child finishes before starting the next
    # one, so the children would never run concurrently.
    for p in Process_jobs:
        p.join()

2、命名

命名一个进程

import multiprocessing import time

def foo():
    """Print the current process name, sleep for three seconds, then print it again."""
    name = multiprocessing.current_process().name
    # The trailing "\n" adds a blank line after each message.
    print("Starting %s \n" % name)
    time.sleep(3)
    print("Exiting %s \n" % name)

if __name__ == '__main__':
    # One process gets an explicit name; the other keeps the default name
    # assigned by multiprocessing.
    process_with_name = multiprocessing.Process(name='foo_process', target=foo)
    # Translator's note from the article: the original listing sets
    # daemon=True, but the expected output only appeared once that line was
    # removed. A daemonic child is terminated when the parent exits, so it is
    # joined explicitly below to let it finish.
    process_with_name.daemon = True
    process_with_default_name = multiprocessing.Process(target=foo)

    process_with_name.start()
    process_with_default_name.start()

    # Wait for both children so the daemonic one is not killed mid-run.
    process_with_name.join()
    process_with_default_name.join()

后台进程不能创建子进程

3、p.terminate()

杀死一个进程

import multiprocessing import time

def foo():
    """Print a start message, sleep for 0.1 seconds, then print a finish message."""
    print('Starting function')
    time.sleep(0.1)
    print('Finished function')

if __name__ == '__main__':
    # Walk one process through its life cycle, printing the Process object
    # and is_alive() at each stage.
    p = multiprocessing.Process(target=foo)
    print('Process before execution:', p, p.is_alive())

    p.start()
    print('Process running:', p, p.is_alive())

    # Kill the child; is_alive() is sampled immediately afterwards and again
    # after join() has returned.
    p.terminate()
    print('Process terminated:', p, p.is_alive())

    p.join()
    print('Process joined:', p, p.is_alive())
    print('Process exit code:', p.exitcode)

4、使用queue通信

import multiprocessing
import random
import time

class Producer(multiprocessing.Process):
    """Process that puts ten random integers onto a queue, one per second."""

    def __init__(self, queue):
        """Initialise the process and remember the queue to write to.

        Args:
            queue: Object providing ``put`` and ``qsize``; the accompanying
                script passes a ``multiprocessing.Queue``.
        """
        super().__init__()
        self.queue = queue

    def run(self):
        """Put ten random integers from [0, 256] on the queue.

        After each ``put`` the method reports the item, sleeps for one
        second, and then reports ``qsize()``.
        """
        for _ in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Process Producer : item %d appended to queue %s" % (item, self.name))
            time.sleep(1)
            # NOTE(review): the multiprocessing documentation describes
            # Queue.qsize() as approximate and says it may raise
            # NotImplementedError on platforms such as macOS.
            print("The size of queue is %s" % self.queue.qsize())

class Consumer(multiprocessing.Process):
    """Process that drains a queue until it finds the queue empty."""

    def __init__(self, queue):
        """Initialise the process and remember the queue to read from.

        Args:
            queue: Object providing ``empty`` and ``get``; the accompanying
                script passes a ``multiprocessing.Queue``.
        """
        super().__init__()
        self.queue = queue

    def run(self):
        """Pop and report items until ``empty()`` returns true.

        Each iteration sleeps two seconds before ``get()`` and one second
        after reporting the item.
        """
        while True:
            # NOTE(review): the multiprocessing documentation calls
            # Queue.empty() unreliable, so this check can stop the consumer
            # before the producer has queued anything.
            if self.queue.empty():
                print("the queue is empty")
                break

            time.sleep(2)
            item = self.queue.get()
            print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
            time.sleep(1)

if __name__ == '__main__':
    # Both processes share one queue: the producer writes to it and the
    # consumer reads from it.
    queue = multiprocessing.Queue()
    process_producer = Producer(queue)
    process_consumer = Consumer(queue)

    process_producer.start()
    process_consumer.start()

    # Block until both sides have finished.
    process_producer.join()
    process_consumer.join()

5、进程间通信管道pipe

import multiprocessing

def create_items(pipe):
    """Send the integers 0 through 9 down a pipe, then close the sending end.

    Args:
        pipe: Two-element sequence of connection objects; only the first
            element is used, via ``send`` and ``close``.
    """
    output_pipe, _ = pipe
    for item in range(10):
        output_pipe.send(item)
    # Close the sending end once all ten items have been sent.
    output_pipe.close()

def multiply_items(pipe_1, pipe_2):
    """Read items from ``pipe_1``, square each one, and send it down ``pipe_2``.

    Args:
        pipe_1: Two-element sequence of connections. The first element is
            closed immediately; items are received from the second.
        pipe_2: Two-element sequence of connections. Squared items are sent
            on the first element, which is closed once input is exhausted.
    """
    unused_end, input_pipe = pipe_1
    # Close the end of pipe_1 that this function does not read from.
    unused_end.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item * item)
    except EOFError:
        # recv() raised EOFError: nothing is left to read, so close the
        # output end.
        output_pipe.close()

if __name__ == '__main__':
    # First process: sends the numbers into pipe_1.
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()

    # Second process: receives the numbers from pipe_1, squares them, and
    # sends the results into pipe_2.
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()

    # Close the parent's copies of the sending ends; it only reads from
    # pipe_2[1] below.
    pipe_1[0].close()
    pipe_2[0].close()

    try:
        while True:
            print(pipe_2[1].recv())
    except EOFError:
        print("End")

    # Wait for both children before the script exits.
    process_pipe_1.join()
    process_pipe_2.join()

6、barrier

Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    """Wait at a barrier, then print the process name and the time just after.

    Args:
        synchronizer: Object whose ``wait()`` is called before the timestamp
            is taken; the accompanying script passes a ``Barrier``.
        serializer: Context manager held while printing; the accompanying
            script passes a ``Lock``.
    """
    name = multiprocessing.current_process().name
    synchronizer.wait()
    # Take the timestamp only after wait() has returned.
    now = time()
    with serializer:
        print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

def test_without_barrier():
    """Print the current process name with the current time, with no synchronisation."""
    name = multiprocessing.current_process().name
    now = time()
    print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

if __name__ == '__main__':
    # Barrier(2) expects two parties, matching the two processes started
    # with test_with_barrier below.
    synchronizer = Barrier(2)
    # The lock is held around print() in test_with_barrier.
    serializer = Lock()

    Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    Process(name='p4 - test_without_barrier', target=test_without_barrier).start()

image.png

barrier 1,2,3,4

0 人点赞