1. 进程
1.1 概念
线程- 轻量级进程
操作系统进行资源 调度 的基本单位
线程必须依附于进程而存在 不能独立存在
进程
是操作系统进行资源 分配 的基本单位
同一个进程内部的多个线程共享全局资源
一个运行程序默认一个进程-主进程, 一个进程中默认一个线程 - 主线程
进程的状态:
新建
就绪:ready(等待时间片)
执行/运行running(调度就是执行的意思)
等待/阻塞(带带数据到达、满足条件)
死亡
1.2 验证系统中进程
Linux中:
PID 进程标识(PID process identify)
ps -aux(命令查看PID)
ps process status查看某一瞬间进程的状态
获取当前进程的PID
os.getpid()
获取当前进程的父进程的PID
os.getppid()
代码语言:javascript复制import os
# 查看当前所在进程的PID os.getpid() PPID parent PID
print("PID=%s PPID=%s" % (os.getpid(),os.getppid()))
Windows中
任务管理器可以看到
1.3 线程的创建和操作
创建
pro = multiprocessing.Process(target=入口, args=(), kwargs={})
pro.start() 只有程序调了start方法之后才可以启动子进程
阻塞等待子进程
pro.join() 一直等待 死等
pro.join(2) 阻塞等待子进程2秒 如果子进程没有终止那主进程就直接往下执行
终止子进程
pro.terminate()
代码语言:javascript复制 # 终止子进程 向操作系统发出一个终止子进程的信号,存在一定的延时,不要立即判断子进程的状态
判断子进程状态
pro.is_alive()
代码语言:javascript复制 # 判断子进程是否存活
print(pro.is_alive())
------------------
True # 输出的是一个布尔类型的数值
创建进程代码实现:
代码语言:javascript复制import time
import multiprocessing
import os
def pro_info(info, data):
"""子进程 运行的代码"""
for i in range(3):
print("这是子进程 info=%s data=%s PID=%s PPID=%s" % (info, data, os.getpid(), os.getppid()))
time.sleep(1)
def main():
"""单进程 单线程模式 主进程"""
# 创建子进程 (multiprocessing.Process类创建对象, 对象.start())
# target指定子进程入口 args指定位置参数 kwargs指定关键字参数
pro = multiprocessing.Process(target=pro_info, args=("今天天气不错",), kwargs={"data":"50"})
pro.start()
pro.join()
for i in range(3):
# 查看当前所在进程的PID os.getpid() PPID parent PID
print("这是主进程 PID=%s" % os.getpid())
time.sleep(1)
if __name__ == '__main__':
main()
进程的相关操作:
代码语言:javascript复制import time
import multiprocessing
import os
def pro_info(info, data):
"""子进程 运行的代码"""
for i in range(30):
print("这是子进程 info=%s data=%s PID=%s PPID=%s" % (info, data, os.getpid(), os.getppid()))
time.sleep(1)
def main():
"""单进程 单线程模式 主进程"""
# 创建子进程 (multiprocessing.Process类创建对象, 对象.start())
# target指定子进程入口 args指定位置参数 kwargs指定关键字参数
pro = multiprocessing.Process(target=pro_info, args=("天气不错",), kwargs={"data":"50"}, name="儿子进程")
# 创建 启动子进程
pro.start()
# 主进程阻塞等待子进程 2秒 / 如果能够等到子进程退出 回收子进程的资源
pro.join(2)
print("获取子进程的PID =%s name=%s" % (pro.pid, pro.name))
# 判断子进程是否存活
print(pro.is_alive())
# 终止子进程 向操作系统发出一个 终止子进程的信号 存在一定延时 不要立即区判断子进程的状态
# pro.terminate()
# pro.join()
print(pro.is_alive())
print("-------------------------")
for i in range(3):
# 查看当前所在进程的PID os.getpid() PPID parent PID
print("这是主进程 PID=%s" % os.getpid())
time.sleep(1)
if __name__ == '__main__':
main()
验证多进程的执行顺序:无序的
代码语言:javascript复制import multiprocessing
import time
def pro_func():
"""子进程入口"""
for i in range(3):
print("这是子进程:%s" % multiprocessing.current_process().name)
time.sleep(1)
def main():
for i in range(5):
pro = multiprocessing.Process(target=pro_func)
pro.start()
# 多进程执行的顺序无序的
if __name__ == '__main__':
main()
验证多进程是否共享全局变量:
代码语言:javascript复制import multiprocessing
g_number = 0
def proc_func():
"""子进程的入口函数 修改全局变量"""
global g_number
g_number = 1000
def main():
# 创建1一个子进程
# multiprocessing.Process(target=proc_func).start()
pro = multiprocessing.Process(target=proc_func)
pro.start()
# 阻塞等待子进程运行完成
pro.join()
# 获取最终 全局变量的值
print("最终结果是%d" % g_number)
# 结论: 多进程不共享全局资源
if __name__ == '__main__':
main()
- 子进程可以访问全局变量的原因:是因为子进程去创建的时候将主进程的所有东西,包括全局变量拷贝了一份
- 系统分配资源的最小单位是进程,分配完之后,每个进程都有自己的资源,不同的进程是不同的资源,无法相互修改使用
- 进程是独立的数据空间,他们不共享全局资源(因为进程是资源分配的基本单位)
- 看到的内存编号都是虚拟的,每个进程都有一块自己的虚拟的空间。父进程和子进程的虚拟的地址是一样的,真正运行的时候地址就不是我们看到的id了。不要试图通过空间id的方式去验证,验证不了,不科学,它是虚拟的,你会看到id一样然后得出错误的结论
1.4 进程间通信
原因: 进程间不共享全局资源
Queue 是一种进程间通信的方式
是一种队列 先进先出
Queue使用
代码语言:javascript复制创建 队列对象 = multiprocessing.Queue(长度)
放 队列对象.put(数据)
取 数据 = 队列对象.get()
判断空 队列对象.empty()
判断满 队列对象.full()
数量 队列对象.qsize()
代码语言:javascript复制get方法的参数 get(block=True, timeout=None)
block表示是否阻塞等待
timeout超时-等待的时间 None表示一直等待
数据 = 队列对象.get() = .get(True) = .get(True,None)
.get(True, 10) 等待10s
.get(False) 不等待 = .get_nowait()
put方法的参数 put(data, block=True, timeout=None)
block表示是否阻塞等待
timeout超时-等待的时间 None表示一直等待
.put(100) = .put(100, True) = .put(100, True, None) 一直等待
.put(100, True, 10) 等待10s
.put(100, False) 不等待 = .put_nowait()
进程间通信:
代码语言:javascript复制import multiprocessing
import time
def proc_func(q):
"""子进程入口"""
while True:
time.sleep(3)
# 判断空
if q.empty():
print("队列中已经没有了 稍后再来")
time.sleep(3)
# 从队列中取出数据
data = q.get()
print("从队列中取出了数据%s" % data)
def main():
pass
# 1 创建出来 主进程和子进程通信所需的 队列对象
q = multiprocessing.Queue(3)
# 2 创建子进程
pro = multiprocessing.Process(target=proc_func, args=(q,))
pro.start()
while True:
# 3 接收输入 放入队列中
data = input(":")
# 判断队列满
if q.full():
print("慢点输入已经满了 马上溢出了")
time.sleep(1)
# 向队列中放入数据
q.put(data)
if __name__ == '__main__':
main()
1.5 进程池
工作模式:
提前创建一批进程
重复利用已经空闲的进程执行 多任务
优点:
节约了 大量进程的创建/销毁的开销
提高任务的响应速度
添加任务的两种方式
同步方式
会阻塞等待添加任务的执行完成后才会继续往下执行
异步方式
只添加任务 不会等待任务执行完成
只有真正的异步添加任务才能实现多任务
使用步骤:
1 创建进程池 进程池对象 = multiprocessing.Pool(工作进程的数量)
2 添加任务
同步 进程池对象.apply(入口) 添加任务并等待任务执行完成
异步 进程池对象.apply_async(入口) 只添加任务 不等待任务完成
3 关闭进程池
进程池对象.close() 不允许添加新任务
4 等待所有任务执行完成
进程池对象.join()
注意:
进程池之间的进程通信不能使用multiprocessing.Queue 而应该使用 multiprocessing.Manager().Queue
进程池的使用;
代码语言:javascript复制import multiprocessing
import os
import time
def worker(no):
"""工作进程执行的代码"""
for i in range(3):
print("这是工作进程%s %s" % (no, os.getpid()))
time.sleep(1)
def main():
# 1 创建进程池对象 指定工作进程的数量
pool = multiprocessing.Pool(3)
# 2 添加任务
# 2.1 同步方式添加任务 -保证任务的执行顺序
pool.apply(worker,args=(1111,))
pool.apply(worker, args=(2222,))
# 2.2 异步方式添加任务
pool.apply_async(worker,args=(3333,))
pool.apply_async(worker, args=(4444,))
# 3 关闭进程池 - 不允许添加新任务
pool.close()
# 4 等待所有工作进程执行完成
pool.join()
if __name__ == '__main__':
main()
1.6 进程对比线程
进程 线程
分配 调度
不共享 共享
独立存在 依附于进程
可以用多核
系统提供原生的线程 可以使用多核CPU
CPython解释器产生的多线程 由于GIL问题 不能使用多核CPU
1.7 文件夹备份工具
代码语言:javascript复制import os
import multiprocessing
def copy_file(src_path, dest_path, file):
"""从源目录下 将file对应的文件数据读取并且写入到目的目录下 file文件中"""
# 1 打开源目录下的文件用以读
src_file = open(src_path "/" file, "rb")
# 2 打开目的目录下的文件用以写
dest_file = open(dest_path "/" file, "wb")
# 3 一边从源文件中读取数据 写入目的文件中
while True:
file_data = src_file.read(4096)
if not file_data:
print("%s文件拷贝完成" % file)
break
dest_file.write(file_data)
# 4 完成 关闭源文件 目的文件
src_file.close()
dest_file.close()
def main():
# 1 用户输入需要备份的目录-源目录 demo 01.py
source_path = input("请输入需要备份的目录:")
# 2 根据源目录创建目的目录 源目录-备份 demo-备份
dest_path = source_path "-备份"
os.mkdir(dest_path)
# 3 根据源目录获取源目录下所有的 文件名称
file_list = os.listdir(source_path)
# 4 根据每个源文件名称 读取出每个文件的数据 将数据写入到 目的目录/源文件
# 每个文件名对应一个进程
for file in file_list:
pro = multiprocessing.Process(target=copy_file, args=(source_path, dest_path, file))
pro.start()
if __name__ == '__main__':
main()
1.8 文件夹备份工具进程池版
代码语言:javascript复制import os
import multiprocessing
import time
def copy_file(src_path, dest_path, file, q):
"""从源目录下 将file对应的文件数据读取并且写入到目的目录下 file文件中"""
# 1 打开源目录下的文件用以读
src_file = open(src_path "/" file, "rb")
# 2 打开目的目录下的文件用以写
dest_file = open(dest_path "/" file, "wb")
# 3 一边从源文件中读取数据 写入目的文件中
while True:
file_data = src_file.read(4096)
if not file_data:
# print("%s文件拷贝完成" % file)
break
dest_file.write(file_data)
# 4 完成 关闭源文件 目的文件
src_file.close()
dest_file.close()
# 5 当任务完成后 向队列中添加一个消息 表示完成
q.put(file)
def main():
# 1 用户输入需要备份的目录-源目录 demo 01.py
source_path = input("请输入需要备份的目录:")
# 2 根据源目录创建目的目录 源目录-备份 demo-备份
dest_path = source_path "-备份"
os.mkdir(dest_path)
# 3 根据源目录获取源目录下所有的 文件名称
file_list = os.listdir(source_path)
# 4 根据每个源文件名称 读取出每个文件的数据 将数据写入到 目的目录/源文件
# 使用进程池出来每个任务
# 4.0 创建出一个队列 用于进程间通信 进程池中进程通信不能使用multiprocessing.Queue
# q = multiprocessing.Queue(10)
q = multiprocessing.Manager().Queue(10)
# 4.1 创建进程池
p = multiprocessing.Pool(4)
# 4.2 添加任务到进程池中
for file in file_list:
p.apply_async(copy_file, args=(source_path, dest_path, file, q))
# 4.3 关闭进程池
p.close()
# 从队列中取出 消息
count = 0
while True:
if count == len(file_list):
break
q.get()
count = 1
print("r当前进度是%.2f %%" % (100*(count / len(file_list))), end="")
time.sleep(0.5)
# 4.4 等待进程池所有任务都执行完成
p.join()
if __name__ == '__main__':
main()