深入思考 PyQt 多线程处理

2020-05-07 19:37:00 浏览数 (1)

1. 上期笔记续集

在上一篇探索笔记 《结合 Qt 信号槽机制的 Python 自定义线程类》 中,我初步研究了一下 Python3 的 threading.Thread 类以及 PySide2 的信号槽机制,并结合这两者的特性设计出一种能够在子线程中向主线程异步发送数据的自定义线程类的实现方案。

1.1 这是一个定时器?

在那篇笔记中,我举了一个例子:每隔一秒钟时间子线程向主线程发送一次当前时间信息,然后由主线程打印输出。其实你如果仔细理解的话,你会发现这个例子所做的功能更像是一个定时器:每隔一定时间重复做一个固定的动作。这种 “定时器” 的实现,其实直接使用 threading.Timer 即可完成,也可以使用更简单更强大的 QTimer 来实现(后期可能会整理有关 threading.Timer 和 QTimer 相关的笔记)。

既然定时器有更好的实现方案,那么那篇笔记中所介绍的 “结合 Qt 信号槽机制实现的自定义线程” 有什么应用价值呢?

1.2 复杂的且需要异步与主线程进行通信的情况

是的,当你的子线程所需要处理的代码逻辑比较复杂时,且需要异步与主线程进行通信时,这种方案就能够发挥其重要的应用价值了。

举个具体的案例吧:

主线程程序读取一个文件,这个文件的每一行即表示一条命令,然后主线程要调用一个子线程,让这个子线程来负责自动向串口设备发送每一条命令,并接受目标设备的响应结果,然后将结果反馈给主线程,主线程收到结果后立即在列表中展示。

没看懂案例?好吧,那我就画一个图来简单描述一下整个程序运行的过程:

在这个例子中,仔细看子线程程序,这里需要发送多个信号,并且何时发送是不确定的,因为你不知道数据何时从串口设备传达过来,因此这肯定不可能是一个定时器。这个时候使用 “结合 Qt 信号槽机制的自定义线程” 这种模式就变得很方便了。

1.3 这是一个完美的方案吗?

在上面这个案例中,从设计思想来看,似乎好像已经无可挑剔了。但是,假设文件中定义的命令有几千条甚至几万条,这时候发送命令以及接收响应结果的累计等待时间肯定是相当长的,那万一你等得不耐烦了,想要随时暂停甚至直接停止掉子线程的工作,那要怎么办呢?

你可能会想到,这还不简单,直接结束线程不就得了吗!是的,没错,这肯定是一个相当好的办法。

这里顺便嘲讽一下有些比较傻的人: 可能有人觉得,在子线程类中加个标识变量不就得了,平时是 False 值,等到主线程想停止子线程工作的时候,就给它设为 True,然后子线程在判断这个标识变量为 True 时就跳出 while 循环直接 return 出去。 你傻啊,你要这样做的话,你在什么地方来判断这个标识变量呢?在 while 代码块开始的地方吗,还是在 while 代码块结尾的地方,或者是每一行语句中间都插一个判断的?就比如说你在 while 代码块刚开始的地方进行判断吧,那后面的代码你怎么知道要执行过久?万一后面的代码还要再执行五秒钟呢,这岂不是我点了停止按钮要过五秒后它才真正停止?如果你决定傻到每一行语句中间都插一句判断标识变量的,那串口接收数据也要等待很久啊,说不定等半个钟都没收到响应数据呢?

所以这个方案其实是不太可行的,(但其实这是一个比较安全的方法,我在后面再重新夸回来有这样的想法的人吧)

那好吧,那结束线程就结束呗。但问题来了,还记得上篇笔记的代码吗,PrintTimeThread 是继承了 threading.Thread 类的,我查遍了这个类的所有文档(python3),它居然……没有提供结束线程的方法……

后来在网上找了一圈,大体上都是一样的代码(基本是完全一样,也不知道是谁 copy 了谁的),不过他们的代码太乱,我大概整理了一下:

