多进程与多线程 | 多进程

2021-06-24 10:33:03 浏览数 (1)

multiprocessing模块创建进程

Python提供了multiprocessing。multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,并提供了Process、Queue、Pipe、Lock等组件。

本文介绍multiprocessing模块和Pool进程池这两个跨平台模块。

Process类

multiprocessing模块提供了一个Process类来代表一个进程对象。

代码语言:javascript复制
Process(
    group=None,
    target=None,
    name=None,
    args=(),
    kwargs={},
    *,
    daemon=None,
)

group: 未使用参数,默认值为**None** target: 表示当前进程启动时执行的可调用对象 name: 为当前进程实例额度别名 args: 传递给target 函数的参数元组 kwargs: 传递给target 函数的参数字典


实例化Process类,执行子进程

先实例化Process类,使用Process.start()方法启动子进程,开始执行子进程函数task()

代码语言:javascript复制
from multiprocessing import Process
import time

# 子进程
def task(interval):
   print('执行子进程')
    time.sleep(interval)
    print('结束子进程')
    
# 主进程
def main():
   print('执行主进程')
    # 实例化Process进程类
    P = Process(target=task, args=(10,))
    # 启动子进程
    P.start()
    print('结束主进程')
    
if __name__ == '__main__':
   main()

结果

执行主进程 执行子进程 结束主进程 结束子进程

Process的实例常用方法

方法

描述

start()

启动进程实例,即创建子进程。

join(timeout=None)

是否等待进程实例执行结束,或等待多少秒。

is_alive()

判断进程实例是否还在执行

run()

若没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法。

terminate()

不管任务是否完成,立即终止

Process的实例常用属性

属性

描述

name

当前进程实例别名,默认Process-N,N为从1开始递增的整数

pid

当前进程实例的PID值

Process类方法属性应用实例
代码语言:javascript复制
from multiprocessing import Process
import time
import os

def task_1(interval):
    print("child process {} begin, and father process is {}".format(os.getpid(), os.getppid()))
    # 计时开始
    t_start = time.time()
    # 程序将会被挂起interval秒
    time.sleep(interval)
    # 计时结束
    t_end = time.time()
    print("child process (%s) executes the task in '%0.2f' seconds" % (os.getpid(), t_end - t_start))
    
def task_2(interval):
    print("child process {} begin, and father process is {}".format(os.getpid(), os.getppid()))
    # 计时开始
    t_start = time.time()
    # 程序将会被挂起interval秒
    time.sleep(interval)
    # 计时结束
    t_end = time.time()
    print("child process (%s) executes the task in '%0.2f' seconds" % (os.getpid(), t_end - t_start))
    
def main():
    print('Father process start'.center(30, '-'))
    # 输出当前程序的PID
    print("father process PID: {}".format(os.getpid()))
    p1 = Process(target=task_1, args=(2,))
    p2 = Process(target=task_2, name='buy', args=(2,))
    # 启动子进程
    p1.start()
    p2.start()
    # 父进程继续执行,若子进程2p2还在继续执行,则会返回True
    print("p1.is_alive: {}".format(p1.is_alive()))
    print("p2.is_alive: {}".format(p2.is_alive()))
    # 输出进程别名和PID
    print("p1.name: {}".format(p1.name))
    print("p1.pid: {}".format(p1.pid))
    print("p2.name: {}".format(p2.name))
    print("p2.pid: {}".format(p2.pid))
    print("等待子进程".center(20, '-'))
    p1.join()
    p2.join()
    print('Father process end'.center(30, '-'))
    
if __name__ == "__main__":
    main()

结果

-----Father process start----- father process PID: 63373 child process 64454 begin, and father process is 63373 child process 64455 begin, and father process is 63373 p1.is_alive: True p2.is_alive: True p1.name: Process-1 p1.pid: 64454 p2.name: buy p2.pid: 64455 -------等待子进程-------- child process (64454) executes the task in '2.00' seconds child process (64455) executes the task in '2.00' seconds ------Father process end------

