并行执行(二)、multiprocessing

2022-07-27 10:46:49 浏览数 (1)

multiprocessing模块

multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。这个模块表示像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多。

看一下Process类的构造方法:

代码语言:javascript复制
__init__(self, group=None, target=None, name=None, args=(), kwargs={})

参数说明:

group:进程所属组。基本不用 target:表示调用对象。 args:表示调用对象的位置参数元组。 name:别名 kwargs:表示调用对象的字典。

创建进程的简单实例:

代码语言:javascript复制
#coding=utf-8

import multiprocessing


def do(n) :

 #获取当前线程的名字

 name = multiprocessing.current_process().name

 print name,'starting'

 print "worker ", n

 return


if __name__ == '__main__' :

 numList = []

 for i in xrange(5) :

  p = multiprocessing.Process(target=do, args=(i,))

  numList.append(p)

  p.start()

  p.join()

  print "Process end."

执行结果:

代码语言:javascript复制
Process-1 starting

worker 0

Process end.

Process-2 starting

worker 1

Process end.

Process-3 starting

worker 2

Process end.

Process-4 starting

worker 3

Process end.

Process-5 starting

worker 4

Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,并用其start()方法启动,这样创建进程比fork()还要简单。join()方法表示等待子进程结束以后再继续往下运行,通常用于进程间的同步。

注意:

在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if __name__ == ‘__main__' :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。

Pool类

在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。 下面介绍一下multiprocessing 模块下的Pool类下的几个方法

apply()

函数原型:

代码语言:javascript复制
apply(func[, args=()[, kwds={}]])

该函数用于传递不定参数,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。

apply_async()

函数原型:

代码语言:javascript复制
apply_async(func[, args=()[, kwds={}[, callback=None]]])

与apply用法一样,但它是非阻塞且支持结果返回进行回调。

map()

函数原型:

代码语言:javascript复制
map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。 注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

close()

关闭进程池(pool),使其不在接受新的任务。

terminate()

结束工作进程,不在处理未处理的任务。

join()

主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。

multiprocessing.Pool类的实例:

代码语言:javascript复制
import time

from multiprocessing import Pool

def run(fn):

 #fn: 函数参数是数据列表的一个元素

 time.sleep(1)

 return fn*fn


if __name__ == "__main__":

 testFL = [1,2,3,4,5,6]

 print 'shunxu:' #顺序执行(也就是串行执行,单进程)

 s = time.time()

 for fn in testFL:

  run(fn)


 e1 = time.time()

 print "顺序执行时间:", int(e1 - s)


 print 'concurrent:' #创建多个进程,并行执行

 pool = Pool(5) #创建拥有5个进程数量的进程池

 #testFL:要处理的数据列表,run:处理testFL列表中数据的函数

 rl =pool.map(run, testFL)

 pool.close()#关闭进程池,不再接受新的进程

 pool.join()#主进程阻塞等待子进程的退出

 e2 = time.time()

 print "并行执行时间:", int(e2-e1)

 print rl

 执行结果:

shunxu: 顺序执行时间: 6 concurrent: 并行执行时间: 2 [1, 4, 9, 16, 25, 36]

上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。

程序中的r1表示全部进程执行结束后全局的返回结果集,run函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中,也就是一个结果堆中,实际上是用了队列的原理,等待所有进程都执行完毕,就返回这个列表(列表的顺序不定)。 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),让其不再接受新的Process了。

再看一个实例:

代码语言:javascript复制
import time

from multiprocessing import Pool

def run(fn) :

 time.sleep(2)

 print fn

if __name__ == "__main__" :

 startTime = time.time()

 testFL = [1,2,3,4,5]

 pool = Pool(10)#可以同时跑10个进程

 pool.map(run,testFL)

 pool.close()

 pool.join() 

 endTime = time.time()

 print "time :", endTime - startTime

 执行结果:

代码语言:javascript复制
21


3

4

5

time : 2.51999998093

再次执行结果如下:

代码语言:javascript复制
1

34


2

5

time : 2.48600006104

结果中为什么还有空行和没有折行的数据呢?其实这跟进程调度有关,当有多个进程并行执行时,每个进程得到的时间片时间不一样,哪个进程接受哪个请求以及执行完成时间都是不定的,所以会出现输出乱序的情况。那为什么又会有没这行和空行的情况呢?因为有可能在执行第一个进程时,刚要打印换行符时,切换到另一个进程,这样就极有可能两个数字打印到同一行,并且再次切换回第一个进程时会打印一个换行符,所以就会出现空行的情况。

进程实战实例

并行处理某个目录下文件中的字符个数和行数,存入res.txt文件中, 每个文件一行,格式为:filename:lineNumber,charNumber

代码语言:javascript复制
import os

import time

from multiprocessing import Pool


