第36天并发编程之进程篇

2020-01-20 14:41:07 浏览数 (1)

目录:

1. 基础概念

  2. 创建进程和结束进程

  3. 进程之间内存空间物理隔离

  4. 进程的属性方法

  5. 守护进程

  6. 互斥锁

  7. IPC通信机制

  8. 生产者消费者模型

一. 基础概念

代码语言:javascript复制
1. 什么叫做程序,什么叫做进程?
  
  程序就是程序员写的一堆代码文件。
  进程指的是程序正在执行的一个过程,是一个抽象的概念,起源于操作系统。

2. 什么是操作系统?
  
  操作系统是位于计算机硬件与应用程序之间的,用于协调,控制和管理计算机硬件和软件资源的控制程序

3. 操作系统的两大作用
  
  (1). 将复杂丑陋的硬件操作都封装成简单的接口,提供给应用程序使用,大大的提高了应用程序的开发效率
  (2). 把进程对硬件的竞争变得有序

4. 批处理系统
    
  将程序员写的程序攒成一堆,然后一个一个的读到内存里面进行执行。
  解决了第一代操作系统一个人独占计算机资源的问题,虽然节省了大量的时间,但是本质上还是串行
  此时已经出现了操作系统的概念,以及进程的雏形。
  
5. 多道技术
  
  产生背景:在单核下实现并发的效果。
  两大核心:
    空间复用:
      将多个程序同时读入到内存中,等待被cpu执行。此时也就出现了多进程的概念。
      特点:每个程序的内存空间都是物理隔离的。
    时间复用:  
      复用cpu的时间片
      cpu什么时候会切换进程的执行?
        1.在遇到i/0阻塞的时候就会暂时的挂起此进程,切换到另一个进程去执行
        2.正在执行的进程占用cpu的时间过长,或者有一个优先级更高的进程出现的时候也会切换执行
  优点:大大的提高了计算机cpu的利用率,实现了并发的效果。是当代计算机操作系统的雏形。

6. 串行,并行,并发,阻塞
  
  串行:程序从上到下依次执行。批处理系统典型的就是串行。
  并行:同一时刻运行多个程序。如果只有一个核是不可能实现并行的,只有多核的时候才能真正的实现并行。
  并发:同一时间段内运行多个程序,用户看起来就像是并行一样。
    并发实现的本质:保存当前的状态   cpu的切换
  阻塞:遇到i/0就进入阻塞状态。

7. 进程的属性
  
  pid: 用来唯一的表示一个进程,就像是身份证号。
  name: 进程名称
  terminate: 杀死当前子进程
  is_alive: 查看子进程是否还活着
  join: 等待子进程结束
  start: 向系统发送一个创建子进程的系统调用
  daemon: 将子进程设置成守护进程

  方法:
    os.getpid()  获得当前进程的pid
    os.getppid() 获得当前进程的父进程pid

8. 进程相关的win的命令
  tasklist: 查看当前进程信息
  taskkill /F /PID 进程号    杀死一个进程
  tasklist |findstr pyth   通过管道查看相应的值

9. 僵尸进程和孤儿进程
  
  僵尸进程:子进程死了,但是父进程还没有死,此时的子进程就称之为僵尸进程。
    任何进程死了之后都会回收相应的资源,但是对于一些基本的信息是不会回收的,例如pid,name等,以被父进程所查看,这就存在一个问题。
    当父进程创建了大量的子进程,而父进程又很长的时间内不会不会死掉,对于内存空间的占用倒不是很严重,但是会占用大量的pid,而pid的资源是有限的。
    因此我们的程序中不应该出现大量的僵尸进程,如果父进程需要运行很长的时间,我们就需要在适当的时候回收子进程的资源,防止出现大量的无用僵尸进程。
    join方法就会提供回收僵尸进程的功能。 

  孤儿进程:父进程死了,但是子进程没有死,这是的子进程就称之为孤儿进程。
    孤儿进程没有害,因为孤儿进程会被init进程所接管,在一定的时间之后会被清理掉。但是程序中也不应该出现大量的孤儿进程。

 二. 创建进程和结束进程