主进程与子进程流程示意图

使用Process子类创建进程

在处理复杂任务的进程,通常定义一个类,使其继承Process类,每次实例化这个类即实例化一个进程对象。

使用Process子类方式创建2个子进程,分别输出子父进程的PID,以及每个子进程的状态和运行时间。

代码语言:javascript复制
from multiprocessing import Process
import time
import os

# 继承Process类
class SubProcess(Process):
   # 由于Process类本身也有__init__初始化方法,这个子类相当于重写了父类的这个方法。
    def __init__(self, interval, name=""):
        # 调用Process父类的初始化方法,否则父类初始化方法会被覆盖,无法开启进程。
        Process.__init__(self)
        self.interval = interval
        if name:
            self.name = name
    # 重写Process类的run()方法        
    def run(self):
        print("child process {} start, and father process is {}".format(os.getpid(), os.getppid()))
        # 计时开始
        t_start = time.time()
        # 程序将会被挂起interval秒
        time.sleep(self.interval)
        # 计时结束
        t_end = time.time()
        print("child process (%s) executes the task in '%0.2f' seconds" % (os.getpid(), t_end - t_start))
        
def main():
    print('Father process start'.center(30, '-'))
    # 输出当前程序的PID
    print("father process PID: {}".format(os.getpid()))
    p1 = SubProcess(interval=2)
    p2 = SubProcess(interval=2, name='buy')
    # 若Process类不包含target属性,且执行start()方法,则Process类将会执行这个类中的run()方法
    # 所以这里会执行p1.run()
    # 启动子进程
    p1.start()
    p2.start()
    # 父进程继续执行,若子进程2p2还在继续执行,则会返回True
    print("p1.is_alive: {}".format(p1.is_alive()))
    print("p2.is_alive: {}".format(p2.is_alive()))
    # 输出进程别名和PID
    print("p1.name: {}".format(p1.name))
    print("p1.pid: {}".format(p1.pid))
    print("p2.name: {}".format(p2.name))
    print("p2.pid: {}".format(p2.pid))
    print("Waiting or child process".center(20, '-'))
    p1.join()
    p2.join()
    print('Father process end'.center(30, '-'))
    
if __name__ == "__main__":
    mian()

SubProcess子类中并没有定义start()方法,但主进程中却调用了start()方法,此时就会自动执行SubProcess类的run()方法。


-----Father process start----- father process PID: 63373 child process 66856 start, and father process is 63373 child process 66857 start, and father process is 63373 p1.is_alive: True p2.is_alive: True p1.name: SubProcess-1 p1.pid: 66856 p2.name: buy p2.pid: 66857 -------Waiting or child process-------- child process (66856) executes the task in '2.00' seconds child process (66857) executes the task in '2.00' seconds ------Father process end------

进程池Pool模块

当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时就可以用到multiprocessing模块提供的Pool方法。初始化Pool时,可以指定一个最大进程数,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

代码语言:javascript复制
Pool(processes=None, 
     initializer=None,
     initargs=(),
     maxtasksperchild=None)

processes是可选参数,设置进程池的进程数量。

Pool类的常用方法 apply_async(function, *args, **kwargs): 异步的方式添加进程事件,使用非阻塞方式调用func函数,并行执行,阻塞方式必须等待上一个进程退出才能执行下一个进程。使用方法同Process函数,function是目标函数,*args是星号元组形参,**kwargs是星号字典形参。 apply(function, *args, **kwargs): 使用阻塞方式调用函数。 close(): 关闭进程添加事件的通道,不再接受新的任务。close后,不论是父进程还是子进程都仍然在继续执行,直到所有进程运行完毕。 join(): 主进程阻塞函数,等待进程池的子进程执行完毕。代码里必须添加这个,否则父进程结束,子进程也会中断。必须在close()terminate方法之后使用。 get(): 进程池可以有返回值,通过该方法获取返回值。