代码语言:javascript复制
def kill_thread(ident: int):  
    """ 结束线程   
    :param ident: 线程标识符  
    """  
    if not ident:    
        raise ValueError("无效的线程标识符")  
    else:    
        tid = ctypes.c_long(ident)    
        code = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit))
        
    # 如果返回值不为1,则表示结束失败    
    if code != 1:      
        # 需要使用 exc=None 再次调用这个方法来恢复效果      
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)      
        raise SystemError("线程结束失败")

其基本实现思路就是通过线程标识符来获取线程 ID,然后调用一个叫什么 PyThreadState_SetAsyncExc 的方法来引发 SystemExit 异常,从而实现让线程结束(好吧,其实我也不知道这个方法具体啥原理,我承认我也是抄了别人的代码,但我至少抄得比别人认真哈哈)。

你以为真的就这么简单实现了吗?那你就大错特错了,关于多线程、多进程的坑,是你这一辈子都踩不完的坑哈哈。这里我只能告诫大家:不要试图用强制方法杀掉一个 python 线程,这从服务设计上就存在不合理性。多线程本来就是用来任务的协作并发,如果你使用强制手段干掉线程,那么会有很大概率会出现意想不到的 bug。请记住:锁资源不会因为线程退出而被释放的 !这里举两个栗子:

  1. A 线程拿到了锁,但它还没来得及释放锁就被强制干掉了,那么这就会导致所有的其他线程都获取不到这个锁资源,这就是典型的死锁现象。
  2. 在常见的生产消费者的场景下,消费者从任务队列中获取了任务,还没来得及将任务丢回到队列中就被强制干掉了,那么这就造成了数据的丢失。

好吧,太深奥了,我知道你看不懂,那我也不继续装逼了。反正你只需要知道,尽量别用强制方法杀掉 python 的线程就行了,而理想的停止线程的方法就是让线程自个自杀,所谓自杀就是你给它一个标志位,它检测到标志位后就自己退出线程。这好像又回到了上面的梗,没错,就是上面被我骂了很傻的并且留着后面来夸的那些人,现在我就可以光明正大夸夸你们了,你们的想法初衷其实是完全正确的,只是很可惜这种想法没办法达到预期的效果(毕竟产品经理不会关心你的头发,他只关心他的工资)。

那说了这么多废话,到底要怎么做才是最聪明的呢?不急,且听我给你介绍一个新玩意儿——QThread。

2. 强大的 QThread

2.1 QThread 从哪里来?

QThread 是啥?它从哪里来的?它是干嘛的?

QThread 其实是 Qt 的一个专门用于处理多线程的类。在 Python 语言中,QThread 可以来自于 PyQt5,也可以来自于 PySide2。我特意去对比了一下,PyQt5 的 QThread 比 PySide2 的足足多了13个方法,真的是扶不起的 PySide2 啊~~不过即使 PySide2 如此不争气,我也还是喜欢它,没别的理由,喜欢就是喜欢!

哎,算了,我还是用 PyQt5 吧,PySide2 居然连个 finished 信号都没有,真不知道该怎么说它了,希望它能够好好反省一下自己。

2.2 三个结束线程的方法

下面列出来了 PyQt5 中 QThread 类的所有方法,对了,别问我 QThread 这个类怎么引入,我是不会告诉你要用 from PyQt5.QtCore import QThread 这行代码的,这问题问得太低级了。

哇,好多方法名称,是不是看得头昏眼花?那只能说明你英语太好了,像我这种英文菜到爆的人就不会出现头晕的现象,因为我只认得这几个:run、start、quit、exit、terminate、started、finished、isRunning、isFinished,其他的我都不认识哈哈。如果你探索精神可嘉非要去研究那些连我都看不懂的东西,这里给你推一个 PyQt5 的官方文档:https://www.riverbankcomputing.com/static/Docs/PyQt5/api/qtcore/qthread.html#,以及 Qt 本尊的文档:https://doc.qt.io/qt-5/qthread.html,能看懂我拜你为大哥!

