python 之进程与线程

2019-08-01 10:40:47 浏览数 (1)

进程和线程的概念

从系统调度和资源分配的角度来看,进程是 CPU 资源分配的最小单位,线程是 CPU 调度的最小单位。从 CPU 执行时间的角度来看,进程是包含了上下文切换的程序执行时间总和,线程是共享了进程的上下文环境的更为细小的 CPU 时间段。

多进程

Python 使用 multiprocessing 包来实现多进程。使用 multiprocessing 包我们只需要定义一个函数(Python 会完成其他所有事情)即可完成从单进程到多进程的转换。

使用 Process 类创建子进程

Python 使用 multiprocessing 模块提供的 Process 类来代表一个进程对象。

建立一个进程对象使用 Process(group=None, target=None, name=None, args=(), kwargs={}),其中group总是设置为None。target表示调用对象。name为别名。args表示调用对象的位置参数元组。kwargs表示调用对象的字典。

Process 类含有以下方法:

  1. start 启动进程
  2. join 等待进程执行完毕
  3. is_alive 进程是否处于活动状态
  4. terminate 终止进程,用在进程为死循环的情况下,手动终止进程。
代码语言:javascript复制
from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    # 建立一个进程对象
    p = Process(target=f, args=('bob',))
    # 启动新建的进程
    p.start()
    print p.is_alive()
    # 等待进程执行完毕
    p.join()
    print p.is_alive()

运行结果如下:

代码语言:javascript复制
main line
module name: __main__
parent process: 657
process id: 767
True
function f
module name: __main__
parent process: 767
process id: 768
hello bob
False

使用 Pool 类启动多个子进程

若果需要批量启动多个子进程,可以使用进程池的方式批量创建子进程

代码语言:javascript复制
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)

    print pool.map(f, range(10))

    for i in pool.imap_unordered(f, range(10)):
        print i

    res = pool.apply_async(f, (20,))
    print res.get(timeout = 1)

    res = pool.apply_async(os.getpid, ())
    print res.get(timeout = 1)

    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout = 1)
    except TimeoutError:
        print 'We lacked patience and got a multiporcessing.TimoutError'

运行结果如下:

代码语言:javascript复制
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
16
25
36
64
49
81
400
944
[946, 945, 947, 944]
We lacked patience and got a multiporcessing.TimoutError

进程间通信

multiprocessing 支持 Queues 和 Pipes 两种类型进行进程间通信。

Pipe 既管道模式,调用 pipe 将返回管道的两端的两个链接。pipe 仅仅适用于只有两个进程且一读一写的单双工模式,既信息只能从一个方向向另一个方向流动。

pipe 适用于读写小于要求高的一读一写的单双工模式。

代码语言:javascript复制
from multiprocessing import Process, Pipe

def f(conn):
    # 向主进程发送信息
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    # 建立 Pipe 并获取连接
    parent_conn, child_conn = Pipe()
    # 建立子进程
    p = Process(target=f, args=(child_conn,))
    p.start()
    # 获取子进程发送的信息
    print parent_conn.recv()
    p.join()

运行结果如下:

代码语言:javascript复制
[42, None, 'hello']

Queue 在 python 中是基于 Pipe 实现的,Queue 也是一边发送一边接收,Queue 与 Pipe 最大的区别是 Queue 允许同时又多个进程发送和接收数据。

代码语言:javascript复制
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果如下:

代码语言:javascript复制
Process to write: 1046
Put A to queue...
Process to read: 1047
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Queue 是一个线程和进程安全的队列。 Queues are thread and process safe.

多线程

相比于进程,线程更加的轻量,可以实现并发。Python 中的多线程实时上并非真正的多线程,这要从全局解释器锁(GIL)说起,Python 使用的解释器 Cpython 的线程是操作系统的原生线程,在解释器内执行的 Python 代码,都需要获取全局解释器锁才能执行,只有在遇到 I/O 操作时会释放全局解释器锁,由于 Python 的进程做为一个整体,因此解释器进程内只有一个线程在执行,其它的线程都处于等待状态等着全局解释器锁的释放。