非阻塞方式

代码语言:javascript复制
from multiprocessing import Pool
import os, time, random


def task(name):
    t_start = time.time()
    print("{} start, and PID is {}".format(name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 3)
    t_end = time.time()
    print("(%s) end and executes the task in '%0.2f' seconds" % (name, t_end - t_start))

def main():
    p = Pool(5)  # 定义一个进程池,最大进程数5

    # 往进程池中添加任务
    for i in range(10):
        # Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        p.apply_async(task, (f'buy{i}',))

    print('start'.center(30, '-'))
    p.close()  # 关闭进程池,关闭后po不再接收新的请求
    p.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print('all done'.center(30, '-'))

if __name__ == '__main__':
    main()

结果

buy2 start, and PID is 67834 buy0 start, and PID is 67832 buy1 start, and PID is 67833 buy4 start, and PID is 67836 buy3 start, and PID is 67835 ------------start------------- (buy1) end and executes the task in '0.80' seconds buy5 start, and PID is 67833 (buy0) end and executes the task in '1.03' seconds buy6 start, and PID is 67832 (buy2) end and executes the task in '1.09' seconds buy7 start, and PID is 67834 (buy7) end and executes the task in '0.12' seconds buy8 start, and PID is 67834 (buy3) end and executes the task in '1.24' seconds buy9 start, and PID is 67835 (buy8) end and executes the task in '0.35' seconds (buy5) end and executes the task in '1.01' seconds (buy4) end and executes the task in '2.94' seconds (buy9) end and executes the task in '1.93' seconds (buy6) end and executes the task in '2.20' seconds -----------all done-----------

创建一个进程池pool,并设定进程的数量为5,range(10)会相继产生10个对象,10个对象被提交到pool中,因pool指定进程数为5,所以0、1、2、3、4会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象,继续去执行新的对象,所以会出现输出"buy5 start, and PID is 67833"出现在"(buy1) end and executes the task in '0.80' seconds"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出"------------start-------------",主程序在pool.join()处等待各个进程的结束。

阻塞方式

代码语言:javascript复制
from multiprocessing import Pool
import os, time, random

def task(name):
    t_start = time.time()
    print("{} start, and PID is {}".format(name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 3)
    t_end = time.time()
    print("(%s) end and executes the task in '%0.2f' seconds" % (name, t_end - t_start))

def main():
    p = Pool(5)  # 定义一个进程池,最大进程数5
    # 往进程池中添加任务
    for i in range(10):
        # Pool.apply(要调用的目标,(传递给目标的参数元祖,))
        p.apply(task, (f'buy{i}',))
    print('start'.center(30, '-'))
    p.close()  # 关闭进程池,关闭后po不再接收新的请求
    p.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print('all done'.center(30, '-'))

if __name__ == '__main__':
    main()

结果

buy0 start, and PID is 67900 (buy0) end and executes the task in '1.49' seconds buy1 start, and PID is 67901 (buy1) end and executes the task in '0.67' seconds buy2 start, and PID is 67902 (buy2) end and executes the task in '1.98' seconds buy3 start, and PID is 67903 (buy3) end and executes the task in '1.56' seconds buy4 start, and PID is 67904 (buy4) end and executes the task in '2.14' seconds buy5 start, and PID is 67900 (buy5) end and executes the task in '2.08' seconds buy6 start, and PID is 67901 (buy6) end and executes the task in '2.10' seconds buy7 start, and PID is 67902 (buy7) end and executes the task in '1.80' seconds buy8 start, and PID is 67903 (buy8) end and executes the task in '1.03' seconds buy9 start, and PID is 67904 (buy9) end and executes the task in '0.93' seconds ------------start------------- -----------all done-----------

因为是阻塞,主函数会等待进程的执行,执行完之后才会继续往下,所以运行完所有进程后才输出"------------start-------------"

使用进程池,并返回结果

代码语言:javascript复制
from multiprocessing import Pool
import os, time, random

def task(name):
    print("{} start, and PID is {}".format(name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 3)
    return name,os.getpid()

def main():
    p = Pool(3)  # 定义一个进程池,最大进程数3

    res=[]
    # 往进程池中添加任务
    for i in range(0, 5):
        # 每次循环将会用空闲出来的子进程去调用目标
        res.append(p.apply_async(task, (f'buy{i}',)))

    print('start'.center(30, '-'))
    p.close()  # 关闭进程池,关闭后po不再接收新的请求
    p.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    for result in res:
        print(result.get())  #get()函数得出每个返回结果的值
    print('all done'.center(30, '-'))

if __name__ == '__main__':
    main()

结果

buy0 start, and PID is 68032 buy1 start, and PID is 68033 buy2 start, and PID is 68034 ------------start------------- buy3 start, and PID is 68033 buy4 start, and PID is 68034 ('buy0', 68032) ('buy1', 68033) ('buy2', 68034) ('buy3', 68033) ('buy4', 68034) -----------all done-----------

多进程执行多个任务

代码语言:javascript复制
from multiprocessing import Pool
import os, time, random

def task1(name):
    print("{} start executes the task1, and PID is {}".format(name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 3)

def task2(name):
    print("{} start executes the task2, and PID is {}".format(name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 3)

def task3(name):
    print("{} start executes the task3, and PID is {}".format(name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 3)

def main():
    po = Pool(5)  # 定义一个进程池,最大进程数5
    task_list=[task1,task2,task3]
    # 往进程池中添加任务
    for task in task_list:
        for i in range(3):
            po.apply_async(task, (f'buy{i}',))

    print('start'.center(30, '-'))
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print('all done'.center(30, '-'))

if __name__ == '__main__':
    main()

线程池5个线程执行3个任务,每个任务执行3次

结果

buy0 start executes the task1, and PID is 68141 buy1 start executes the task1, and PID is 68142 buy2 start executes the task1, and PID is 68143 buy1 start executes the task2, and PID is 68145 buy0 start executes the task2, and PID is 68144 ------------start------------- buy2 start executes the task2, and PID is 68142 buy0 start executes the task3, and PID is 68143 buy1 start executes the task3, and PID is 68143 buy2 start executes the task3, and PID is 68144 -----------all done-----------

进程间通信

此处使用队列Queue实现在进程间通信。创建两个子进程,一个子进程负责向队列中写入数据,另一个子进程负责从队列中读取数据。为保证能够正确从队列读取数据,设置读取数据的进程等待时间为3秒。若超时仍然无法读取数据,则抛出异常。

代码语言:javascript复制
from multiprocessing import Process, Queue
import time

# 写入数据
def write_task(queue):
    if not queue.full():
        for i in range(5):
            message = "message"   str(i)
            queue.put(message)
            print(f"write:{message}")
# 读取数据
def read_task(queue):
    time.sleep(2)
    while not queue.empty():
       message = queue.get(True, 3) # 等待3秒,如果还没有读取到任何消息,则抛出"Queue.Empty"异常
        print(f"read:{message}")
        
def main():
    print('start'.center(30, '-'))
    queue = Queue()
    # 实例化写入队列的子进程,并且传递队列
    pw = Process(target=write_task, args=(queue,))
    # 实例化读取队列的子进程,并且传递队列
    pr = Process(target=read_task, args=(queue,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()
    print('all done'.center(30, '-'))
    
from multiprocessing import Pool

if __name__ == '__main__':
    main()

结果

------------start------------- write:message0 write:message1 write:message2 write:message3 write:message4 read:message0 read:message1 read:message2 read:message3 read:message4 -----------all done-----------

队列可参考多线程里队列的介绍。

0 人点赞