进程池Pool
当需要创建的子进程数量不多时,可以直接利用multiprocessing
中的Process
动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing
模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:
代码语言:javascript复制# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Pool
import time
import os
import random
def smoke(num):
t_start = time.time() # 记录开始时间
print("start %d smoke !!!! and pid = %d" % (num,os.getpid()))
time.sleep(random.random()*2) # random.random随机生成0~1之间的浮点数
t_stop = time.time()
print("finish %d smoke, time = %0.2f" % (num,(t_start-t_stop)))
def main():
# 创建进程池Pool
po = Pool(3) # 定义一个进程池,最大进程数为3
# 编写一个循环,加入进程池中
for i in range(0,10):
print("------循环 %d --------" % i)
# Pool().apply_async(调用的目标函数,(传递的参数元组))
# 每次循环会用空闲出来的子进程去调用目标
po.apply_async(smoke,(i,))
print("----start-----")
po.close() # 关闭进程池
po.join() # 等待进程池所有子进程执行完毕,必须在close语句之后
print("----end-----")
if __name__ == "__main__":
main()
执行如下:
代码语言:javascript复制[root@server01 process]# python pool.py
------循环 0 --------
------循环 1 --------
------循环 2 --------
------循环 3 --------
------循环 4 --------
------循环 5 --------
------循环 6 --------
------循环 7 --------
------循环 8 --------
------循环 9 --------
----start-----
start 0 smoke !!!! and pid = 1656
start 1 smoke !!!! and pid = 1657
start 2 smoke !!!! and pid = 1655
finish 0 smoke, time = -0.10
start 3 smoke !!!! and pid = 1656
finish 3 smoke, time = -0.87
start 4 smoke !!!! and pid = 1656
finish 2 smoke, time = -0.98
start 5 smoke !!!! and pid = 1655
finish 1 smoke, time = -1.67
start 6 smoke !!!! and pid = 1657
finish 4 smoke, time = -1.51
start 7 smoke !!!! and pid = 1656
finish 5 smoke, time = -1.59
start 8 smoke !!!! and pid = 1655
finish 7 smoke, time = -0.17
start 9 smoke !!!! and pid = 1656
finish 6 smoke, time = -1.17
finish 8 smoke, time = -1.60
finish 9 smoke, time = -1.62
----end-----
[root@server01 process]#
可以从执行的结果看出来,在进行完毕循环的过程中,将方法加入进程池并不会被堵塞,而是被存储起来了,然后再三个进程进行调用。
从下面的调用顺序来看,以此是 1656 --》 1657 --》 1655
三个子进程依次调用方法。
multiprocessing.Pool常用函数解析:
apply_async(func[, args[, kwds]])
:使用非阻塞方式调用func
(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args
为传递给func的参数列表,kwds
为传递给func的关键字参数列表;
-
close()
:关闭Pool
,使其不再接受新的任务; -
terminate()
:不管任务是否完成,立即终止; -
join()
:主进程阻塞,等待子进程的退出, 必须在close
或terminate
之后使用;
进程池中的Queue - 传递信息:fat boss,come on tobacco
如果要使用Pool
创建进程,就需要使用multiprocessing.Manager()
中的Queue()
,而不是multiprocessing.Queue()
,否则会得到一条如下的错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的实例演示了进程池中的进程如何通信:
代码语言:javascript复制# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Pool,Manager
import time
import os
import random
def fatboy_libai(q):
print("fatboy_libai pid = %d" % os.getpid()) # 打印子进程号pid
for i in range(10):
print("-- %d -- put msg to queue" % i)
q.put("fat boss,come on tabacco!!") # 给队列写入内容
def fatboss(q):
print("fatboss pid = %d" % os.getpid()) # 打印子进程号pid
print("queue size=",q.qsize())
# 读取队列信息
for i in range(q.qsize()):
print("-- %d -- get msg from queue" % i)
print("Queue %d data=%s" % (i,q.get(True)))
def main():
# 创建进程池Pool
po = Pool() # 定义一个进程池
# 创建一个进程池的队列
q = Manager().Queue()
# 进程调用肥仔白的方法,将信息写入队列中
po.apply_async(fatboy_libai,(q,))
time.sleep(1) # 等待一下,先让上面的任务向Queue写入数据,然后再用进程调用胖子老板的方法,取数据
# 进程调用胖子老板的方法,读取队列信息
po.apply_async(fatboss,(q,))
po.close() # 关闭进程池
po.join() # 等待进程池所有子进程执行完毕,必须在close语句之后
print("----end-----")
if __name__ == "__main__":
main()
执行如下:
代码语言:javascript复制[root@server01 process]# python pool3.py
fatboy_libai pid = 2178
-- 0 -- put msg to queue
-- 1 -- put msg to queue
-- 2 -- put msg to queue
-- 3 -- put msg to queue
-- 4 -- put msg to queue
-- 5 -- put msg to queue
-- 6 -- put msg to queue
-- 7 -- put msg to queue
-- 8 -- put msg to queue
-- 9 -- put msg to queue
fatboss pid = 2178
('queue size=', 10)
-- 0 -- get msg from queue
Queue 0 data=fat boss,come on tabacco!!
-- 1 -- get msg from queue
Queue 1 data=fat boss,come on tabacco!!
-- 2 -- get msg from queue
Queue 2 data=fat boss,come on tabacco!!
-- 3 -- get msg from queue
Queue 3 data=fat boss,come on tabacco!!
-- 4 -- get msg from queue
Queue 4 data=fat boss,come on tabacco!!
-- 5 -- get msg from queue
Queue 5 data=fat boss,come on tabacco!!
-- 6 -- get msg from queue
Queue 6 data=fat boss,come on tabacco!!
-- 7 -- get msg from queue
Queue 7 data=fat boss,come on tabacco!!
-- 8 -- get msg from queue
Queue 8 data=fat boss,come on tabacco!!
-- 9 -- get msg from queue
Queue 9 data=fat boss,come on tabacco!!
----end-----
[root@server01 process]#