代码语言:javascript复制
进程创建的方式
  
  1.系统初始化,在开机的时候自动就会加载操作系统,此时就会出现一个根进程。
  2.用户的交互式请求,例如在电脑上双击暴风影音,就是创建了一个进程,再双击就又创建了一个进程。
  3.一个批处理作业的初始化(专用计算机的系统加载)
  
  4.一个进程运行的过程中通过模块重新开启一个子进程 (我们关注的是这种方式的进程的创建)
    linux: fork
      在初始化的时候子进程和父进程是完全一样的。 
    win:CreateProcess
      初始状态的时候子进程和父进程并不是完全一样的。

进程的结束方式
  
  1. 正常退出
  2. 出错退出
  3. 被另一个进程杀死

第一种创建进程的方式

代码语言:javascript复制
from multiprocessing import Process
import time

def task(name):
    print('%s is running...' %name)
    # 模拟子进程执行了一系列的操作
    time.sleep(2)
    print('%s is ending...' %name)

# 在windows下面必须要这样写
# 这是因为在win下创建一个进程的时候会重新执行一遍此模块
# 为了防止循环创建,所以必须要在此地方创建子进程
if __name__ == '__main__':
    # 创建了一个进程对象
    # target代表的是子进程要执行的任务,一般是函数名称
    # args:里面的值是给函数传递的参数
    p = Process(target=task, args=('egon',))
    # 此处并不是直接创建子进程,而是向操作系统发送了一个系统调用
    # 操作系统会申请一个内存空间,创建一个子进程
    # 对于主进程而言,这行代码就像是平常的代码一样,主进程并不会等待子进程的创建,然后就去继续执行了
    p.start()
    # 因此结果是先打印下面的提示信息,之后才会去打印task函数内的东西
    print('p.start()代码一旦执行完,我是不会等系统创建子进程的,我立马就要执行')

第二种创建进程的方式

代码语言:javascript复制
from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, name):
        # 重用父类的功能,然后传递自己函数所需要的参数
        super(MyProcess, self).__init__()
        self.name = name

    # p.start()其实就是系统去调用run函数,因此我们将之前的内容放到run函数里面
    def run(self):
        print('%s is running...' %self.name)
        # 模拟子进程执行了一系列的操作
        time.sleep(2)
        print('%s is ending...' %self.name)


if __name__ == '__main__':
    p = MyProcess('egon')
    p.start()
    print('主进程')

三. 进程之间内存空间物理隔离

代码语言:javascript复制
from multiprocessing import Process
x = 100

def task():
    global x
    x = 0
    print('子进程结束...')

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    # 等待子进程结束之后在去打印x的值
    p.join()
    # 如果空间是共享的,等待子进程结束之后x的值应该是0
    # 如果空间是隔离的,子进程结束之后x的值还是100
    print(x)

四. 进程的属性方法

 join方法

代码语言:javascript复制
from multiprocessing import Process
import time

def task():
    print('子进程开始...')
    time.sleep(2)

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    # 如果没有join,就会先打印x,然后才会去执行task函数
    # 有了join之后,主进程就会阻塞在这里,等待子进程p结束之后才会打印x
    p.join()
    print('主进程.....')

# 结果:
# 子进程开始...
代码语言:javascript复制
from multiprocessing import Process
import time

def task(n):
    print('%s开始...' %n)
    time.sleep(n)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=(1, ))  # p1睡了1s
    p2 = Process(target=task, args=(2, ))  # p2睡了2s
    p3 = Process(target=task, args=(3, ))  # p3睡了3s

    p1.start()
    p2.start()
    p3.start()

    # 程序运行到这个地方的时候p1,p2, p3可能都已经开始执行了
    # 无论主进程是在等谁,所有的子进程都是会执行的
    # 也就是说在等待p1的过程中子进程p3也在执行,因此整个程序的执行时间应该是最耗时的子进程时间
    # 此处也就是三秒多
    p1.join()  # 等待子进程1s
    p2.join()  # 等待子进程2s
    p3.join()  # 等待子进程3s
    end= time.time()
    print('执行时间>>', start - end)


# 运行结果:
# 2开始...
# 1开始...
# 3开始...
# 执行时间>> -3.2581233978271484