这里的 runstart 其实和 threading.Thread 中的 run、start 是一样的作用,我们需要将自己的子线程处理的代码逻辑写在 run 方法里,因为我们就必须要继承 QThread 派生出一个新的子类来(其实还有另一种不用派生新类的方法,暂且先设置悬念,以后的文章中可能会提及)。线程通过 start 方法来启动运行。

startedfinished 表示的是线程开始运行的信号以及运行完毕的信号,既然是信号,如有需要,连到槽函数中即可。

isRunningisFinished 顾名思义就是用来判断线程是否正在运行以及是否已运行完毕。

那么现在就只剩 quitexitterminate 这三个了,乍一看,感觉它们好像没啥区别,都是退出或结束线程的意思。没错,这里就体现到了 QThread 对比 threading.Thread 的强大之处了,前者给你提供了三种退出线程的方法,而后者居然吝啬到一个都不给。那么它们三到底啥区别啊?我们且来看 Qt 的官方描述:

  • quit( ) 方法:

很显然,它跟 exit(0) 是一个意思,所以这里就直接忽略它吧,看后面的 exit() 是啥意思就好。

  • exit( ) 方法:

Emm,表示没怎么看懂,不过好像是跟 exec( ) 这个方法有关,好,现在来看看 exec() 方法:

这回终于看懂了上面的一切了。大概就是说,你写在 run() 里面的子线程处理逻辑,写完代码后要在最后面调用 exec() 这个方法,来让线程进入到循环等待中,避免线程运行完了直接结束掉。直到你调用 exit() 这个方法的时候,它才会停止循环,并返回一个数字,这个数字就是你在调用 exit() 是给它传进去的。这就很好理解 quit() 了,调用 quit() 就会返回 0 嘛。

但是,额~好像,这并不是我们要的结果。因为我们的子线程里本身就写了一个 while 大循环,一直不间断地接收串口设备发来的响应数据,所以根本就轮不到 exec() 插手的余地。所以,在这里,exit( ) 和 quit() 对我们而言都没啥作用。好吧,现在只剩最后一个 terminate() 方法了,让我们来看看它的官方描述:

  • terminate( ) 方法:

很长的一段描述,说明含金量肯定也会比较高。

从上面的官方描述中可以看出,terminate( ) 方法其实就是根据操作系统的调度策略来终止线程,但可能不会立即就终止(废话,你得给人家喘喘气啊),所以在调用了 terminate( ) 之后还需要调用一下 wait( ) 方法,来等待它终止完毕。但是,上面还有个加粗的 Warning,看到那段话就让人很不舒服了。那段话的歌词大意是:此功能很危险,不建议使用,线程可以在代码中的任何位置终止,在修改数据时也可能被终止,线程无法解锁任何保持的互斥锁等。总之,仅在绝对必要时才使用此功能。好吧,绕了一大圈,又回到了起点,这不跟上一节中写的 kill_thread() 一样的效果吗。。。

所以最后的结论就是:QThread 很强大,提供了很多 threading.Thread 没有的方法,并且也给出了三种结束线程的方法,但都没啥卵用,不过至少比自己写手结束线程函数要强。如果你注意观察,你会发现 QThread 这里还自带了延时休眠函数,这样你也同样省了去 import time 包了。

难道就真的没有办法了没?作为一个爱学习爱探索爱钻研的我来说,不达目的誓不摆休,于是,我只能自己研究一种方案来达到最初的目的了。

3. 如何安全地结束线程

其实这是一个世纪难题,如果有很好的解决办法的话,我也不会写这篇文章出来装逼了,早就满大街的文章横空出世了。说到要安全地结束线程,都是需要分情况讨论的,我这里就分成三种情况来讨论吧,当然我肯定是只讲 Python 的,其他语言不在本篇讨论范围之内。

3.1 没有长时间等待的情况

