1 线程类的封装
学习线程互斥之前,我们先对linux的线程库进行封装,熟悉一下C 的线程库。并且方便我们后续使用
1.1 框架搭建
我们主要要实现start stop join三个功能,线程启动,线程终止,线程等待。完成这些就可以快速使用线程了!
类内部需要:
- 线程名字:name
- 线程ID :进行等待和终止关键
- 是否运行判断 :只有运行状态才可以进行终止和等待
- 线程需要执行的回调函数指针 typedef void(*func_t)(const std::string& name)
- 函数返回值 void* result
拥有这些成员变量,就这样就可以保证我们的基本工作了!
代码语言:javascript复制namespace ThreadMouble
{
//回调函数的类型
typedef void(*func_t)(const std::string& name);
class Thread
{
public:
//构造函数需要传入名字和回调函数
Thread(const std::string& name , func_t func):
_name(name), _func(func)
{
}
bool Start()
{
}
void Stop()
{
}
void Join()
{
}
~Thread()
{
}
private:
//线程名字
std::string _name;
//线程ID
pthread_t id;
//是否运行判断符
bool isrunning;
//回调函数
func_t _func;
//函数返回值
std::string _result;
};
}
1.2 线程启动
线程启动接口很简单就可以实现,我们调用系统调用pthread_create
传入对应的参数.
但是执行的ThreadRun函数就要费一些头脑,pthread_create
系统调用中需要传入一个void* (* )(void*)
的函数指针
void* ThreadRoutinue(void* args) --- 执行回调方法
但是对象内的函数都有一个默认参数 this指针,所以需要加入 static修饰成为类的函数,这样也造成不能调用内部的成员了, 为了优雅的执行 多加一个Excute()成员 进行调用回调函数
代码语言:javascript复制 void Excute()
{
isrunning = true;
_func(_name);
isrunning = false;
}
static void* ThreadRun(void* args)
{
//获取类对象
Thread* self = static_cast<Thread*>(args);
self->Excute();
return nullptr;
}
bool Start()
{
//需要启动线程
isrunning = true;
int n = ::pthread_create(&id , nullptr , ThreadRun , this);
if(n == 0)
{
return true;
}
else
{
return false;
}
}
PS: ::
表示使用标准库的接口
这样就优雅的执行了线程启动
1.3 线程终止
只有线程运行了才可以进行终止,直接调用系统调用即可
代码语言:javascript复制 void Stop()
{
if(isrunning )
{
isrunning = false;
::pthread_cancel(id);
}
return ;
}
1.4 线程等待
直接调用系统调用即可!
代码语言:javascript复制 void Join()
{
::pthread_join(id , nullptr);
return ;
}
1.5 运行测试
我们写好了线程的封装,接下来就来使用一下,来看看效果:
代码语言:javascript复制#include<iostream>
#include"Thread.hpp"
#include<unistd.h>
using namespace ThreadMouble;
void threadrun(const std::string& name)
{
while(true)
{
std::cout << "name: " << name << " is running..." << std::endl;
sleep(1);
}
return ;
}
int main()
{
Thread t("thread-1" , threadrun);
t.Start();
std::cout << t.status() << std::endl;
sleep(10);
t.Stop();
t.Join();
return 0;
}
运行来看:
很好,可以正常创建线程并执行任务!
2 线程互斥
线程可以看到的大部分资源是共享资源,即多个线程可以看到的资源叫做共享资源!那么如果今天这个共享资源是一个大数组,一个线程可以进行写入,其他线程可以进行读取,这样不就实现了线程通信了!可是还是有问题的,因为线程读取不受对方控制,可以刚写一个字符立马就被读取了。就造成了读取不一致的问题。所以共享资源往往需要进行保护,类似取ATM机取钱,虽然是公共场所但是只有你一个人可以使用当前的ATM机。而线程也有这样的场景,就是线程互斥!
2.1 多线程访问的问题
首先我们先来看看多线程访问中会遇到的问题 — 我们设置一个情景,抢10000张票,我们设置4个并发线程一起来抢票:
- 按理来说每个线程只会在有票的时候可以抢
- 抢到就总数减一 , 直到没票为止
#include <iostream>
#include "Thread.hpp"
#include <unistd.h>
#include <vector>
using namespace ThreadMouble;
// 一共10000张票
int num = 10000;
void threadrun(const std::string &name)
{
while (true)
{
if (num > 0)
{
usleep(1000);//抢票的时间
std::cout << "name: " << name << "剩余票数: " << num << std::endl;
num--;
}
else
{
break;
}
}
return;
}
int main()
{
std::vector<Thread> thds;
for (int i = 0; i < 4; i )
{
char buffer[128];
std::string name = "thread-" std::to_string(i);
snprintf(buffer, 128, "%s", name.c_str());
thds.emplace_back(buffer, threadrun);
}
for (int i = 0; i < 4; i )
{
thds[i].Start();
}
std::cout << "所以票已经强光!!! " << std::endl;
for (int i = 0; i < 4; i )
{
thds[i].Join();
}
return 0;
}
我们运行看看:
运行之后发现怎么抢到了负数票?这是为什么???这其实就是多线程并发访问中会遇到的问题,访问全局资源时就发生了问题! 我们分析一下为什么会发生问题
直接原因
- 判读的过程其实是一种计算,计算结果是真和假(逻辑运算),是由CPU进行的,当CPU进行计算时,会先在内存中读取
num
,然后再到一个寄存器中进行储存,再然后将判断数0
移动到寄存器进行判断,最后得到结果。每个线程都会进行这样一个过程 - 一行代码可能对应多条汇编指令,执行汇编指令 中随时都可能切换到其他线程
- CPU中只有一套寄存器,但是寄存器的数据可以有多套(属于线程私有,看起来放在一套公共资源中,它会带走自己的数据,回来时恢复),所以假如在判断过程中进行了线程的切换,此时还没有进行
--
,就可能造成多个线程都存储着最后一张票,这样就造成了负数!
2.2 解决办法 — 锁
为了解决上述的问题,就要使用锁,我们先来了解锁和对应接口。
在pthread库中有我们锁的对应接口,和类型pthread_mutex_t
互斥锁(任何时刻只允许一个线程进行资源访问)。有了这把锁既有对应的初始化和销毁。设置时不管是全局的还是静态的,只需要进行init即可。
pthread_mutex_init
的第一个参数传入锁的地址,第二个参数设置为nullptr
就行.pthread_mutex_destory
传入锁的地址就可以进行销毁了,全局或者静态的其实不需要进行主动销毁,在程序运行结束之后就自动销毁了。使用临时锁时才需要进行主动销毁。
进行加锁时需要使用lock
,解锁使用unlock
,非常直观!在使用过程中,会有多个线程竞争一个锁,成功的正常运行,失败的直接阻塞。
所谓的对共享资源的保护,本质是对临界区代码的保护!因为访问资源是由代码进行访问的,把访问资源的代码保护起来就保护了共享资源!接下来我们来了解一下临界区和非临界区
在需要保护的区域进行上锁,使其串行执行线程,就不会出现之前并发执行的问题了!
我们快速上手一下:
代码语言:javascript复制void threadrun(const std::string &name)
{
while (true)
{
pthread_mutex_lock(&gmutex);
if (num > 0)
{
usleep(1000);
std::cout << "name: " << name << "剩余票数: " << num << std::endl;
num--;
pthread_mutex_unlock(&gmutex);
}
else
{
pthread_mutex_unlock(&gmutex);
break;
}
}
return;
}
我们分析过,出现问题的原因是这个判断语句,也就是临界区,要在临界区之前上锁。也就是在进行抢票判断之前,我们先将代码上锁。之后处理完成就解锁(一定要保证解锁)。
注意:
- 加锁的粒度一定要小,只将临界区代码加锁就可以
- 任何进程要进行抢票,都得先申请锁,不应该有例外!
- 所有线程申请锁的前提是都要看到这把锁, 所以锁也是一种共享资源!是共享资源就有可能会出现之前的问题,但是锁用谁来保护呢?想要不发生问题就得保证加锁的过程必须是原子的(只有一条汇编指令)!
- 原子性:要么不做,要做就做完,没有中间状态!
- 如果线程申请锁失败了,当前线程就会别阻塞!
- 如果线程申请锁成功了,当前线程就继续进行!
- 线程申请锁成功了,运行临界区的代码时,会进行切换吗?可以!加锁不会影响调度算法,只会影响线程会不会继续向下运行!加锁的线程可以放心运行,不会受到打扰!
总之,对于其他线程,要么没有申请锁,要么释放了锁,对于其他线程才有意义!相当于我访问临界区,对于其他线程是原子的!
我们在对锁和线程名进行一个整体封装,更加优雅地进行使用:
代码语言:javascript复制// 包含回调函数所需的数据
class ThreadData
{
public:
ThreadData(const std::string name, pthread_mutex_t *gmutex) : _name(name), _lock(gmutex)
{
}
~ThreadData()
{
}
public:
std::string _name;
pthread_mutex_t *_lock;
};
再稍微修改一下线程类内部的构造函数,将主函数的传参修改一下:
代码语言:javascript复制int main()
{
//设置一个局部锁
pthread_mutex_t mutex ;
pthread_mutex_init(&mutex , nullptr);
std::vector<Thread> thds;
for (int i = 0; i < 4; i )
{
char buffer[128];
std::string name = "thread-" std::to_string(i);
snprintf(buffer, 128, "%s", name.c_str());
//每个线程都需要一个td对象
ThreadData *td = new ThreadData(name, &mutex);
thds.emplace_back(name, threadrun, td);
}
for (int i = 0; i < 4; i )
{
thds[i].Start();
}
for (int i = 0; i < 4; i )
{
thds[i].Join();
}
//销毁锁
pthread_mutex_destroy(&mutex);
return 0;
}
我们运行一下:
可以看到使用的是同一个锁!
我们还可以进行优化,我们可以将锁单独封装起来,做到自动解锁释放:
代码语言:javascript复制#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *td) : _td(td)
{
pthread_mutex_lock(_td);
}
~LockGuard()
{
pthread_mutex_unlock(_td);
}
private:
pthread_mutex_t *_td;
};
这样每次在临界区之前创建一个锁对象,就可以完成对临界区的保护!
2.3 从原理角度理解锁
上面我们见到了锁的作用,那我们如何理解:
- 申请锁成功,允许进入临界区
- 申请锁失败,不允许进入临界区
很简单,申请成功了,函数
pthread_mutex_lock()
会返回,否则不返回(就阻塞了,直到函数内部被唤醒,重新申请锁)!这就一个类似scanf()
的情况 。
- 我们已经意识到单纯的
i
或者i
都不是原子的,因为 - 为了实现互斥锁操作,大多数体系结构都提供了
swap
或exchange
指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性(一条汇编语句的是原子性的)。即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
我们可以画图来看:
- 首先先确认一个概念:lock为1才能进行。
- 将内存中锁数据与寄存器的数据交换,将 1 移动到寄存器中,而寄存器此时数据是属于当前线程的,在线程调度完之后,就会将这个
1
带走! - 下一个线程进行锁数据与寄存器的数据交换,只会得到
0
就阻塞在这里了 - 等锁住的线程执行结束,进行解锁,将
1
交换到内存中,此时就可以别其他线程使用了!
后序文章继续学习线程互斥与线程同步!