join容易迷惑的地方

代码语言:javascript复制
from multiprocessing import Process
import time

def task():
    print('子进程开始...')
    time.sleep(2)

if __name__ == '__main__':
    # 如果循环的创建了子进程,需要等待所有进程结束我们就需要通过一个循环去等待
    l = []
    for i in range(5):
        p = Process(target=task)
        l.append(p)
        p.start()

    for p in l:
        p.join()
    print('主')
# 结果:
# 子进程开始...
# 主进程.....

join循环等待子进程结束

自定义查看ppid的方法(待填)

五. 守护进程

代码语言:javascript复制
1. 什么是守护进程
  
   obj = Process(target=lambda x: x   1)  # 创建一个子进程
   obj.daemon = True   # 当设置了此属性之后,这个子进程就会变成一个守护进程
   效果: 当主进程结束之后,守护进程就会随着结束

2. 为什么要有守护进程?
  
  我们创建一个子进程就是为了并发的执行多个任务,有时候我们的子任务在主任务结束之后就没有存在的必要了,因此,在主程序结束之后,我们往往希望可以自动的结束掉这些子进程,因此就有了守护进程。
  例如:当我们通过qq在传一个文件的时候,qq是主进程,传文件是子进程,当qq退出去之后还应该会传文件吗,肯定不会,所以此时就应该把传文件设置成一个守护进程,当qq退出去的时候自动的关掉子进程。

3. 重点
  
  如果一个主进程中既有守护进程也有非守护进程,那么当主进程的代码执行完毕以后守护进程就会死掉,并不会等到主进程清理完非守护进程之后才死掉。

 例子:

代码语言:javascript复制
from multiprocessing import Process
import time


def task(name):
    print(name, 'is running...')
    time.sleep(3)
    print("ending....")


if __name__ == '__main__':
    p = Process(target=task, args=('egon',))
    p.start()
    print('主进程over')


# 运行结果: 只有等待子进程完全结束之后才会结束掉主进程,防止
# H:python_studyvenvScriptspython.exe H:/python_study/day36/博客/守护进程.py
# 主进程over
# egon is running...
# ending....
#
# Process finished with exit code 0

正常产生的主进程会等待子进程结束之后才会结束

代码语言:javascript复制
from multiprocessing import Process
import time


def task(name):
    print(name, 'is running...')
    time.sleep(3)
    print("ending....")


if __name__ == '__main__':
    p = Process(target=task, args=('egon',))
    p.daemon = True  # 将子进程变成一个僵尸进程
    p.start()
    print('主进程over')

# 结果:当主进程执行完print操作之后就直接结束了,守护进程也会随之而结束
# H:python_studyvenvScriptspython.exe H:/python_study/day36/博客/守护进程.py
# 主进程over
# 
# Process finished with exit code 0

主进程结束之后守护进程也就跟着结束了

代码语言:javascript复制
from multiprocessing import Process
import time

def foo():
    print(123)
    time.sleep(0.1)
    print(456)

def bar():
    print(789)
    time.sleep(2)
    print('10002')


if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=bar)
    p1.daemon = True
    p1.start()
    p2.start()
    print('主进程.....')  # 当这一行代码执行完毕之后,就代表着主进程相关的任务已经执行完毕,守护进程在此时就没有守护的必要的了,因此会被干掉

# 结果: 打印完主进程之后,p1进程作为守护进程就会被干掉
# 主进程.....
# 789
# 10002

守护进程和非守护进程都存在的情况下

六. 互斥锁

代码语言:javascript复制
1. 什么叫做互斥锁
  
  对于同一个系统资源,如果一个进程加上了互斥锁,另一个进程也加上了同一个互斥锁,谁先抢到谁先执行,直到释放锁之后,另个一进程才能够使用此资源。

2. 互斥锁和join的区别
  
  原理都是一样的,都是为了将并发变成串行,从而保证有序。
  区别一:
    互斥锁:进程平等的竞争,谁先抢到谁先执行。
    join: 按照人为指定的顺序执行。
  区别二:
    互斥锁:将一部分代码进行串行
    join: 只能将代码整体

区别一:互斥锁和join

