python 进程池Pool

2019-06-02 13:53:00 浏览数 (1)

进程池Pool

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:

代码语言:javascript复制
# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Pool
import time
import os
import random


def smoke(num):
    t_start = time.time() # 记录开始时间
    print("start %d smoke !!!! and pid = %d" % (num,os.getpid()))
    time.sleep(random.random()*2) # random.random随机生成0~1之间的浮点数
    t_stop = time.time()
    print("finish %d smoke, time = %0.2f" % (num,(t_start-t_stop)))

def main():

    # 创建进程池Pool
    po = Pool(3) # 定义一个进程池,最大进程数为3

    # 编写一个循环,加入进程池中
    for i in range(0,10):
        print("------循环 %d --------" % i)
        # Pool().apply_async(调用的目标函数,(传递的参数元组))
        # 每次循环会用空闲出来的子进程去调用目标
        po.apply_async(smoke,(i,))

    print("----start-----")
    po.close() # 关闭进程池
    po.join() # 等待进程池所有子进程执行完毕,必须在close语句之后
    print("----end-----")

if __name__ == "__main__":
   main()

执行如下:

代码语言:javascript复制
[root@server01 process]# python pool.py 
------循环 0 --------
------循环 1 --------
------循环 2 --------
------循环 3 --------
------循环 4 --------
------循环 5 --------
------循环 6 --------
------循环 7 --------
------循环 8 --------
------循环 9 --------
----start-----
start 0 smoke !!!! and pid = 1656
start 1 smoke !!!! and pid = 1657
start 2 smoke !!!! and pid = 1655
finish 0 smoke, time = -0.10
start 3 smoke !!!! and pid = 1656
finish 3 smoke, time = -0.87
start 4 smoke !!!! and pid = 1656
finish 2 smoke, time = -0.98
start 5 smoke !!!! and pid = 1655
finish 1 smoke, time = -1.67
start 6 smoke !!!! and pid = 1657
finish 4 smoke, time = -1.51
start 7 smoke !!!! and pid = 1656
finish 5 smoke, time = -1.59
start 8 smoke !!!! and pid = 1655
finish 7 smoke, time = -0.17
start 9 smoke !!!! and pid = 1656
finish 6 smoke, time = -1.17
finish 8 smoke, time = -1.60
finish 9 smoke, time = -1.62
----end-----
[root@server01 process]# 

可以从执行的结果看出来,在进行完毕循环的过程中,将方法加入进程池并不会被堵塞,而是被存储起来了,然后再三个进程进行调用。 从下面的调用顺序来看,以此是 1656 --》 1657 --》 1655 三个子进程依次调用方法。

multiprocessing.Pool常用函数解析:

apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;

  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在closeterminate之后使用;

进程池中的Queue - 传递信息:fat boss,come on tobacco

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的实例演示了进程池中的进程如何通信:

代码语言:javascript复制
# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Pool,Manager
import time
import os
import random


def fatboy_libai(q):
    print("fatboy_libai pid = %d" % os.getpid()) # 打印子进程号pid
    for i in range(10):
        print("-- %d -- put msg to queue" % i)
        q.put("fat boss,come on tabacco!!")   # 给队列写入内容

def fatboss(q):
    print("fatboss pid = %d" % os.getpid()) # 打印子进程号pid
    print("queue size=",q.qsize())
    # 读取队列信息
    for i in range(q.qsize()):
        print("-- %d -- get msg from queue" % i)
        print("Queue %d data=%s" % (i,q.get(True)))

def main():
    
    # 创建进程池Pool
    po = Pool() # 定义一个进程池
    
    # 创建一个进程池的队列
    q = Manager().Queue() 
   
    # 进程调用肥仔白的方法,将信息写入队列中
    po.apply_async(fatboy_libai,(q,))
   
    time.sleep(1) # 等待一下,先让上面的任务向Queue写入数据,然后再用进程调用胖子老板的方法,取数据

    # 进程调用胖子老板的方法,读取队列信息
    po.apply_async(fatboss,(q,)) 

    po.close() # 关闭进程池
    po.join() # 等待进程池所有子进程执行完毕,必须在close语句之后
    print("----end-----")

if __name__ == "__main__":
   main()

执行如下:

代码语言:javascript复制
[root@server01 process]# python pool3.py 
fatboy_libai pid = 2178
-- 0 -- put msg to queue
-- 1 -- put msg to queue
-- 2 -- put msg to queue
-- 3 -- put msg to queue
-- 4 -- put msg to queue
-- 5 -- put msg to queue
-- 6 -- put msg to queue
-- 7 -- put msg to queue
-- 8 -- put msg to queue
-- 9 -- put msg to queue
fatboss pid = 2178
('queue size=', 10)
-- 0 -- get msg from queue
Queue 0 data=fat boss,come on tabacco!!
-- 1 -- get msg from queue
Queue 1 data=fat boss,come on tabacco!!
-- 2 -- get msg from queue
Queue 2 data=fat boss,come on tabacco!!
-- 3 -- get msg from queue
Queue 3 data=fat boss,come on tabacco!!
-- 4 -- get msg from queue
Queue 4 data=fat boss,come on tabacco!!
-- 5 -- get msg from queue
Queue 5 data=fat boss,come on tabacco!!
-- 6 -- get msg from queue
Queue 6 data=fat boss,come on tabacco!!
-- 7 -- get msg from queue
Queue 7 data=fat boss,come on tabacco!!
-- 8 -- get msg from queue
Queue 8 data=fat boss,come on tabacco!!
-- 9 -- get msg from queue
Queue 9 data=fat boss,come on tabacco!!
----end-----
[root@server01 process]# 

0 人点赞