当子线程的代码中没有死循环或长时间等待的情况时,这就非常好办了,直接使用 exit( ) 或 quit( ) 方法即可,等代码执行完再退出 exec( ) 循环来结束线程,这种方法既简单又安全,就算要等,也只是等一会的时间,用户几根感觉不到,顶多就界面卡一两秒。如果需要使用线程锁,则要引入 QMutex 类

这里写个简单的示例代码:1-5 的数字每隔 1 秒打印出来。

代码语言:javascript复制
import time
from PyQt5.QtCore import QThread, QMutex


# 创建线程锁
mutex = QMutex()


class CustomThread(QThread):    
    """ 自定义线程类:继承 QThread """       
    def __init__(self):        
        QThread.__init__(self)        
        
    def run(self):        
        mutex.lock()                    # 加锁                
        for data in [1, 2, 3, 4, 5]:            
            print(self.objectName()   ": read data | ", data)            
            time.sleep(1)                    
            
        mutex.unlock()                  # 释放锁        
        self.exec()                     # 进入线程循环
        

if __name__ == "__main__":    
    # 创建子线程1    
    thread1 = CustomThread()    
    thread1.setObjectName("Thread 1")        
    
    # 创建子线程2    
    thread2 = CustomThread()    
    thread2.setObjectName("Thread 2")        
    
    # 第0秒时启动子线程1    
    print("Main: start Thread 1.")    
    thread1.start()        
    
    # 第1.5秒时启动子线程2    
    time.sleep(1.5)    
    print("Main: start Thread 2.")    
    thread2.start()
    
    # 第2.5秒时结束子线程1    
    time.sleep(1)    
    print("Main: quit Thread 1.")    
    thread1.quit()        
    
    # 第3.5秒时检测子线程1是否已退出    
    time.sleep(1)    
    print("Main: Thread 1 is finished? ", thread1.isFinished())        
    
    while True:        
        pass

下面是程序执行的实际结果:

在这个例程中,每个子线程的理论总运行时间应该为5秒,虽然在 1.5 秒时刻时就已经启动了子线程2,但由于子线程1的线程锁的作用,子线程2必须等待子线程1结束后才会启动;并且由于使用的是 quit/exit 方式来结束线程,因此必须等待子线程代码执行到 self.exec( ) 这一行时 quit/exit 才会起作用,因此子线程并没有在第2.5秒时就立即结束,而是执行完所有代码后才退出。

3.2 没有操作互斥资源的情况

上面那个例程虽然是最安全的,但显然不是用户体验最好的,因为第2.5秒时刻结束子线程,硬是等待第5秒后才退出,并且还继续打印输出,这在很多时候都是不太能接收的。

如果子线程中没有对共享的互斥资源进行操作的话,由于不担心数据丢失与互斥的问题,因此完全可以使用 terminate( ) 方法强制结束线程,无论它是否为长时间等待的操作,都是这么的简单粗暴,就是这么拽!

代码语言:javascript复制
import time
from PyQt5.QtCore import QThread


class CustomThread(QThread):    
    def __init__(self):        
        QThread.__init__(self)        
        
    def run(self):        
        for data in [0, 1, 2, 3, 4, 5]:            
            print("Thread: read data | ", data)            
            time.sleep(1)    
                
        self.exec()
        
        
if __name__ == "__main__":    
    # 创建子线程    
    thread = CustomThread()        
    
    # 第0秒时启动子线程    
    print("Main: start Thread.")    
    thread.start()        
    
    # 第2.5秒时结束子线程    
    time.sleep(2.5)    
    print("Main: terminate Thread.")    
    thread.terminate()    
    thread.wait()        
    
    # 第3.5秒时检测子线程是否已退出    
    time.sleep(1)    
    print("Main: Thread is finished? ", thread.isFinished())        
    
    while True:        
        pass

程序执行结果如下:

可以看到,到第2.5秒的时候,子线程就真的没结束掉了,干脆利落!

3.3 有长时间等待且有互斥资源操作的情况