步骤一:创建一个py程序,用来打印三个人的信息,创建了三个函数,每个函数里面都有一个sleep来模拟网络延迟,因此我们写出了下面的代码

代码语言:javascript复制
import time


def task1():
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')

def task2():
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')

def task3():
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')


if __name__ == '__main__':
    task1()
    task2()
    task3()

正常的打印三个人的信息

步骤二:这样写虽然解决了问题,但是运行效率太慢了,完全受不了,因此想着怎么让三个任务进行并发,从而提高运行效率,因此我们创建了三个进程,分别用来执行三个任务,所以写出来了下面这个代码

代码语言:javascript复制
from multiprocessing import Process
import time


def task1():
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')

def task2():
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')

def task3():
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p1.start()
    p2.start()
    p3.start()

通过创建子进程的形式提高运行效率

步骤三:这样子写虽然是提高了运行效率,但是我们发现结果并不是我们想要的,我们希望的是无论哪个任务先执行,总是希望可以让这个任务的信息打印完成之后才去执行之后的任务。有两种解决方法1, 就是jion方法,2. 就是互斥锁,首先我们以join的方法让当前信息打印变得有序。

代码语言:javascript复制
from multiprocessing import Process
import time

def task1():
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')

def task2():
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')

def task3():
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p1.start()
    p1.join()  # 在此处加上join方法,使得等待第一个程序执行完成之后在去执行第二个进程
    p2.start()
    p1.join()
    p3.start()
    p1.join()

join方法使得并发变得有序

步骤四:结果变得确实有序了,但是这样写是有问题的,1. 完全没有并发的效果  关于这个问题我们暂时忽略,只是为了讨论并发才拿出来这样一个例子的。2. 这样子写其实是人为的规定了让p1进程先执行,然后是p2进程,然后是p3进程。这样是非常不公平的,我们的初衷并不希望人为的规定哪个子进程先进行操作,因此我们可以使用互斥锁,这就需要引入另一个类Lock

代码语言:javascript复制
# 锁的使用方法,使用比较简单,就两个函数

mutex = Lock()   # 创建一个互斥锁
mutex.acqure()   # 加锁
。。。。这是我们希望控制的代码
mutex.release()  # 释放锁

# 注意:

1. 在子进程中所使用的锁必须是同一把锁,就是锁必须要在if语句中创建,并且通过参数的形式传递给子进程。
2. 对于同一个进程锁只能加一次
3. 必要的代码执行完毕之后必须要释放锁
代码语言:javascript复制
from multiprocessing import Process, Lock
import time


def task1(lock):
    lock.acquire()
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')
    lock.release()

def task2(lock):
    lock.acquire()
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')
    lock.release()


def task3(lock):
    lock.acquire()   # 加锁
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')
    lock.release() # 解锁


if __name__ == '__main__':
    mutex = Lock()   # 创建一个锁
    p1 = Process(target=task1, args=(mutex,))  # 然后将这一个锁当做参数进行传递
    p2 = Process(target=task2, args=(mutex,))
    p3 = Process(target=task3, args=(mutex,))
    p1.start()
    p2.start()
    p3.start()

互斥锁解决并发的乱序的问题

区别二:共享锁写一个简单的抢票小程序

 步骤一:创建一个db文件用来存放共享数据,也就是票的数量。因为进程之间的通信目前还没有学到,但是对于磁盘的访问每个进程都是可以访问的,因此先创建db文件

代码语言:javascript复制
{"count": 1}

步骤二:写一个不加锁不加延迟的一个简单功能

代码语言:javascript复制
from multiprocessing import Process
import os
import json
import time


def search():
    """查询当前还有几张票"""
    with open('db.json', 'r', encoding='utf-8') as f:
        print('%s  剩余票数 %s' %(os.getpid(), json.load(f)['count']))


def get():
    """购买票"""
    with open('db.json', 'r', encoding='utf-8') as f:
        dic = json.load(f)

    if dic['count'] > 0:
        dic['count'] -= 1
        with open('db.json', 'w', encoding='utf-8') as f:
            json.dump(dic, f)
        print('%s 购买票成功' % os.getpid())


