Python 与线程
线程是进程的执行单元,对于大多数程序来说,可能只有一个主线程,但是为了能够提高效率,有些程序会采用多线程,在系统中所有的线程看起来都是同时执行的,例如,现在的多线程网络下载程序中,就使用了这种线程并发的特性,程序将欲下载的文件分成多个部分,然后同时进行下载,从而加快速度.虽然线程并不是一个容易掌握和使用的概念,但是如果运用得当,还是可以获得很不错的性能的.
◆创建使用线程◆
在 Python 中创建线程需要用到一个类,threading
类,其类的实现方法是底层调用了C语言的原生函数来实现的创建线程,创建线程有两种方式,一种是直接使用函数创建线程,另一种则是使用类创建线程,两种创建方式效果是相同的,但是需要注意一点,在使用类的方式创建线程的时候,默认执行run(self)方法
,且此函数名称必须是run不能修改,接下来看3个小例子吧.
使用函数创建线程: 通过线程模块创建线程,并传递参数即可实现直接对指定函数实现多线程.
代码语言:javascript复制import os
import threading
import time
def MyThread(x,y): #定义每个线程要执行的函数体
print("传递的数据:%s,%s"%(x,y)) #其中有两个参数,我们动态传入
time.sleep(5) #睡眠5秒钟
for x in range(10): #创建10个线程并发执行函数
thread = threading.Thread(target=MyThread,args=(x,x 1,)) #args是函数的参数,元组最后一个必须要逗号.
thread.start() #启动线程
使用类创建线程: 通过定义类,传递给类中一些参数,然后启动多线程,这种方式不常用.
代码语言:javascript复制import os
import threading
import time
class MyThread(threading.Thread): #继承threading.Thread类
def __init__(self,x,y): #重写构造函数
super(MyThread,self).__init__() #先执行父类的构造方法
self.x = x
self.y = y
def run(self): #run()方法,是cpu调度线程会使用的方法,名称必须是run
print("运行线程, X=%s Y=%s"%(self.x,self.y))
for i in range(10): #创建10个线程
obj = MyThread(i,i 10)
obj.start()
代码语言:javascript复制import paramiko,datetime,threading
class MyThread(threading.Thread):
def __init__(self,address,username,password,port,command):
super(MyThread, self).__init__()
self.address = address
self.username = username
self.password = password
self.port = port
self.command = command
def run(self):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1)
stdin, stdout, stderr = ssh.exec_command(self.command)
result = stdout.read()
if not result:
self.result = stderr.read()
ssh.close()
self.result = result.decode()
except Exception:
self.result = "0"
def get_result(self):
try:
return self.result
except Exception:
return None
ThreadPool = [] # 定义线程池
starttime = datetime.datetime.now()
for item in range(5):
obj = MyThread("192.168.1.20","root","123","22","ifconfig")
ThreadPool.append(obj)
for item in ThreadPool:
item.start() # 启动线程
item.join()
for item in ThreadPool:
ret = item.get_result() # 获取返回结果
print(ret)
endtime = datetime.datetime.now()
print("程序开始运行:{} 结束:{}".format(starttime,endtime))
接收线程返回结果: 我们可以使用join
方法,等待线程执行完毕后的返回结果.
import os
import threading
import time
def MyThread(x,y): #定义每个线程要执行的函数体
print("传递的数据:%s,%s"%(x,y)) #其中有两个参数,我们动态传入
time.sleep(5) #睡眠5秒钟
return "ok"
temp=[]
for x in range(10): #创建10个线程并发执行函数
thread = threading.Thread(target=MyThread,args=(x,x 1,)) #args是函数的参数,元组最后一个必须要逗号.
thread.start() #启动线程
temp.append(thread) #将线程结果添加到列表
for y in temp: #遍历这个线程列表
#此处一定要join,不然主线程比子线程跑的快,会拿不到结果,程序就退出执行了.
y.join() #等待线程执行完毕,返回结果
print("线程: %s"%y)
◆线程锁与信号◆
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以就出现了线程锁的概念,即在同一时刻只允许一个线程执行操作,在这里我们选择使用Rlock,而不使用Lock,因为Lock如果多次获取锁的时候会出错,而RLock允许在同一线程中被多次acquire,但是需要用n次的release才能真正释放所占用的琐,一个线程获取了锁在释放之前,其他线程只有等待线程结束后在进行操作.
全局锁(Lock): 添加本全局锁以后,能够保证在同一时间内保证只有一个线程具有权限.
代码语言:javascript复制import time
import threading
num = 0 #定义全局共享变量
thread_list = [] #线程列表
lock = threading.Lock() #生成全局锁
def SumNumber():
global num #在每个线程中获取这个全局变量
time.sleep(2)
lock.acquire() #修改数据前给数据加锁
num = 1 #每次进行递增操作
lock.release() #执行完毕以后,解除锁定
for x in range(50): #指定生成线程数
thread = threading.Thread(target=SumNumber)
thread.start() #启动线程
thread_list.append(thread) #将结果列表加入到变量中
for y in thread_list: #等待执行完毕.
y.join()
print("计算结果: ",num)
递归锁(RLock): 递归锁和全局锁差不多,递归锁就是在大锁中还要添加个小锁,递归锁是常用的锁.
代码语言:javascript复制import threading
import time
num = 0 #初始化全局变量
lock = threading.RLock() #设置递归锁
def fun1():
lock.acquire() #添加递归锁
global num
num = 1
lock.release() #关闭递归锁
return num
def fun2():
lock.acquire() #添加递归锁
res = fun1()
print("计算结果: ",res)
lock.release() #关闭递归锁
if __name__ == "__main__":
for x in range(10): #生成10个线程
thread = threading.Thread(target=fun2)
thread.start()
while threading.active_count() != 1: #等待所有线程执行完成
print(threading.active_count())
else:
print("所有线程运行完成...")
print(num)
互斥锁(Semaphore): 同时允许一定数量的线程更改数据,也就是限制每次允许执行的线程数.
代码语言:javascript复制import threading,time
num = 0 #初始化变量
semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
def run(n):
semaphore.acquire() #添加信号
time.sleep(1)
print("运行这个线程中: %s"%n)
semaphore.release() #关闭信号
if __name__ == '__main__':
for i in range(20): #同时执行20个线程
t = threading.Thread(target=run, args=(i,))
t.start()
while threading.active_count() != 1: #等待所有线程执行完毕
pass
else:
print('----所有线程执行完毕了---')
print(num)
◆线程驱动事件◆
事件驱动(Event): 线程事件用于主线程控制其他线程的执行,事件主要提供了三个方法set、wait、clear、is_set
,分别用于设置检测和清除标志.
事件处理机制定义:全局定义了一个"Flag",如果"Flag"值为False,那么当程序执行event.wait 方法
时就会阻塞,如果"Flag"值为True,那么在执行event.wait 方法
时便不再阻塞,变成可执行模式,总体来说需要了解以下四个方法.
代码语言:javascript复制clear:将"Flag"设置为False set:将"Flag"设置为True wait:检测当前"Flag",如果"Flag"值为 False,那么当线程执行 event.wait 方法时就会阻塞,如果"Flag"值为True,那么event.wait 方法时便不再阻塞 is_set:检测当前的状态,是否阻塞
import threading
event = threading.Event()
def func(x,event):
print("函数被执行了: %s 次.." %x)
event.wait() #检测标志位状态,如果为True=继续执行以下代码,反之等待.
print("加载执行结果: %s" %x)
for i in range(10): #创建10个线程
thread = threading.Thread(target=func,args=(i,event,))
thread.start()
print("当前状态: %s" %event.is_set()) #检测当前状态,这里为False
event.clear() #将标志位设置为False,默认为False
temp=input("输入yes: ") #输入yes手动设置为True
if temp == "yes":
event.set() #设置成True
print("当前状态: %s" %event.is_set()) #检测当前状态,这里为True
定时器(Timer): 指定定时器,作用是让进程或者是指定函数,在n秒后执行相应的操作.
代码语言:javascript复制import threading
import time
def func():
print("hello python")
for i in range(5): #指定5个线程
thread = threading.Timer(5,func) #在5秒钟以后运行func函数
thread.start()
## Python 与进程
直观地说,进程就是正在执行的程序,进程是多任务操作系统中执行任务的基本单元,是包含了程序指令和相关资源的集合,线程的上一级就是进程,进程可包含很多线程,进程和线程的区别是进程间的数据不共享,多进程也可以用来处理多任务,不过多进程很消耗资源,计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核心数保持一致.
进程与线程的区别,有以下几种解释: ● 新创建一个线程很容易,新创建一个进程需要复制父进程 ● 线程共享创建它的进程的地址空间,进程有自己的地址空间 ● 主线程可以控制相当大的线程在同一进程中,进程只能控制子进程 ● 线程是直接可以访问线程之间的数据,进程需要复制父进程的数据才能访问 ● 主线程变更可能会影响进程的其他线程的行为,父进程的变化不会影响子进程 ● 线程可以直接与其他线程的通信过程,进程必须使用进程间通信和同胞交流过程
◆创建一个进程◆
通常情况下,创建一个进程需要使用multiprocessing 模块
,具体的创建方法和上面的线程创建方法相同,唯一的不同是关键字的变化,但需要注意的是,由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销,其他使用方法和线程threading.Thread
是一样的,如下介绍两个创建进程例子.
创建进程(1): 通过使用multiprocessing库
,循环创建5个进程,并使用join等待进程执行完毕.
import multiprocessing
import time
def func(name):
time.sleep(2)
print("hello",name)
if __name__ == "__main__":
for i in range(5):
proc = multiprocessing.Process(target=func,args=("lyshark",))
proc.start()
proc.join()
创建进程(2): 创建5个进程,并在每个进程里启动1个线程,线程打印出线程的ID号.
代码语言:javascript复制import multiprocessing
import threading
import time
def thread_run():
print("子线程->子线程ID: %s" %threading.get_ident())
def func(num):
time.sleep(2)
print("-------------------------------->>> 主线程->主线程ID %s" %num)
for i in range(5): #在主线程里开辟5个子线程
thread = threading.Thread(target=thread_run,) #嵌套一个子线程
thread.start() #执行子线程
if __name__ == "__main__":
for i in range(5): #启动5个主线程
proc = multiprocessing.Process(target=func,args=(i,))
proc.start()
#proc.join()
◆进程数据共享◆
一般当我们创建两个进程后,进程各自持有一份数据,默认无法共享数据,如果我们想要共享数据必须通过一个中间件来实现数据的交换,来帮你把数据进行一个投递,要实现进程之间的数据共享,其主要有以下几个方法来实现进程间数据的共享,queues,Array,Manager.dict,pipe
这些方法都能实现数据共享,下面将举几个小例子进行说明.
共享队列(Queue): 这个Queue主要实现进程与进程之间的数据共享,与线程中的Queue不同.
代码语言:javascript复制from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
def foo(i,arg):
arg.put(i)
print('say hi',i,arg.qsize())
li = queues.Queue(20,ctx=multiprocessing)
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
共享整数(int): 整数之间的共享,只需要使用multiprocessing.Value
方法,即可实现.
import multiprocessing
def func(num):
num.value = 1024 #虽然赋值了,但是子进程改变了这个数值
print("函数中的数值: %s"%num.value)
if __name__ == "__main__":
num = multiprocessing.Value("d",10.0) #主进程与子进程共享这个value
print("这个共享数值: %s"%num.value)
for i in range(5):
num = multiprocessing.Value("d", i) #声明进程,并传递1,2,3,4这几个数
proc = multiprocessing.Process(target=func,args=(num,))
proc.start() #启动进程
#proc.join()
print("最后打印数值: %s"%num.value)
共享数组(Array): 数组之间的共享,只需要使用multiprocessing.Array
方法,即可实现.
import multiprocessing
def func(ary): #子进程改变数组,主进程跟着改变
ary[0]=100
ary[1]=200
ary[2]=300
''' i所对应的类型是ctypes.c_int,其他类型如下参考:
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
'''
if __name__ == "__main__":
ary = multiprocessing.Array("i",[1,2,3]) #主进程与子进程共享这个数组
for i in range(5):
proc = multiprocessing.Process(target=func,args=(ary,))
print(ary[:])
proc.start()
共享字典(dict): 通过使用Manager()方法
,实现两个进程中的,字典与列表的数据共享.
import multiprocessing
def func(mydict, mylist):
mydict["字典1"] = "值1"
mydict["字典2"] = "值2"
mylist.append(1)
mylist.append(2)
mylist.append(3)
if __name__ == "__main__":
mydict = multiprocessing.Manager().dict() #主进程与子进程共享字典
mylist = multiprocessing.Manager().list() #主进程与子进程共享列表
proc = multiprocessing.Process(target=func,args=(mydict,mylist))
proc.start()
proc.join()
print("列表中的元素: %s" %mylist)
print("字典中的元素: %s" %mydict)
管道共享(Pipe): 通过Pipe
管道的方式在两个进程之间共享数据,类似于Socket套接字.
import multiprocessing
def func(conn):
conn.send("你好我是子进程.") #发送消息给父进程
print("父进程传来了:",conn.recv()) #接收父进程传来的消息
conn.close()
if __name__ == "__main__":
parent_conn,child_conn = multiprocessing.Pipe() #管道创建两个端口,一收一发送
proc = multiprocessing.Process(target=func,args=(child_conn,))
proc.start()
print("子进程传来了:",parent_conn.recv()) #接收子进程传来的数据
parent_conn.send("我是父进程,收到消息了..") #父进程发送消息给子进程
进程锁(Lock): 进程中也有锁,可以实现进程之间数据的一致性,也就是进程数据的同步,保证数据不混乱.
代码语言:javascript复制import multiprocessing
def func(loc,num):
loc.acquire() #添加进程锁
print("hello ---> %s" %num)
loc.release() #关闭进程锁
if __name__ == "__main__":
lock = multiprocessing.Lock() #生成进程锁
for number in range(10):
proc = multiprocessing.Process(target=func,args=(lock,number,))
proc.start()
◆进程的进程池◆
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止,进程池有两个方法:apply(),apply_async()
,下面将介绍几个常用的小技巧.
进程池(apply): 同步执行,每次执行一个进程,直到所有进程执行完毕,其实也就是串行执行.
代码语言:javascript复制import multiprocessing
import time
def foo(num):
time.sleep(2)
print("进程执行-->: %s"%num)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=5) #允许进程池同时放入5个进程
for i in range(10):
pool.apply(func=foo,args=(i,)) #并行执行每次执行一个
print("ends ...")
pool.close()
pool.join()
进程池(apply_async): 异步执行进程,每次执行5个进程,直到执行完10次循环位置,并行执行.
代码语言:javascript复制import multiprocessing
import time
def foo(num):
time.sleep(2)
print("进程执行-->: %s"%num)
def bar(arg):
print("call back 函数执行..")
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=5) #允许进程池同时放入5个进程
for i in range(10):
pool.apply_async(func=foo,args=(i,),callback=bar) #每次执行进程结束,自动执行callback指定的函数
print("ends ...")
pool.close()
pool.join()
## Python 与协程
协程,又称微线程,是一种用户态的轻量级线程,携程主要实现了在单线程下实现并发,一个线程能够被分割成多个协程,协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,因此协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态.
线程和进程的操作是由程序触发系统接口,最后的执行者是系统,协程的操作则是程序员,协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时,而协程则只使用一个线程,在一个线程中规定某个代码块执行顺序,协程的适用场景:当程序中存在大量不需要CPU的操作时(IO操作),时适用于协程.
协程之(Yield): 通过使用yield方法来模拟实现协程操作的例子,这里只是演示.
代码语言:javascript复制import time
import queue
def consumer(name):
print("--->包子...")
while True:
new_yield = yield
print("[%s] 在吃包子 %s" % (name, new_yield))
def producer():
r = con.__next__()
r = con2.__next__()
n = 0
while n < 5:
n = 1
con.send(n)
con2.send(n)
print("