对于有长时间等待的情况,尤其是直接在 run( ) 方法来写了个 while True: 的情况,这时候调用 exit/quit 是不可能结束的了线程的了,这辈子都结束不了的了。如果不考虑数据安全性还好,直接 terminate 执行下去,果断杀掉线程。但如果同时也有互斥资源的操作,这时候就必须要考虑数据安全性了,直接使用 terminate 方法肯定是不推荐的。

经过一番探索,我找到了一个实现这种需求的小技巧。大概思路就是:我们一般说使用 terminate 方法不安全,无非就是指怕中途干掉了子线程后,子线程未来得及释放锁,导致其他的子线程一直无法获得锁。既然如此,那只要在干掉子线程的时候同时将锁释放掉不就解决问题了?于是,我们可以重写父类的 teminate 方法,在子类的 terminate 方法中将锁释放掉,然后再调用父类 terminate 执行杀死线程的动作。另外,既然我们都可以在子类 terminate 中释放锁了,那当然 wait 方法也可以一并写到这里,甚至如果有其他的需要恢复数据状态的操作,也都可以写在这里,例如前面提到的将数据丢回到队列里等。下面直接贴出代码更直观些。

代码语言:javascript复制
import time
from PyQt5.QtCore import QThread, QMutex

# 实例化线程锁对象
mutex = QMutex()


class CustomThread(QThread):    
    """ 自定义线程类:继承 QThread """        
    
    def __init__(self):        
        QThread.__init__(self)        
        
    def run(self):        
        mutex.lock()                            # 在子线程运行时加锁                
        
        # 这是一个死循环代码块        
        while True:            
            print(self.objectName(), "-" * 20, time.asctime())            
            time.sleep(1)        
            
    def terminate(self):        
        """ 重写 terminate 方法 """        
        mutex.unlock()                          # 在终止线程前先解锁        
        super(CustomThread, self).terminate()   # 终止线程        
        self.wait()                             # 等待线程被终止完毕
        

if __name__ == "__main__":    
    # 创建子线程    
    thread1 = CustomThread()    
    thread1.setObjectName("Thread 1")    
    thread2 = CustomThread()    
    thread2.setObjectName("Thread 2")        
    
    # 第0秒时启动子线程1    
    print("Main: start Thread 1.")    
    thread1.start()
    
    # 第2.2秒时启动子线程2    
    time.sleep(2.2)    
    print("Main: start Thread 2.")    
    thread2.start()        
    
    # 第3.2秒时结束子线程1    
    time.sleep(1)    
    print("Main: terminate Thread 1.")    
    thread1.terminate()        
    
    # 第4.2秒时检测子线程1是否已退出    
    time.sleep(1)    
    print("Main: Thread 1 is finished? ", thread1.isFinished())        
    
    # 第6.2秒时结束子线程2    
    time.sleep(2)    
    print("Main: terminate Thread 2.")    
    thread2.terminate()        
    
    # 第7.2秒时检测子线程2是否已退出    
    time.sleep(1)    
    print("Main: Thread 2 is finished? ", thread2.isFinished())        
    
    while True:        
        pass

来看下程序执行的结果:

可以看出,当启动子线程2的时候,由于子线程1还占用着锁,因为子线程2必须要等待;当子线程1被调用 terminate 方法时,立刻就已经退出线程了,并且安全地释放了锁,此时子线程2拿到了锁便开始运行。这正是我们想要得到的效果。

4. 总结

通过对上一篇笔记案例的思考,在多线程处理过程中,不仅仅用户体验很重要,数据安全也更重要,如何兼顾这两者的需求成了程序员不得不认真思考的问题。Qt 自带的 QThread 类为我们提供了许多非常的接口,但其实它也不是完全安全可靠的,一切可靠性的程序都应该由程序员自主判断和设计,针对不同的需求,应该根据实际情况选择最优的解决方案,而不是一味追求通用而忽略了用户体验或数据安全。

0 人点赞