def task():
    """抢票程序,包含一个查询票和购买票的功能"""
    search()
    get()


if __name__ == '__main__':
    for i in range(5):
        p = Process(target=task)
        p.start()

不加锁不加延迟

步骤三: 如果没有延迟的情况下,我们的程序目前来看已经具备了抢票的功能,因为在没有延迟的情况下,当所有进程建立完成之后就是一些基本的运算,cpu的执行是非常快的,因此cpu在执行get函数的时候基本上是不会切换到其他的进程中执行的。但是如果有延迟的情况就不一样了,加上延迟之后因为cpu会切换,所以导致结果不可控,有两种解决方案,1. join将整个进程变成串行的  , 虽然join可以解决问题,但是对于查询票数这个操作来说,我们并不希望是串行的,因此此方法并不合适 2. 用互斥锁

代码语言:javascript复制
from multiprocessing import Process,Lock
import os
import json
import time
import random


def search():
    """查询当前还有几张票"""
    # 加上查询票的延迟
    time.sleep(random.randint(1,3))
    with open('db.json', 'r', encoding='utf-8') as f:
        print('%s  剩余票数 %s' %(os.getpid(), json.load(f)['count']))


def get():
    """购买票"""
    with open('db.json', 'r', encoding='utf-8') as f:
        dic = json.load(f)

    if dic['count'] > 0:
        dic['count'] -= 1
        # 加上购买票的延迟
        time.sleep(random.randint(1, 3))
        with open('db.json', 'w', encoding='utf-8') as f:
            json.dump(dic, f)
        print('%s 购买票成功' % os.getpid())


def task(lock):
    """抢票程序,包含一个查询票和购买票的功能"""
    search()
    lock.acquire()
    get()
    lock.release()


if __name__ == '__main__':
    metux = Lock()
    for i in range(10):
        p = Process(target=task, args=(metux,))
        p.start()

用互斥锁解决抢票问题

七. IPC通信机制

代码语言:javascript复制
IPC: 进程之间的通信

问题: 两个进程之间的内存空间是物理隔离的,因此怎么通信呢?

进程之间通信的方式:

1. 上面讲到的创建一个共享文件
  文件的i/0操作太浪费时间,因此这个方式不建议使用
2. 通道
  之前学过subprocess也可以实现进程之间的通信,但是这个进程之间必须是父子进程,并且是半双工模式,因此也不推荐使用
3. 共享内存
  Manager,可以通过此类创建一个共享的字典或者列表,为了防止数据出错,在修改数据的时候我们需要自己添加锁,很麻烦,也不建议使用
  Queue: 队列,我们可以通过队列的方式实现进程之间的通信,队列在内部已经帮我们添加了锁。

IPC机制应该遵循的原则
  1. 所有进程都应该可以共享数据
  2. 共享的数据最好应该在内存中
  3. 并且我们不需要去操作锁,也就是IPC应该帮我们处理好锁的功能

Manager存在的问题:要自己定义锁才能对数据进行修改

代码语言:javascript复制
# 创建共享内存空间,并进行修改

from multiprocessing import Process, Manager

def task(dic):
    dic['num'] -= 1

if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'num': 10})
    l = []
    for i in range(10):
        p = Process(target=task, args=(dic, ))
        p.start()
        l.append(p)
    # 等待子进程结束
    for p in l:
        p.join()
    print(dic)
# 结果:
# {'num': 0}

# 创建共享内存空间,并进行修改

 步骤一:创建一个共享的内存空间,然后通过创建进程去修改共享字典的内容,我们发现我们想要的效果Manager确实是已经帮我们实现了,但是当我们在task函数做了以下修改之后,再打印信息就会发现所出来结果不是0而是9了

代码语言:javascript复制
def task(dic):
    temp = dic['num']   # 和之前的dic['num'] = 1性质是一样的,但是为什么结果不一样呢
    time.sleep(0.1)
    dic['num'] = temp - 1

步骤二:这是因为在创建进程之后,所有的进程基本上都会在同一时间内拿到temp的值为10,也就是说10个进程的temp都是10, 当他们睡完0.1秒之后无论是谁进行修改dic的值都是10-1所以结果是0,因此对于Manager创建的内存空间默认在修改数据的时候是不会给我们加锁的,因此我们需要自己去加锁对数据进行修改,否则数据就会被损坏。

