As is well known, threads in CPython are scheduled by the interpreter and serialized by the Global Interpreter Lock, so they cannot run Python bytecode in parallel. Starting with Python 3.0, the standard library absorbed an open-source module and gained support for native OS processes: multiprocessing.

Note: parts of this module require operating-system support. For example, importing multiprocessing.synchronize raises an ImportError on some platforms.

1) Process

Creating a Process is simple:
```python
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
```
Getting a Process's process ID is just as simple:
```python
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # fails on Windows before Python 3.2 (os.getppid() was Unix-only)
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
```
Constructor: multiprocessing.Process([group[, target[, name[, args[, kwargs]]]]])

Parameters:
- group: always None; it exists only for compatibility with threading.Thread
- target: the callable (usually a function) to run
- name: the process name
- args: positional arguments for target
- kwargs: keyword arguments for target

Methods:
- run(): by default invokes target; you can override it in a subclass
- start(): starts the process
- join([timeout]): blocks the calling process until the child finishes; with timeout=None it waits forever, otherwise at most timeout seconds. A process may be joined many times, but cannot join itself
- is_alive(): returns whether the process is still running
- terminate(): kills the process, using SIGTERM on Unix and TerminateProcess() on Windows

Attributes:
- name: the process name
- daemon: the daemon flag
- pid: the process ID
- exitcode: the exit code, or None if the process has not finished
- authkey: the authentication key

2) Queue

Queue is similar to queue.Queue and is generally used to exchange data between processes. Example:
```python
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
```
Note: Queue is both process- and thread-safe. Queue implements most of the methods of queue.Queue, but task_done() and join() are not implemented.

Constructor: multiprocessing.Queue([maxsize])

Methods:
- qsize(): returns the approximate size of the queue
- empty(): returns a boolean indicating whether the queue is empty
- full(): returns a boolean indicating whether the queue is full
- put(item[, block[, timeout]])
- put_nowait(item)
- get([block[, timeout]])
- get_nowait()
- close(): indicates that no more data will be put on this queue
- join_thread()
- cancel_join_thread()

3) JoinableQueue

Constructor: multiprocessing.JoinableQueue([maxsize])
Additional methods: task_done(), join()

4) Pipe
```python
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
```
multiprocessing.Pipe([duplex]) returns a pair of Connection objects.

5) Synchronization
```python
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print('hello world', i)
    l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
```
6) Shared Memory
```python
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
```
Two kinds of shared-memory object are provided: 1> Value 2> Array

7) Manager
```python
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))
    p = Process(target=f, args=(d, l))
    p.start()
    p.join()
    print(d)
    print(l)
```
8) Pool
```python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)            # start 4 worker processes
    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"
```
Constructor: multiprocessing.Pool([processes[, initializer[, initargs]]])

Methods:
- apply(func[, args[, kwds]])
- apply_async(func[, args[, kwds[, callback]]])
- map(func, iterable[, chunksize])
- map_async(func, iterable[, chunksize[, callback]])
- imap(func, iterable[, chunksize])
- imap_unordered(func, iterable[, chunksize])
- close()
- terminate()
- join()
```python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print(next(it))                       # prints "0"
    print(next(it))                       # prints "1"
    print(it.next(timeout=1))             # prints "4" unless your computer is *very* slow

    import time
    result = pool.apply_async(time.sleep, (10,))
    print(result.get(timeout=1))          # raises TimeoutError
```
9) Miscellaneous

- multiprocessing.active_children(): returns a list of all live child processes
- multiprocessing.cpu_count(): returns the number of CPUs
- multiprocessing.current_process(): returns the Process object for the current process
- multiprocessing.freeze_support()
- multiprocessing.set_executable()

10) Connection objects

- send(obj)
- recv()
- fileno()
- close()
- poll([timeout])
- send_bytes(buffer[, offset[, size]])
- recv_bytes([maxlength])
- recv_bytes_into(buffer[, offset])
```python
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')   # send_bytes requires a bytes-like object in Python 3
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
```