1.基本介绍
multiprocessing模块可以衍生出子进程。multiprocessing模块同时提供本地和远程的并发操作。multiprocessing模块不像threading模块那样会受到GIL全局解释器锁的限制,它使用进程代替线程。基于这样的特性,multiprocessing模块可以让程序员在一台服务器上使用多个处理器。
代码语言:javascript复制In [106]: from multiprocessing import Pool
In [107]: def f(x):
.....: return x*x;
.....:
In [108]: if __name__ == '__main__':
.....: p=Pool(5)
.....: print(p.map(f,[1,2,3]))
.....:
[1, 4, 9]
在multiprocessing模块中,子进程是通过一个Process对象生成的。然后调用start()函数
代码语言:javascript复制In [116]: from multiprocessing import Process
In [117]: def f(name):
.....: print 'hello',name
.....:
In [118]: if __name__ == '__main__':
.....: p=Process(target=f,args=('john',))
.....: p.start()
.....: p.join()
.....:
hello john
如果要查看各自的进程ID,可以使用以下代码
代码语言:javascript复制#!/usr/sbin/python
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:',__name__
if hasattr(os,'getppid'):
print 'parent process:',os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello',name
if __name__ == '__main__':
info('main line')
p = Process(target=f,args=('john',))
p.start()
p.join()
代码语言:javascript复制main line
module name: __main__
parent process: 17148
process id: 18168
function f
module name: __main__
parent process: 18168
process id: 18169
hello john
2.进程间通信
multiprocessing模块支持Queues和Pipes两种方式来进行进程间通信
使用Queue
代码语言:javascript复制In [123]: from multiprocessing import Process,Queue
In [124]: def f(q):
.....: q.put([42,None,'hello'])
.....:
In [125]: if __name__ == '__main__':
.....: q=Queue()
.....: p=Process(target=f,args=(q,))
.....: p.start()
.....: print q.get()
.....: p.join()
.....:
[42, None, 'hello']
使用Queues,对于线程和进程来说都是安全的
使用Pipe
代码语言:javascript复制In [136]: from multiprocessing import Process,Pipe
In [137]: def f(conn):
.....: conn.send([42,None,'hello'])
.....: conn.close()
.....:
In [138]: if __name__ == '__main__':
.....: parent_conn,child_conn=Pipe()
.....: p=Process(target=f,args=(child_conn,))
.....: p.start()
.....: print parent_conn.recv()
.....: p.join()
.....:
[42, None, 'hello']
Pipe()返回一对连接对象,这两个连接对象分别代表Pipe的两端。每个连接对象都有send()和recv()方法。需要注意的是如果两个不同的进程在同一时间对同一个Pipe的末端或者连接对象进行读写操作,那么Pipe中的数据可能被损坏。不同的进程在不同的末端同一时间读写数据不会造成数据损坏。
3.进程间同步
代码语言:javascript复制In [143]: from multiprocessing import Process,Lock
In [144]: def f(l,i):
l.acquire()
print 'hello world',i
l.release()
.....:
In [145]: if __name__ == '__main__':
lock=Lock()
for num in range(10):
Process(target=f,args=(lock,num)).start()
.....:
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
4.进程间共享状态信息
在进行并发编程的过程中,尽量不要使用共享状态。如果一定要在进程间共享数据,multiprocessing模块提供了一些方法。
共享内存
代码语言:javascript复制In [11]: from multiprocessing import Process,Value,Array
In [12]: def f(n,a):
....: n.value=3.1415927
....: for i in range(len(a)):
....: a[i] = -a[i]
....:
In [13]: if __name__ == '__main__':
....: num=Value('d',0.0)
....: arr=Array('i',range(10))
....: p=Process(target=f,args=(num,arr))
....: p.start()
....: p.join()
....: print num.value
....: print arr[:]
....:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
共享进程
代码语言:javascript复制In [27]: from multiprocessing import Process,Manager
In [28]: def f(d,l):
d[1] = '1'
d['2']=2
d[0.25]=None
l.reverse()
....:
In [29]: if __name__ == '__main__':
manager=Manager()
d=manager.dict()
l=manager.list(range(10))
p=Process(target=f,args=(d,l))
p.start()
p.join()
print d
print l
....:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
5.使用一组工作进程
使用Pool对象会创建一组worker进程
代码语言:javascript复制from multiprocessing import Pool, TimeoutErrorimport timeimport osdef f(x):
return x*xif __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"
代码语言:javascript复制[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
25
36
64
49
81
16
400
27150
[27149, 27152, 27151, 27150]
We lacked patience and got a multiprocessing.TimeoutError
Pool对象的函数只能是创建它的进程可以使用。
multiprocessing模块的提供的函数需要子进程可以导入__main__模块
参考文档:
https://docs.python.org/2/library/multiprocessing.html