代码语言:javascript复制
from multiprocessing import Process, Manager, Lock
import time
mutex = Lock()
def task(dic, lock):
    lock.acquire()  # 加锁,防止数据被破坏
    temp = dic['num']
    time.sleep(0.1)
    dic['num'] = temp - 1
    lock.release()

if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'num': 10})
    l = []
    for i in range(10):
        p = Process(target=task, args=(dic, mutex))
        p.start()
        l.append(p)
    # 等待子进程结束
    for p in l:
        p.join()
    print(dic)
# 结果:
# {'num': 0}

加锁使得数据变得安全

Queue队列简介

代码语言:javascript复制
特点
  1. 先进先出
  2. 队列只应该传送消息,数据量不应该过大
  3.创建队列的长度不应该过大,因为它占用的是内存的空间 

方法
  put: 往队列里面添加东西 
    参数一:obj,放到队列里面的对象
    参数二: block,如果队列满了是否阻塞
    参数三: timeout,超时时间,在队列的阻塞的状态下才有意义
  get: 从队列里面拿东西
    参数一:block, 如果队列为空是否阻塞,
    参数二:timeout,超时时间,在队列阻塞状态下才有意义 

例子:
    # 创建队列,队列长度为3
    q = Queue(3)
    
    q.put('first')
    q.put({'second': None})
    q.put('三')
    # q.put('四')  # 默认会阻塞
    q.get()
    q.get()
    q.get()

八:生产者消费者模型

代码语言:javascript复制
生产者和消费者模型
1.模型指的是一种解决问题的套路

2.该模式下具备两种角色
  生产者: 生产数据
  消费者: 处理数据

3.该模型的运作方式
  生产者生产数据,放到一个共享的空间中,然后消费者取走进行处理

4.该模型的实现方式一
  生产者进程   队列   消费者进程 
  队列中存放的是一些消息,不应该存放大量的数据

5.该模型的应用场景
  如果程序中由明显的两类任务,一类任务是负责生产数据,另外一类是负责处理数据的
  就应该使用生产者和消费者模型

6.该模式的优点
  1. 实现了生产者和消费者解耦和
  2. 平衡了生产者的生产数据的能力与消费者处理数据的能力

案例:模拟一个生产者和消费者模型

代码语言:javascript复制
import time
import random
from multiprocessing import Process, Queue


def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,2))
        print('33[35m消费者==> %s 吃了 %s33[0m' %(name, res))


