进程
程序运行在操作系统上的一个实例,就称之为进程。进程需要相应的系统资源:内存、时间片、pid。创建进程:首先要导入multiprocessing中的Process:创建一个Process对象; 创建Process对象时,可以传递参数;
多进程
相关知识
Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID
创建一个线程
代码语言:javascript复制创建一个线程
from multiprocessing import Process
# 导入多进程
def do_work(thread_id):
print('current thread {} start '.format(thread_id))
time.sleep(3)
print('current thread {} over'.format(thread_id))
p = Process(target=do_work,args=[1])
p.start()
print("main thread over")
输出:
发现主进程先结束,子进程后结束
main thread over
current thread 1 start
current thread 1 over
进程阻塞
代码语言:javascript复制对进程加一行join方法,表示阻塞其他进程的操作直到当前子进程执行完成
def do_work(thread_id):
print('current thread {} start '.format(thread_id))
time.sleep(3)
print('current thread {} over'.format(thread_id))
p = Process(target=do_work,args=[1])
p.start()
p.join()
print("main thread over")
进程
代码语言:javascript复制创建多个进程
def do_work(thread_id):
print('current thread {} start '.format(thread_id))
time.sleep(3)
print('current thread {} over'.format(thread_id))
for i in range(10):
p = Process(target=do_work, args=[i])
print(p.name)
p.start()
for i in range(10):
p.join()
print("main thread over")
print('我是主进程,PID是%s' % os.getpid())
进程池
代码语言:javascript复制如果要启动大量的子进程,可以用进程池的方式批量创建子进程
p = Pool(4)
for i in range(5):
p.apply_async(request_baidu, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
print(time.clock() - start_time)
进程锁
代码语言:javascript复制def do_work(thread_id):
print('current thread {} start '.format(thread_id))
print('开始撸代码,PID是%s' % os.getpid())
time.sleep(3)
print('current thread {} over'.format(thread_id))
p = Process(target=do_work,args=[1])
p.run()
print("main thread over")
print('我是主进程,PID是%s' % os.getpid())
输出:
使用run方法后,其实也是调用主线程的方法去执行.
current thread 1 start
开始撸代码,PID是66113
current thread 1 over
main thread over
我是主进程,PID是66113
进程间通信
代码语言:javascript复制from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
进程锁
代码语言:javascript复制多进程抢占资源 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题
import time
from multiprocessing import Process,Lock
def work(lock,n):
lock.acquire()
print('%s: %s is runing' % (n,os.getpid()))
time.sleep(random.random())
print('%s: %s is down' % (n, os.getpid()))
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(3):
p = Process(target=work, args=(lock,i,))
p.start()
多进程保存数据
代码语言:javascript复制from multiprocessing import Process
from multiprocessing import Manager
manager = Manager()
return_dict = manager.dict()
for group in apk_list_group:
logger.info(group)
# 分组的apk列表
for i in group:
logger.info('待上传文件 ==> {}'.format(i))
p = Process(target=upload_oss, args=(i,return_dict))
logger.info('当前进程ID ==> {}'.format(p.name))
p.start()
for i in group:
p.join()
logger.info('构建主进程运行完成')
def upload_oss(file_path,return_dict):
"""
上传到阿里云oss储存服务
:return:
"""
results = {}
try:
....
return_dict[file_path] = oss_url
refresh_cdn(oss_url)
# 刷新cdn地址
except Exception as e:
logger.error(results)
logger.eror(e)
results = {}
results['code'] = '10000'
results['message'] = '保存oss失败,请检查后再试。'
finally:
return results
return_dict.values()
# 拿到多线程的字典值,但是不能报错对象数据
多线程
多任务可以由多进程完成,也可以由一个进程内的多线程完成。 我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程
创建线程
代码语言:javascript复制import os
import time
import threading
def task():
print(t.getName())
print(threading.current_thread().name)
print('task')
t = threading.Thread(target=task, name='LoopThread')
t.start()
print(threading.current_thread().name)
线程阻塞
代码语言:javascript复制调用join阻塞方法
import os
import time
import threading
def task():
print(t.getName())
print(threading.current_thread().name)
print('task')
time.sleep(3)
t = threading.Thread(target=task, name='LoopThread')
t.start()
t.join()
print(threading.current_thread().name)
线程守护
代码语言:javascript复制当定义子线程为守护线程的话,当主线程结束了,不管子线程是否执行完,都会被直接给暂停掉。默认daemon为False
def task():
print(t.getName())
print(threading.current_thread().name)
print('task')
time.sleep(3)
t = threading.Thread(target=task, name='LoopThread')
t.daemon = True
t.start()
run方法
run() 方法并不启动一个新线程,就是在主线程中调用了一个普通函数而已
线程池
代码语言:javascript复制pip instll threadpool
代码语言:javascript复制import threadpool
import time
def sayhello (a):
print("hello: " a)
time.sleep(2)
def main():
global result
seed=["a","b","c"]
start = time.time()
task_pool = threadpool.ThreadPool(5)
requests = threadpool.makeRequests(sayhello,seed)
for req in requests:
task_pool.putRequest(req)
task_pool.wait()
end = time.time()
time_m = end-start
print("time: " str(time_m))
start1 = time.time()
for each in seed:
sayhello(each)
end1 = time.time()
print("time1: " str(end1-start1))
if __name__ == '__main__':
main()
线程锁
代码语言:javascript复制通过上面的结果比较可以知道,当多线程中需要“独占资源”的时候,要使用锁来控制,防止多个线程同时占用资源而出现其他异常 使用锁的时候就调用acquire()方法,以此告诉其他线程,我正在占用该资源,你们要等会;待使用资源后需要释放资源的时候就调用release()方法, 告诉其他线程,我已经完成使用该资源了,其他人可以过来使用了
import threading
import time
lock = threading.Lock()
l = []
def test1(n):
lock.acquire()
l.append(n)
print(l)
lock.release()
def main():
for i in xrange(0, 10):
th = threading.Thread(target=test1, args=(i,))
th.start()
队列
代码语言:javascript复制队列一般是先进先出
import queue
# q = queue.Queue() # 先进先出
q = queue.LifoQueue() # 先进后出
q = queue.PriorityQueue() # 优先级队列,
# q.put(1)
# q.put(2)
# q.put(3)
q.put((1,'11')) # 优先级队列添加的参数是一个元组,元组元素为优先
q.put((2,'22'))
q.put((3,'33'))
q.put((4,'44'))
print(q.get())
线程队列
代码语言:javascript复制import queue
import threading
import time
exitFlag = 0
count = 1
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print ("开启线程:" self.name)
process_data(self.name, self.q)
print ("退出线程:" self.name)
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
global count
count = count 1
print(count)
if not workQueue.empty():
data = q.get()
queueLock.release()
print ("%s processing %s" % (threadName, data))
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1
# 创建新线程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID = 1
# 填充队列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待队列清空
while not workQueue.empty():
pass
# 通知线程是时候退出
exitFlag = 1
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")
threading.Thread.__init__(self)
全局线程对象
代码语言:javascript复制一个线程的数据可以全局共享,local_school是一个字典,可以通过set和get方法操作数据
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
输出:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
进程 vs 线程
- 多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程
- 多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下
- 多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存
线程切换
线程切换是有代价的,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好
计算密集型 vs IO密集型
是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型和IO密集型。 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。 计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。 第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。 IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。