若要尽量使用 CPU 资源使用多进程是不错的选择,对于 I/O 密集型操作使用多线程是不错的选择,Python 不适用于计算密集型应用。

在 Python 中使用 Threading 模块(Threading 模块是 Thread 的增强版本,Threading 从 Python 1.5.2 版本开始加入)来实现多线程操作。

使用 Threading 创建线程

看以下简单示例:

代码语言:javascript复制
import threading

def threadHandler(number):
    print 'thread %s is running...' % threading.current_thread().name
    print number * 2
    print 'thread %s ended.' % threading.current_thread().name

if __name__ == '__main__':
    print 'thread %s is runnging...' % threading.current_thread().name
    for i in range(5):
        my_thread = threading.Thread(target=threadHandler, args=(i,))
        my_thread.start()
        print 'thread %s ended.' % threading.current_thread().name

运行结果如下:

代码语言:javascript复制
thread MainThread is runnging...
thread Thread-1 is running...
0
thread Thread-1 ended.
 thread MainThread ended.
thread Thread-2 is running...
2
thread Thread-2 ended.
thread MainThread ended.
thread Thread-3 is running...
4
thread Thread-3 ended.
thread MainThread ended.
thread Thread-4 is running...
6
thread Thread-4 ended.
 thread MainThread ended.
thread Thread-5 is running...
8
 thread MainThread ended.
thread Thread-5 ended.

由于任何进程默认都会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python 的 threading 模块的 current_thread() 函数,返回当前线程的实例。

线程锁与线程同步

由于线程共享了进程的上下文环境,所以在多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改。因此,线程之间共享数据最大的风险在于多个线程同个更改一个变量,将内容给改乱了。

我们来看一个多线程操作变量的示例:

代码语言:javascript复制
import threading, time

total = 0

def update():
    global total
    for i in range(10000):
        total = total   i
        total = total - i

if __name__ == '__main__':
    for i in range(10):
        thread = threading.Thread(target=update, args=())
        thread.start()
    print total

以上示例的设计预期最终 total 的值应该为 0,但是在实际运行中会发现 total 的实际运行结果每次均不相同。造成这种情况的原因是由于 total 在多个线程中被修改造成存储内容的混乱。

现在我们在该示例代码中加入线程锁,有两种方法可以实现 try/finally 和 with。

以下示例使用 try/finally 来实现增加线程锁。

代码语言:javascript复制
import threading, time

total = 0
lock = threading.Lock()

def update():
    global total
    for i in range(10000):
        lock.acquire()
        try:
            total = total   i
            total = total - i
        finally:
            lock.release()

if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=update, args=())
        my_thread.start()
    print total

以下示例使用 with来实现增加线程锁。

代码语言:javascript复制
import threading, time

total = 0
lock = threading.Lock()

def update():
    global total
    for i in range(10000):
        with lock:
            total = total   i
            total = total - i

if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=update, args=())
        my_thread.start()
    print total

在使用 try/finally 和 with 增加锁后,以上代码的运行结果均符合预期。

线程间通信

Queue 模块用来实现消息队列功能,可以实现线程间安全的消息交换。各个线程可以通过调用消息队列实例对消息队列进行操纵。

代码语言:javascript复制
from Queue import Queue
import threading
import os, time, random

# 写数据线程执行的代码:
def write(q):
    print 'thread to write: %s' % threading.current_thread().name
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)


# 读数据线程执行的代码:
def read(q):
    print 'Process to read: %s' % threading.current_thread().name
    while q.empty() == False:
        value = q.get(True)
        print 'Get %s from queue.' % value

if __name__=='__main__':
    # 父线程创建Queue,并传给各个子进程:
    q = Queue()
    pw = threading.Thread(target=write, args=(q,))
    pr = threading.Thread(target=read, args=(q,))
    # 启动子线程pw,写入:
    pw.start()
    # 等待pw结束:
    pw.join()
    # 启动子线程pr,读取:
    pr.start()

运行结果如下:

代码语言:javascript复制
thread to write: Thread-1
Put A to queue...
Put B to queue...
Put C to queue...
Process to read: Thread-2
Get A from queue.
Get B from queue.
Get C from queue.

0 人点赞