def producer(name, q, food):
    for i in range(5):
        res = '%s %s' %(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('33[34m生产者>> %s 生产了 %s 33[0m' %(name, res))


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    c1 = Process(target=consumer, args=('alex', q))

    p1.start()
    c1.start()

简单的生产消费者模型代码

问题一:我们会发现虽然实现了生产者和消费者并发执行的效果,但是当消费者吃完包子之后主程序阻塞掉了。主程序阻塞原因有两个: 1. 主程序自己的代码没有执行完毕,2. 主进程在等待子进程执行完毕。此处主程序的代码很明显是阻塞在了消费者模型里面。如何去解决这样的问题呢?

解决方案一:因为程序是在子进程的获取队列的时候阻塞掉了,因此我们考虑将队列的修改成非阻塞状态,但是发现报错了,这是因为在c1和p1进程起来之后我们是不能确定谁先执行的,如果c1先执行了get,发现队列里面没有内容,又是非阻塞状态,就会报错。因此队列不能是非阻塞状态。

代码语言:javascript复制
def consumer(name, q):
    while True:
        # 在此处阻塞掉了,因此我们考虑将队列设置为非阻塞
        res = q.get(block=False)
        time.sleep(random.randint(1,2))
        print('33[35m消费者==> %s 吃了 %s33[0m' %(name, res))

解决方案二:既然队列必须是阻塞状态,那么我们能不能设置一个超时时间,但是对于消费者而言,超时时间设置为多少才合适呢?我们并不能确定生产者每次生产数据的时间,因此如果设置成了4, 但是生产者过了5s才生产一个数据该怎么办呢?这个方式也是不合理的。

代码语言:javascript复制
def consumer(name, q):
    while True:
        # 队列必须是阻塞状态,因此设置超时时间
        res = q.get(timeout=4)
        time.sleep(random.randint(1,2))
        print('33[35m消费者==> %s 吃了 %s33[0m' %(name, res))

解决方案三:既然从消费者的角度无法解决这样的问题,那么我们就从生产者的角度来解决这样的问题。当我生产完数据之后我额外的放一个None,当消费者收到这个标志的时候就代表生产的数据完了,如下

代码语言:javascript复制
def consumer(name, q):
    while True:
        res = q.get()
        # 当收到一个None时就结束掉子进程
        if not res:
            break 
        time.sleep(random.randint(1,2))
        print('33[35m消费者==> %s 吃了 %s33[0m' %(name, res))


def producer(name, q, food):
    for i in range(5):
        res = '%s %s' %(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('33[34m生产者>> %s 生产了 %s 33[0m' %(name, res))
    # 在我生产完数据之后就在队列里面设置一个None,然后消费者收到None之后就结束进程
    q.put(None)

问题二:虽然我们完美的解决了问题,但是当我们的生产者变多了之后就会出现下面的这个效果,有的生产者产生的数据并没有消费者去消费,那是因为每个生产者生产完数据之后都会往队列中放入一个None,当一个消费者收到一个None的时候就会结束掉子进程,因此当你的生产者的数量一旦大于了消费者的数量,肯定会出现生产的数据没有人去处理的问题。

代码语言:javascript复制
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    # 添加了一个生产者
    p2 = Process(target=producer, args=('hu', q, '甘蔗'))
    c1 = Process(target=consumer, args=('alex', q))

    p1.start()
    p2.start()
    c1.start()

解决方案一: 之前的初衷是想等生产者进程结束之后在队列的后面添加None,但是这个None的数量不能超过消费者的数量,因此我们可以通过在主进程中join来确定生产者模型结束之后,由子进程统一往队列中添加None。 这种方法虽然可以解决问题,但是不好的地方在哪里呢?我们有几个消费者就要往队列中添加几个None,浪费空间,而且还麻烦,因此这种方案也不推荐。

代码语言:javascript复制
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    # 添加了一个生产者
    p2 = Process(target=producer, args=('hu', q, '甘蔗'))
    p3 = Process(target=producer, args=('hu', q, '米饭'))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('alex', q))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    
    # 记得将之前生产者函数中的put给删除掉,搞了半天,我还以为我理解错了呢
    # 等待生产者生产完成之后再往队列中添加None,个数为消费者的个数
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print('主进程..')

解决方案二:这个时候我们就要引入JoinableQueue队列

代码语言:javascript复制
实现原理:
  1. 等待生产者生产完成之后,计入此时队列里的值,通过q.join()
  2. 每次消费者get一个内容之后都会通过 task_done将之前计入的值减1
  3. 当计入的值变成零的时候就代表队列为空了
  4. 在创建之处就设计消费者为守护进程
代码语言:javascript复制
import time
import random
from multiprocessing import Process, Queue, JoinableQueue


def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,2))
        print('33[35m消费者==> %s 吃了 %s33[0m' %(name, res))
        # 每次取出一个就将队列计数减一
        q.task_done()


def producer(name, q, food):
    for i in range(5):
        res = '%s %s' %(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('33[34m生产者>> %s 生产了 %s 33[0m' %(name, res))


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    p2 = Process(target=producer, args=('hu', q, '甘蔗'))
    p3 = Process(target=producer, args=('hu', q, '米饭'))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('alex', q))

    p1.start()
    p2.start()
    p3.start()
    # 设置守护进程
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    # 等待生产者生产完成之后再往队列中添加None,个数为消费者的个数
    p1.join()
    p2.join()
    p3.join()

    # 记录当前队列中还有值得数量
    q.join()
    
    # 当执行到这一个步骤的时候,就代表消费者已经将内容取完了,主进程代码执行完毕之后,守护进程也就被杀死了
    print('主进程..')

最终实现代码

0 人点赞