def getFile(path) :

 #获取目录下的文件list

 fileList = []

 for root, dirs, files in list(os.walk(path)) :

  for i in files :

   if i.endswith('.txt') or i.endswith('.10w') :

    fileList.append(root   "\"   i)

 return fileList


def operFile(filePath) :

 #统计每个文件中行数和字符数,并返回

 filePath = filePath

 fp = open(filePath)

 content = fp.readlines()

 fp.close()

 lines = len(content)

 alphaNum = 0

 for i in content :

  alphaNum  = len(i.strip('n'))

 return lines,alphaNum,filePath


def out(list1, writeFilePath) :

 #将统计结果写入结果文件中

 fileLines = 0

 charNum = 0

 fp = open(writeFilePath,'a')

 for i in list1 :

  fp.write(i[2]   " 行数:"  str(i[0])   " 字符数:" str(i[1])   "n")

  fileLines  = i[0]

  charNum  = i[1]

 fp.close()

 print fileLines, charNum


if __name__ == "__main__":

 #创建多个进程去统计目录中所有文件的行数和字符数

 startTime = time.time()

 filePath = "C:\wcx\a"

 fileList = getFile(filePath)

 pool = Pool(5)

 resultList =pool.map(operFile, fileList)

 pool.close()

 pool.join()


 writeFilePath = "c:\wcx\res.txt"

 print resultList

 out(resultList, writeFilePath)

 endTime = time.time()

 print "used time is ", endTime - startTime

执行结果:

耗时不到1秒,可见多进程并发执行速度是很快的。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

Queue

Queue是mp库当中用来提供多进程对象交换的方式。对象交换和上一部分当中提到的对象共享都是使多个进程访问同一个对象的方式,两者的区别就是,对象共享是多个进程访问同一个对象,对象交换则是将对象从一个进程传输的另一个进程。

multiprocessing当中的Queue使用方式和Python内置的threading.Queue对象很像,它支持一个put操作,将对象放入Queue,也支持一个get操作,将对象从Queue当中读出。和threading.Queue不同的是,mp.Queue默认不支持join()和task_done操作,这两个支持需要使用mp.JoinableQueue对象。

由于Queue对象负责进程之间的对象传输,因此第一个问题就是如何在两个进程之间共享这个Queue对象本身。在上一部分所言的三种共享方式当中,Queue对象只能使用继承(inheritance)的方式共享。这是因为Queue本身基于unix的Pipe对象实现,而Pipe对象的共享需要通过继承。因此,在一个典型的应用实现模型当中,应该是父进程创建Queue,然后创建子进程共享该Queue,由父进程和子进程分别读写。例如下面的这个例子:

代码语言:javascript复制
import multiprocessing
 
q = multiprocessing.Queue()
 
def reader_proc():
    print q.get()
 
reader = multiprocessing.Process(target=reader_proc)
reader.start()
 
q.put(100)
reader.join()

另一种实现方式是父进程创建Queue,创建多个子进程,有的子进程读Queue,有的子进程写Queue,例如:

代码语言:javascript复制
q = multiprocessing.Queue() 
def writer_proc():    
    try:       
        q.put(1,block = False)    #这个block的选项要加上,默认block为True,读不到就阻塞,会让人感觉像死锁了一样,而操作没有成功会抛出Queue.Full的异常,所以要处理一下   
    except:       
        pass 
def reader_proc():    
    try:       
        q.get(block = False)  #这个block的选项要加上,默认block为True,读不到就阻塞,会让人感觉像死锁了一样,而操作没有成功会抛出Queue.Full的异常,所以要处理一下   
    except:       
        pass
reader = multiprocessing.Process(target=reader_proc)
reader.start()
writer = multiprocessing.Process(target=writer_proc)
writer.start() 
reader.join()
writer.join()

由于使用继承的方式共享Queue,因此代码当中并没有明显的传输Queue对象本身的代码,看起来似乎只要将multiprocessing当中的对象换成threading当中的对象,程序仍然能够工作。反之,拿到一个现有的多线程程序,是不是将threading改成multiprocessing就可以工作呢?也许可以,但是更可能的情况是你会遇到很多问题。 第一个问题就是mp的Queue需要考虑多进程之间的对象传输,因此所传输的对象必须是可以pickle的。否则,在Queue的put操作上会抛出PicklingError。

其他的一些差异表现在一些技术细节上,这些不是任何高层逻辑可以抽象掉的,不知道这些差异会导致一些潜在的错误,例如死锁。在总结这些潜在的犯错的可能的同时,我们会简单看一下mp当中Queue的实现方式,以便能够方便的理解为什么会有这样的行为。这些实现问题仅仅针对Linux,Windows上面的实现和出现的问题在这里不涉及。

mp.Queue建构在系统的Pipe之上,但是实际上进程并不是直接将对象写入到Pipe里面,而是先写入一个本地的buffer,再由一个专门的feed线程将其放入Pipe当中。读取端则是直接从Pipe当中读出对象。之所以有这样一个feed线程,是为了能够提供Queue接口函数所需要的put的超时控制。但是由于这个feed线程的存在,mp.Queue提供了几个额外的函数来控制它,一个函数close来停止该线程,以及join_thread来join该线程。close同时负责把所有在buffer当中的对象刷新到Pipe当中。

但是这个feed线程也是个麻烦制造者,为了保证所有被放入Queue的东西最终都能够到达另外一端的进程,mp库注册了一个atexit的处理函数,用来在进程退出的时候自动close并且join该feed线程。这个join动作带来了很多问题,比如潜在的死锁。考虑下面一种状况:一个父进程创建了两个子进程,一个子进程读,另一个子进程写。当需要停止这些进程的时候,父进程如果先把读进程结束,但是同时写进程已经将太多的对象写入Queue,导致后继的对象等待在buffer当中,则这个进程将无法终止,因为atexit的处理函数等待把所有buffer当中的对象放入Pipe,但是Pipe已经满了,然后陷入了死锁。

有人可能会问,那只要保证总是按照数据流的顺序来停止进程不就行。问题是在很多复杂的系统流程当中,可能存在一个环形的数据流,这种情况下,无论按照什么顺序停止进程,终究有一个进程可能陷入这种情景当中。

幸运的是,Queue对象还提供了一个成员函数cancel_join_thread,这个函数可以使得在进程停止的时候不进行join操作,这样可以避免死锁,代价就是这个时候尚未刷新到Pipe当中的对象都会丢失。鉴于即使调用了join_thread,残留在Pipe当中的对象仍然可能丢失,所以一旦选择使用mp的Queue对象,就不要假设不会在流程当中丢对象了。

另外一个可能的方案是使用mp库当中的SimpleQueue对象。这个对象在文档当中没有提及,但是在multiprocessing.queue模块当中有定义。这个对象就是去掉了buffer的Queue对象,因此可能能够避免上面说的问题的。但是SimpleQueue没有提供put和get的超时处理,两个动作都是阻塞的。

除了使用multiprocessing.Queue,还可以使用multiprocessing.Pipe进行通信。mp.Pipe是Queue的底层结构,但是没有feed线程和put/get的超时控制。一定程度上和SimpleQueue很像。需要注意的是Pipe带有一个参数 duplex,当设置为True(默认)的时候,Pipe并不是使用系统的pipe来实现,而是通过socketpair,即Unix Domain Socket来实现。这个和pipe相比有些微的性能差异。

另外一个使用Queue的方式不是mp库内置的。这种方式使用上一篇文章当中提到的server process的方式来共享一个Queue对象。这个Queue对象实际上在server process当中,所有的子进程通过socket连接到server process获取该Queue的代理对象进行操作。说到这有人会想起来mp库有一个内置的SyncManager对象,可以通过multiprocess.Manager函数获取到,通过该对象的Queue方法可以获取一个Queue的代理对象。不幸的是,这个方法不是正确的获取Queue的方式,原因正如上一篇文章所说,SyncManager.Queue方法的每次调用获取到的是一个新建对象的代理对象,而不是一个共享对象。正确的使用server process当中的Queue的方式是:

共同部分:

代码语言:javascript复制
import multiprocessing.managers as mpm
import Queue
 
class SharedQueueManager(mpm.BaseManager): pass
q = Queue.Queue()
SharedQueueManager.register('Queue', lambda: q)

服务进程:

代码语言:javascript复制
mgr = SharedQueueManager(address=('', 12345))
server = mgr.get_server()
server.serve_forever()

客户进程:

代码语言:javascript复制
mgr = SharedQueueManager(address=('localhost', 12345))
mgr.connect()
q = mgr.Queue() # 这里q就是共享的Queue对象的代理对象

这种方式比起mp库内置的Queue,有一些性能上的影响,因为毕竟牵涉到多次网络通讯,但是带来的好处是没有feed线程带来的一系列问题,而且理论上不会存在丢数据的问题,除非server process崩溃。但是正如上一篇所说,server process本身就不是很靠谱的,因此这里也只是“理论上”不会丢数据而已。 说到性能,这里就列两个性能数据,以前在twitter上面提到过的(这两个连接无法访问的请联系我):

操作对象为 pickle后512字节的对象,通过proxy操作Queue的性能大约是7000次/秒(本机)或1100次/秒(多机),如果使用 multiprocessing.Queue,效率可达54000次/秒。

Queue

使用Queue对象可以实现进程间通信,并且Queue对象是线程及进程安全的:

代码语言:javascript复制
# Written by CSDN: Mars Loo的博客

from multiprocessing import Queue, Process


def func(q):

    q.put([1, 'str', None])


if __name__ == "__main__":

    q = Queue()

    p = Process(target=func, args=(q,))

    p.start()

    p.join()

    print q.get()

如果声明了q = Queue(n)的Queue对象,则该对象的容量为n。

0 人点赞