目录:
1. 基础概念
2. 创建进程和结束进程
3. 进程之间内存空间物理隔离
4. 进程的属性方法
5. 守护进程
6. 互斥锁
7. IPC通信机制
8. 生产者消费者模型
一. 基础概念
代码语言:javascript复制1. 什么叫做程序,什么叫做进程?
程序就是程序员写的一堆代码文件。
进程指的是程序正在执行的一个过程,是一个抽象的概念,起源于操作系统。
2. 什么是操作系统?
操作系统是位于计算机硬件与应用程序之间的,用于协调,控制和管理计算机硬件和软件资源的控制程序
3. 操作系统的两大作用
(1). 将复杂丑陋的硬件操作都封装成简单的接口,提供给应用程序使用,大大的提高了应用程序的开发效率
(2). 把进程对硬件的竞争变得有序
4. 批处理系统
将程序员写的程序攒成一堆,然后一个一个的读到内存里面进行执行。
解决了第一代操作系统一个人独占计算机资源的问题,虽然节省了大量的时间,但是本质上还是串行
此时已经出现了操作系统的概念,以及进程的雏形。
5. 多道技术
产生背景:在单核下实现并发的效果。
两大核心:
空间复用:
将多个程序同时读入到内存中,等待被cpu执行。此时也就出现了多进程的概念。
特点:每个程序的内存空间都是物理隔离的。
时间复用:
复用cpu的时间片
cpu什么时候会切换进程的执行?
1.在遇到i/0阻塞的时候就会暂时的挂起此进程,切换到另一个进程去执行
2.正在执行的进程占用cpu的时间过长,或者有一个优先级更高的进程出现的时候也会切换执行
优点:大大的提高了计算机cpu的利用率,实现了并发的效果。是当代计算机操作系统的雏形。
6. 串行,并行,并发,阻塞
串行:程序从上到下依次执行。批处理系统典型的就是串行。
并行:同一时刻运行多个程序。如果只有一个核是不可能实现并行的,只有多核的时候才能真正的实现并行。
并发:同一时间段内运行多个程序,用户看起来就像是并行一样。
并发实现的本质:保存当前的状态 cpu的切换
阻塞:遇到i/0就进入阻塞状态。
7. 进程的属性
pid: 用来唯一的表示一个进程,就像是身份证号。
name: 进程名称
terminate: 杀死当前子进程
is_alive: 查看子进程是否还活着
join: 等待子进程结束
start: 向系统发送一个创建子进程的系统调用
daemon: 将子进程设置成守护进程
方法:
os.getpid() 获得当前进程的pid
os.getppid() 获得当前进程的父进程pid
8. 进程相关的win的命令
tasklist: 查看当前进程信息
taskkill /F /PID 进程号 杀死一个进程
tasklist |findstr pyth 通过管道查看相应的值
9. 僵尸进程和孤儿进程
僵尸进程:子进程死了,但是父进程还没有死,此时的子进程就称之为僵尸进程。
任何进程死了之后都会回收相应的资源,但是对于一些基本的信息是不会回收的,例如pid,name等,以被父进程所查看,这就存在一个问题。
当父进程创建了大量的子进程,而父进程又很长的时间内不会不会死掉,对于内存空间的占用倒不是很严重,但是会占用大量的pid,而pid的资源是有限的。
因此我们的程序中不应该出现大量的僵尸进程,如果父进程需要运行很长的时间,我们就需要在适当的时候回收子进程的资源,防止出现大量的无用僵尸进程。
join方法就会提供回收僵尸进程的功能。
孤儿进程:父进程死了,但是子进程没有死,这是的子进程就称之为孤儿进程。
孤儿进程没有害,因为孤儿进程会被init进程所接管,在一定的时间之后会被清理掉。但是程序中也不应该出现大量的孤儿进程。
二. 创建进程和结束进程
代码语言:javascript复制进程创建的方式
1.系统初始化,在开机的时候自动就会加载操作系统,此时就会出现一个根进程。
2.用户的交互式请求,例如在电脑上双击暴风影音,就是创建了一个进程,再双击就又创建了一个进程。
3.一个批处理作业的初始化(专用计算机的系统加载)
4.一个进程运行的过程中通过模块重新开启一个子进程 (我们关注的是这种方式的进程的创建)
linux: fork
在初始化的时候子进程和父进程是完全一样的。
win:CreateProcess
初始状态的时候子进程和父进程并不是完全一样的。
进程的结束方式
1. 正常退出
2. 出错退出
3. 被另一个进程杀死
第一种创建进程的方式
代码语言:javascript复制from multiprocessing import Process
import time
def task(name):
print('%s is running...' %name)
# 模拟子进程执行了一系列的操作
time.sleep(2)
print('%s is ending...' %name)
# 在windows下面必须要这样写
# 这是因为在win下创建一个进程的时候会重新执行一遍此模块
# 为了防止循环创建,所以必须要在此地方创建子进程
if __name__ == '__main__':
# 创建了一个进程对象
# target代表的是子进程要执行的任务,一般是函数名称
# args:里面的值是给函数传递的参数
p = Process(target=task, args=('egon',))
# 此处并不是直接创建子进程,而是向操作系统发送了一个系统调用
# 操作系统会申请一个内存空间,创建一个子进程
# 对于主进程而言,这行代码就像是平常的代码一样,主进程并不会等待子进程的创建,然后就去继续执行了
p.start()
# 因此结果是先打印下面的提示信息,之后才会去打印task函数内的东西
print('p.start()代码一旦执行完,我是不会等系统创建子进程的,我立马就要执行')
第二种创建进程的方式
代码语言:javascript复制from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name):
# 重用父类的功能,然后传递自己函数所需要的参数
super(MyProcess, self).__init__()
self.name = name
# p.start()其实就是系统去调用run函数,因此我们将之前的内容放到run函数里面
def run(self):
print('%s is running...' %self.name)
# 模拟子进程执行了一系列的操作
time.sleep(2)
print('%s is ending...' %self.name)
if __name__ == '__main__':
p = MyProcess('egon')
p.start()
print('主进程')
三. 进程之间内存空间物理隔离
代码语言:javascript复制from multiprocessing import Process
x = 100
def task():
global x
x = 0
print('子进程结束...')
if __name__ == '__main__':
p = Process(target=task)
p.start()
# 等待子进程结束之后在去打印x的值
p.join()
# 如果空间是共享的,等待子进程结束之后x的值应该是0
# 如果空间是隔离的,子进程结束之后x的值还是100
print(x)
四. 进程的属性方法
join方法
代码语言:javascript复制from multiprocessing import Process
import time
def task():
print('子进程开始...')
time.sleep(2)
if __name__ == '__main__':
p = Process(target=task)
p.start()
# 如果没有join,就会先打印x,然后才会去执行task函数
# 有了join之后,主进程就会阻塞在这里,等待子进程p结束之后才会打印x
p.join()
print('主进程.....')
# 结果:
# 子进程开始...
代码语言:javascript复制from multiprocessing import Process
import time
def task(n):
print('%s开始...' %n)
time.sleep(n)
if __name__ == '__main__':
start = time.time()
p1 = Process(target=task, args=(1, )) # p1睡了1s
p2 = Process(target=task, args=(2, )) # p2睡了2s
p3 = Process(target=task, args=(3, )) # p3睡了3s
p1.start()
p2.start()
p3.start()
# 程序运行到这个地方的时候p1,p2, p3可能都已经开始执行了
# 无论主进程是在等谁,所有的子进程都是会执行的
# 也就是说在等待p1的过程中子进程p3也在执行,因此整个程序的执行时间应该是最耗时的子进程时间
# 此处也就是三秒多
p1.join() # 等待子进程1s
p2.join() # 等待子进程2s
p3.join() # 等待子进程3s
end= time.time()
print('执行时间>>', start - end)
# 运行结果:
# 2开始...
# 1开始...
# 3开始...
# 执行时间>> -3.2581233978271484
join容易迷惑的地方
代码语言:javascript复制from multiprocessing import Process
import time
def task():
print('子进程开始...')
time.sleep(2)
if __name__ == '__main__':
# 如果循环的创建了子进程,需要等待所有进程结束我们就需要通过一个循环去等待
l = []
for i in range(5):
p = Process(target=task)
l.append(p)
p.start()
for p in l:
p.join()
print('主')
# 结果:
# 子进程开始...
# 主进程.....
join循环等待子进程结束
自定义查看ppid的方法(待填)
五. 守护进程
代码语言:javascript复制1. 什么是守护进程
obj = Process(target=lambda x: x 1) # 创建一个子进程
obj.daemon = True # 当设置了此属性之后,这个子进程就会变成一个守护进程
效果: 当主进程结束之后,守护进程就会随着结束
2. 为什么要有守护进程?
我们创建一个子进程就是为了并发的执行多个任务,有时候我们的子任务在主任务结束之后就没有存在的必要了,因此,在主程序结束之后,我们往往希望可以自动的结束掉这些子进程,因此就有了守护进程。
例如:当我们通过qq在传一个文件的时候,qq是主进程,传文件是子进程,当qq退出去之后还应该会传文件吗,肯定不会,所以此时就应该把传文件设置成一个守护进程,当qq退出去的时候自动的关掉子进程。
3. 重点
如果一个主进程中既有守护进程也有非守护进程,那么当主进程的代码执行完毕以后守护进程就会死掉,并不会等到主进程清理完非守护进程之后才死掉。
例子:
代码语言:javascript复制from multiprocessing import Process
import time
def task(name):
print(name, 'is running...')
time.sleep(3)
print("ending....")
if __name__ == '__main__':
p = Process(target=task, args=('egon',))
p.start()
print('主进程over')
# 运行结果: 只有等待子进程完全结束之后才会结束掉主进程,防止
# H:python_studyvenvScriptspython.exe H:/python_study/day36/博客/守护进程.py
# 主进程over
# egon is running...
# ending....
#
# Process finished with exit code 0
正常产生的主进程会等待子进程结束之后才会结束
代码语言:javascript复制from multiprocessing import Process
import time
def task(name):
print(name, 'is running...')
time.sleep(3)
print("ending....")
if __name__ == '__main__':
p = Process(target=task, args=('egon',))
p.daemon = True # 将子进程变成一个僵尸进程
p.start()
print('主进程over')
# 结果:当主进程执行完print操作之后就直接结束了,守护进程也会随之而结束
# H:python_studyvenvScriptspython.exe H:/python_study/day36/博客/守护进程.py
# 主进程over
#
# Process finished with exit code 0
主进程结束之后守护进程也就跟着结束了
代码语言:javascript复制from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(0.1)
print(456)
def bar():
print(789)
time.sleep(2)
print('10002')
if __name__ == '__main__':
p1 = Process(target=foo)
p2 = Process(target=bar)
p1.daemon = True
p1.start()
p2.start()
print('主进程.....') # 当这一行代码执行完毕之后,就代表着主进程相关的任务已经执行完毕,守护进程在此时就没有守护的必要的了,因此会被干掉
# 结果: 打印完主进程之后,p1进程作为守护进程就会被干掉
# 主进程.....
# 789
# 10002
守护进程和非守护进程都存在的情况下
六. 互斥锁
代码语言:javascript复制1. 什么叫做互斥锁
对于同一个系统资源,如果一个进程加上了互斥锁,另一个进程也加上了同一个互斥锁,谁先抢到谁先执行,直到释放锁之后,另个一进程才能够使用此资源。
2. 互斥锁和join的区别
原理都是一样的,都是为了将并发变成串行,从而保证有序。
区别一:
互斥锁:进程平等的竞争,谁先抢到谁先执行。
join: 按照人为指定的顺序执行。
区别二:
互斥锁:将一部分代码进行串行
join: 只能将代码整体
区别一:互斥锁和join
步骤一:创建一个py程序,用来打印三个人的信息,创建了三个函数,每个函数里面都有一个sleep来模拟网络延迟,因此我们写出了下面的代码
代码语言:javascript复制import time
def task1():
print('task1: 名字, egon')
time.sleep(0.1)
print('task1: 性别, male')
time.sleep(0.1)
print('task1: 年龄, 13')
def task2():
print('task2: 名字, alex')
time.sleep(0.1)
print('task2: 性别, male')
time.sleep(0.1)
print('task2: 年龄, 18')
def task3():
print('task3: 名字, wxx')
time.sleep(0.1)
print('task3: 性别, male')
time.sleep(0.1)
print('task3: 年龄, 21')
if __name__ == '__main__':
task1()
task2()
task3()
正常的打印三个人的信息
步骤二:这样写虽然解决了问题,但是运行效率太慢了,完全受不了,因此想着怎么让三个任务进行并发,从而提高运行效率,因此我们创建了三个进程,分别用来执行三个任务,所以写出来了下面这个代码
代码语言:javascript复制from multiprocessing import Process
import time
def task1():
print('task1: 名字, egon')
time.sleep(0.1)
print('task1: 性别, male')
time.sleep(0.1)
print('task1: 年龄, 13')
def task2():
print('task2: 名字, alex')
time.sleep(0.1)
print('task2: 性别, male')
time.sleep(0.1)
print('task2: 年龄, 18')
def task3():
print('task3: 名字, wxx')
time.sleep(0.1)
print('task3: 性别, male')
time.sleep(0.1)
print('task3: 年龄, 21')
if __name__ == '__main__':
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p2.start()
p3.start()
通过创建子进程的形式提高运行效率
步骤三:这样子写虽然是提高了运行效率,但是我们发现结果并不是我们想要的,我们希望的是无论哪个任务先执行,总是希望可以让这个任务的信息打印完成之后才去执行之后的任务。有两种解决方法1, 就是jion方法,2. 就是互斥锁,首先我们以join的方法让当前信息打印变得有序。
代码语言:javascript复制from multiprocessing import Process
import time
def task1():
print('task1: 名字, egon')
time.sleep(0.1)
print('task1: 性别, male')
time.sleep(0.1)
print('task1: 年龄, 13')
def task2():
print('task2: 名字, alex')
time.sleep(0.1)
print('task2: 性别, male')
time.sleep(0.1)
print('task2: 年龄, 18')
def task3():
print('task3: 名字, wxx')
time.sleep(0.1)
print('task3: 性别, male')
time.sleep(0.1)
print('task3: 年龄, 21')
if __name__ == '__main__':
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p1.join() # 在此处加上join方法,使得等待第一个程序执行完成之后在去执行第二个进程
p2.start()
p1.join()
p3.start()
p1.join()
join方法使得并发变得有序
步骤四:结果变得确实有序了,但是这样写是有问题的,1. 完全没有并发的效果 关于这个问题我们暂时忽略,只是为了讨论并发才拿出来这样一个例子的。2. 这样子写其实是人为的规定了让p1进程先执行,然后是p2进程,然后是p3进程。这样是非常不公平的,我们的初衷并不希望人为的规定哪个子进程先进行操作,因此我们可以使用互斥锁,这就需要引入另一个类Lock
代码语言:javascript复制# 锁的使用方法,使用比较简单,就两个函数
mutex = Lock() # 创建一个互斥锁
mutex.acqure() # 加锁
。。。。这是我们希望控制的代码
mutex.release() # 释放锁
# 注意:
1. 在子进程中所使用的锁必须是同一把锁,就是锁必须要在if语句中创建,并且通过参数的形式传递给子进程。
2. 对于同一个进程锁只能加一次
3. 必要的代码执行完毕之后必须要释放锁
代码语言:javascript复制from multiprocessing import Process, Lock
import time
def task1(lock):
lock.acquire()
print('task1: 名字, egon')
time.sleep(0.1)
print('task1: 性别, male')
time.sleep(0.1)
print('task1: 年龄, 13')
lock.release()
def task2(lock):
lock.acquire()
print('task2: 名字, alex')
time.sleep(0.1)
print('task2: 性别, male')
time.sleep(0.1)
print('task2: 年龄, 18')
lock.release()
def task3(lock):
lock.acquire() # 加锁
print('task3: 名字, wxx')
time.sleep(0.1)
print('task3: 性别, male')
time.sleep(0.1)
print('task3: 年龄, 21')
lock.release() # 解锁
if __name__ == '__main__':
mutex = Lock() # 创建一个锁
p1 = Process(target=task1, args=(mutex,)) # 然后将这一个锁当做参数进行传递
p2 = Process(target=task2, args=(mutex,))
p3 = Process(target=task3, args=(mutex,))
p1.start()
p2.start()
p3.start()
互斥锁解决并发的乱序的问题
区别二:共享锁写一个简单的抢票小程序
步骤一:创建一个db文件用来存放共享数据,也就是票的数量。因为进程之间的通信目前还没有学到,但是对于磁盘的访问每个进程都是可以访问的,因此先创建db文件
代码语言:javascript复制{"count": 1}
步骤二:写一个不加锁不加延迟的一个简单功能
代码语言:javascript复制from multiprocessing import Process
import os
import json
import time
def search():
"""查询当前还有几张票"""
with open('db.json', 'r', encoding='utf-8') as f:
print('%s 剩余票数 %s' %(os.getpid(), json.load(f)['count']))
def get():
"""购买票"""
with open('db.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
if dic['count'] > 0:
dic['count'] -= 1
with open('db.json', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print('%s 购买票成功' % os.getpid())
def task():
"""抢票程序,包含一个查询票和购买票的功能"""
search()
get()
if __name__ == '__main__':
for i in range(5):
p = Process(target=task)
p.start()
不加锁不加延迟
步骤三: 如果没有延迟的情况下,我们的程序目前来看已经具备了抢票的功能,因为在没有延迟的情况下,当所有进程建立完成之后就是一些基本的运算,cpu的执行是非常快的,因此cpu在执行get函数的时候基本上是不会切换到其他的进程中执行的。但是如果有延迟的情况就不一样了,加上延迟之后因为cpu会切换,所以导致结果不可控,有两种解决方案,1. join将整个进程变成串行的 , 虽然join可以解决问题,但是对于查询票数这个操作来说,我们并不希望是串行的,因此此方法并不合适 2. 用互斥锁
代码语言:javascript复制from multiprocessing import Process,Lock
import os
import json
import time
import random
def search():
"""查询当前还有几张票"""
# 加上查询票的延迟
time.sleep(random.randint(1,3))
with open('db.json', 'r', encoding='utf-8') as f:
print('%s 剩余票数 %s' %(os.getpid(), json.load(f)['count']))
def get():
"""购买票"""
with open('db.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
if dic['count'] > 0:
dic['count'] -= 1
# 加上购买票的延迟
time.sleep(random.randint(1, 3))
with open('db.json', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print('%s 购买票成功' % os.getpid())
def task(lock):
"""抢票程序,包含一个查询票和购买票的功能"""
search()
lock.acquire()
get()
lock.release()
if __name__ == '__main__':
metux = Lock()
for i in range(10):
p = Process(target=task, args=(metux,))
p.start()
用互斥锁解决抢票问题
七. IPC通信机制
代码语言:javascript复制IPC: 进程之间的通信
问题: 两个进程之间的内存空间是物理隔离的,因此怎么通信呢?
进程之间通信的方式:
1. 上面讲到的创建一个共享文件
文件的i/0操作太浪费时间,因此这个方式不建议使用
2. 通道
之前学过subprocess也可以实现进程之间的通信,但是这个进程之间必须是父子进程,并且是半双工模式,因此也不推荐使用
3. 共享内存
Manager,可以通过此类创建一个共享的字典或者列表,为了防止数据出错,在修改数据的时候我们需要自己添加锁,很麻烦,也不建议使用
Queue: 队列,我们可以通过队列的方式实现进程之间的通信,队列在内部已经帮我们添加了锁。
IPC机制应该遵循的原则
1. 所有进程都应该可以共享数据
2. 共享的数据最好应该在内存中
3. 并且我们不需要去操作锁,也就是IPC应该帮我们处理好锁的功能
Manager存在的问题:要自己定义锁才能对数据进行修改
代码语言:javascript复制# 创建共享内存空间,并进行修改
from multiprocessing import Process, Manager
def task(dic):
dic['num'] -= 1
if __name__ == '__main__':
m = Manager()
dic = m.dict({'num': 10})
l = []
for i in range(10):
p = Process(target=task, args=(dic, ))
p.start()
l.append(p)
# 等待子进程结束
for p in l:
p.join()
print(dic)
# 结果:
# {'num': 0}
# 创建共享内存空间,并进行修改
步骤一:创建一个共享的内存空间,然后通过创建进程去修改共享字典的内容,我们发现我们想要的效果Manager确实是已经帮我们实现了,但是当我们在task函数做了以下修改之后,再打印信息就会发现所出来结果不是0而是9了
代码语言:javascript复制def task(dic):
temp = dic['num'] # 和之前的dic['num'] = 1性质是一样的,但是为什么结果不一样呢
time.sleep(0.1)
dic['num'] = temp - 1
步骤二:这是因为在创建进程之后,所有的进程基本上都会在同一时间内拿到temp的值为10,也就是说10个进程的temp都是10, 当他们睡完0.1秒之后无论是谁进行修改dic的值都是10-1所以结果是0,因此对于Manager创建的内存空间默认在修改数据的时候是不会给我们加锁的,因此我们需要自己去加锁对数据进行修改,否则数据就会被损坏。
代码语言:javascript复制from multiprocessing import Process, Manager, Lock
import time
mutex = Lock()
def task(dic, lock):
lock.acquire() # 加锁,防止数据被破坏
temp = dic['num']
time.sleep(0.1)
dic['num'] = temp - 1
lock.release()
if __name__ == '__main__':
m = Manager()
dic = m.dict({'num': 10})
l = []
for i in range(10):
p = Process(target=task, args=(dic, mutex))
p.start()
l.append(p)
# 等待子进程结束
for p in l:
p.join()
print(dic)
# 结果:
# {'num': 0}
加锁使得数据变得安全
Queue队列简介
代码语言:javascript复制特点
1. 先进先出
2. 队列只应该传送消息,数据量不应该过大
3.创建队列的长度不应该过大,因为它占用的是内存的空间
方法
put: 往队列里面添加东西
参数一:obj,放到队列里面的对象
参数二: block,如果队列满了是否阻塞
参数三: timeout,超时时间,在队列的阻塞的状态下才有意义
get: 从队列里面拿东西
参数一:block, 如果队列为空是否阻塞,
参数二:timeout,超时时间,在队列阻塞状态下才有意义
例子:
# 创建队列,队列长度为3
q = Queue(3)
q.put('first')
q.put({'second': None})
q.put('三')
# q.put('四') # 默认会阻塞
q.get()
q.get()
q.get()
八:生产者消费者模型
代码语言:javascript复制生产者和消费者模型
1.模型指的是一种解决问题的套路
2.该模式下具备两种角色
生产者: 生产数据
消费者: 处理数据
3.该模型的运作方式
生产者生产数据,放到一个共享的空间中,然后消费者取走进行处理
4.该模型的实现方式一
生产者进程 队列 消费者进程
队列中存放的是一些消息,不应该存放大量的数据
5.该模型的应用场景
如果程序中由明显的两类任务,一类任务是负责生产数据,另外一类是负责处理数据的
就应该使用生产者和消费者模型
6.该模式的优点
1. 实现了生产者和消费者解耦和
2. 平衡了生产者的生产数据的能力与消费者处理数据的能力
案例:模拟一个生产者和消费者模型
代码语言:javascript复制import time
import random
from multiprocessing import Process, Queue
def consumer(name, q):
while True:
res = q.get()
time.sleep(random.randint(1,2))
print('