Linux:多线程(二.理解pthread_t、线程互斥与同步、基于阻塞队列的生产消费模型)

2024-08-08 08:22:42 浏览数 (1)

1.理解Linux下线程——理解tid

  • 我们知道Linux系统中没有线程的概念,只有轻量级进程。但是我们用户只认线程,那么Linux下就有原生线程库libpthread.so.0进行了封装,使得我们用户能通过库里的接口进程线程的创建,等待,终止等等
  • 那么现在线程的管理工作就落到这个库里面了,一提到管理那就是:先描述,再组织
  • 我们之前已经看过了:(tid与LWP是不同的)pthread_t类型是用户空间线程库对线程的抽象(本质就是一个虚拟地址),用于在用户空间管理线程的创建、销毁等操作。而LWP则是内核管理轻量级进程的抽象,用于在内核空间进行线程的调度和管理。 在Linux系统中,线程库(如pthread库)会将pthread_t映射到对应的LWP上,以便内核进行线程的调度。当创建一个线程时,线程库会分配一个pthread_t标识符,并在内核中创建一个对应的LWP。线程库会负责将pthread_t与LWP进行映射,以便在用户空间对线程进行操作。
  • 动态库也叫共享库,那么其他进程创建的线程都是在库里,共享的。但是一个进程内只有自己线程的地址,看不到其他进程的
  • 每一条执行流的本质就是一条调用链,我们的线程就是一个个执行流,为了保证互相之间不影响,那每个线程都要有独立的栈结构
  1. struct pthread:在Linux系统中,struct pthread是指代线程控制块(Thread Control Block,TCB)的结构体。它包含了线程的状态信息、线程的调度信息、线程的栈信息等。struct pthread结构体用于描述线程的属性和状态,是操作系统用来管理线程的数据结构。
  2. 线程局部存储(Thread Local Storage,TLS):线程局部存储是一种机制,允许每个线程拥有自己独立的存储空间,用于存放线程私有的数据(只能存内置类型)。在C/C 中,可以使用__thread关键字创建线程局部变量。使用后全局变量会发生拷贝到线程内(放到类型前面使用:__thread int a;),线程会使用线程局部的那个。
  3. 线程栈(Thread Stack):线程栈是线程独立的内存区域,用于存储线程执行函数中的局部变量、函数调用信息、临时对象等。每个线程都有自己独立的栈空间,栈的大小通常是固定的或者可以通过系统调用来设置。线程栈的大小限制了线程能够调用的函数深度,过大的栈空间可能导致资源浪费,而过小的栈空间可能导致栈溢出。
代码语言:javascript复制
#include <iostream>
#include <thread> // C  里的库
#include <unistd.h>
#include <sys/types.h>

using namespace std;

void *task(void *argv)
{
    int count = 5;
    while (true)
    {
        cout << "I'm a new thread ,pid : " << getpid() << ". count:" << count << "count's address:" << &count << endl;
        sleep(1);
        count--;
    }
    return nullptr;
}

int main()
{
    pthread_t tid1;
    pthread_t tid2;
    pthread_create(&tid1, nullptr, task, nullptr);
    pthread_create(&tid2, nullptr, task, nullptr); // 这里我们两个线程执行一个函数,里面有临时变量,看二者地址如何

    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);

    return 0;
}

能证明独立栈的存在


2. Linux线程互斥

2.1相关概念

  • 临界资源:多线程执行流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
  • 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

2.2引入

我们利用上次自己封装的Thread来写一段多线程抢票代码

Thread.hpp

代码语言:javascript复制
#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template <typename T>
    using func_t = std::function<void(T &)>;
    // typedef std::function<void(const T&)> func_t;

    template <typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }

    public:
        Thread(func_t<T> func, T &data, const std::string &name = "none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {
        }
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T> *self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if (!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if (!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if (!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T &_data; // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} // namespace ThreadModule

#endif
代码语言:javascript复制
#include "Thread.hpp"
using namespace MyThread;

class ThreadData
{
public:
    ThreadData(int &tickets, const std::string &name)
        : _tickets(tickets), _name(name), _total(0)
    {
    }
    ~ThreadData()
    {
    }

public:
    int &_tickets;     // 所有的线程,最后都会引用同一个全局的g_tickets
    std::string _name; // 进程的名字
    int _total;        // 这个进程抢了多少票
};

int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
const int num = 4;     // 线程数量

void route(ThreadData *td)
{
    while (true)
    {
        if (td->_tickets > 0)
        {
            usleep(1000);
            printf("%s running, get tickets: %dn", td->_name.c_str(), td->_tickets);
            td->_total  ;
            td->_tickets--;
        }
        else
        {
            break;
        }
    }
}

int main()
{
    std::vector<Thread<ThreadData *>> threads; // 所有的线程存在一个数组里
    std::vector<ThreadData *> datas;           // 所有的数据也是
    // 1. 创建一批线程
    for (int i = 0; i < num; i  )
    {
        std::string name = "thread-00"   std::to_string(i   1);
        ThreadData *td = new ThreadData(g_tickets, name);
        threads.emplace_back(route, td, name);
        datas.emplace_back(td); // 创建完后,都插入
    }

    for (auto &e : threads)
    {
        e.Start();
    }

    for (auto &e : threads)
    {
        e.Join();
    }

    return 0;
}

最后一运行发现:

问题分析

为什么会抢到负数?:对全局的g_tickets的判断不是原子的

此时,当第一个进程从内存里把g_tickets读到CPU的寄存器中,进行判断,此时1>0成立。然后因为sleep(),线程挂起(带走自己是上下文数据),CPU调度线程让下一个来了,又是同样的,因为把g_tickets读到CPU的寄存器中(还是1)……

最后,新线程都在等待队列里面时_tickets 都是1,然后遇到了 td->_tickets--;这条语句,都开始执行,先从内存读数据- ->>每次自减后都要写会回物理内存,那么就会导致,下一个线程执行 td-> _tickets–时,又会从内存里把已经减过一次的数据读回来

  • 线程切换的时机也是在用户态和内核态切换时进行的,我们在判断和- -直接给了多线程并发访问时,更多的切换机会

其实 td->_tickets--;不是原子的。本质上是这三步

  1. 从内存 读取到CPU
  2. CPU内部进行–操作
  3. 写回内存

那么最后的汇编语句大概率也是三条语句,在这三条语句之间都有可能发生时间片到了导致线程切换

汇编语句只有一句,那么就是原子的

问题解决思路

要解决以上问题,需要做到三点:

  1. 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  2. 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  3. 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量

2.3Linux中互斥量/互斥锁(mutex)

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来一些问题:我们上面代码产生的问题就是一个例子
接口介绍

关于静态变量与全局变量的小知识:

  • 静态变量(包括静态局部变量和静态全局变量)以及全局变量的初始化时间是在程序执行之前的一个特定阶段
  • 对于全局变量,静态变量,它们的生命周期与整个程序的生命周期相同。当程序结束时,操作系统会自动释放程序占用的资源
  • 全局变量和静态变量都是在程序运行期间一直存在的变量,但它们有一些重要的区别:
    1. 作用域不同
      • 全局变量的作用域是整个程序,即在定义它的文件中的任何地方都可以访问。
      • 静态变量的作用域限定在定义它的函数或文件内部,外部无法直接访问。
    2. 生命周期不同
      • 全局变量的生命周期是整个程序的运行期间,即在程序启动时分配内存,在程序结束时释放内存。
      • 静态变量的生命周期是整个程序的运行期间,但是在定义它的作用域内,它只会被初始化一次,直到程序结束才会被销毁。
    3. 存储位置不同
      • 全局变量存储在静态存储区,程序启动时就会被初始化。
      • 静态变量也存储在静态存储区,但是只有在第一次使用时才会被初始化。
    4. 访问权限不同
      • 全局变量可以被程序中的任何函数或模块访问。
      • 静态变量只能在定义它的函数或文件内部访问,外部无法直接访问。

    总的来说,全局变量是整个程序可见的变量,可以在不同的文件中共享;而静态变量是局部的,只能在定义它的函数或文件内部使用。根据需求,可以选择使用全局变量或静态变量来存储数据。

初始化:

  1. 定义的锁是静态或者全局的,使用静态分配
代码语言:javascript复制
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

静态初始化的互斥锁是在编译时就已经初始化好了,而不是在运行时动态初始化。PTHREAD_MUTEX_INITIALIZER 宏会将互斥锁初始化为一个静态的、已经被初始化的状态,这样就可以不用显式调用 pthread_mutex_init 来初始化互斥锁 不需要显式调用 pthread_mutex_destroy 函数来销毁互斥锁。这是因为静态初始化的互斥锁是在编译时就已经初始化好了,并且通常会在程序结束时自动被系统释放

  1. 动态分配互斥锁是一种在运行时动态初始化互斥锁的方式,通过调用 pthread_mutex_init 函数来创建并初始化互斥锁。这种方式允许在程序运行时根据需要动态创建和初始化互斥锁,而不是在编译时静态初始化。

函数原型:

代码语言:javascript复制
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

参数说明:

  • mutex:要初始化的互斥锁,传入一个指向 pthread_mutex_t 类型的指针。
  • attr:互斥锁的属性,通常传入 NULL,表示使用默认属性进行初始化。

返回值:

  • 如果函数调用成功,返回值为 0,表示成功初始化互斥锁。
  • 如果函数调用失败,返回值为一个正整数错误码,表示初始化失败。

销毁互斥量:

销毁互斥锁是在不再需要使用互斥锁时释放其资源的重要操作。在销毁互斥锁时需要注意以下几点:

  1. 使用 PTHREAD_MUTEX_INITIALIZER 初始化的静态互斥锁不需要销毁:静态互斥锁在程序结束时会自动被系统释放,因此不需要显式调用 pthread_mutex_destroy 函数来销毁这种互斥锁。
  2. 不要销毁一个已经加锁的互斥锁:在销毁互斥锁之前,必须确保该互斥锁已经被解锁。如果一个互斥锁在被销毁之前仍然处于加锁状态,可能会导致未定义的行为或者程序崩溃。
  3. 已经销毁的互斥锁后续不应再被使用:一旦调用 pthread_mutex_destroy 函数销毁了一个互斥锁,该互斥锁的状态将不再可预测,不应再被用于加锁和解锁操作。

函数原型:

代码语言:javascript复制
int pthread_mutex_destroy(pthread_mutex_t *mutex);

参数说明:

  • mutex:要销毁的互斥锁,传入一个指向 pthread_mutex_t 类型的指针。

返回值:

  • 如果函数调用成功,返回值为 0,表示成功销毁互斥锁。
  • 如果函数调用失败,返回值为一个正整数错误码,表示销毁失败。

互斥量加锁和解锁:

在多线程编程中,互斥锁(mutex)是一种用于保护共享资源的同步机制。互斥锁需要在访问共享资源之前进行加锁操作,访问完成后进行解锁操作,以确保同一时刻只有一个线程可以访问共享资源,避免数据竞争和不确定行为的发生。

pthread_mutex_lock 函数:

代码语言:javascript复制
int pthread_mutex_lock(pthread_mutex_t *mutex);
  • 功能:对互斥锁进行加锁操作。
  • 参数mutex 是要加锁的互斥锁。
  • 返回值:成功加锁时返回 0,失败时返回错误号。
  • 申请锁成功:函数就会返回,允许你继续向后运行
  • 申请锁失败:函数就会阻塞,不允许你继续向后运行
  • 函数调用失败:出错返回

当调用 pthread_mutex_lock 函数时,如果互斥量处于未锁定状态,那么该函数会成功将互斥量锁定,并且立即返回成功。这意味着当前线程已经获得了对互斥量的独占访问权限。 然而,如果在调用 pthread_mutex_lock 函数时,其他线程已经锁定了互斥量,或者有其他线程同时尝试锁定互斥量但未竞争成功,那么当前线程的调用将会被阻塞(即执行流被挂起),直到互斥量被解锁为止。这种行为确保了只有一个线程能够同时访问临界区,避免了数据竞争和不确定行为的发生。 只有一个线程会申请锁成功,成功的会接着执行。其余申请锁失败都会阻塞在那

pthread_mutex_unlock 函数:

代码语言:javascript复制
int pthread_mutex_unlock(pthread_mutex_t *mutex);
  • 功能:对互斥锁进行解锁操作。
  • 参数mutex 是要解锁的互斥锁。
  • 返回值:成功解锁时返回 0,失败时返回错误号。
开始解决问题

解决方案1:出现的并发访问的问题,本质是因为多个执行流执行访问全局数据的代码导致的。保护全局共享资源的本质是通过保护临界区完成的。那我们就加锁让一个线程去抢票(全局互斥锁)

代码语言:javascript复制
int g_tickets = 1000; // 共享资源,没有保护的,  临界资源
const int num = 4;    // 线程数量

pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;

void route(ThreadData *td)
{
    while (true)
    {
        pthread_mutex_lock(&gmutex); // 要在这里进行加锁,让一次只有一个线程(竞争力强的那个)能进里面
        if (td->_tickets > 0)        // 每个线程内部,访问临界资源的代码,就叫做临界区
        {
            usleep(1000);
            printf("%s running, get tickets: %dn", td->_name.c_str(), td->_tickets);
            td->_tickets--;
            pthread_mutex_unlock(&gmutex);
            td->_total  ;
        }
        else
        {
            pthread_mutex_unlock(&gmutex);
            break;
        }
    }
}

但是如果我们换个操作系统就有可能发生,全部都是一个相同的线程来抢票(它的竞争力太强了) 竞争锁是自由竞争的,竞争锁的能力太强的线程,会导致其他线程抢不到锁 — 造成了其他线程的饥饿问题 下面我们会利用同步来解决

局部互斥锁

代码语言:javascript复制
#include "Thread.hpp"
using namespace MyThread;

class ThreadData
{
public:
    ThreadData(int &tickets, const std::string &name, pthread_mutex_t &mutex)
        : _tickets(tickets), _name(name), _total(0), _mutex(mutex)
    {
    }
    ~ThreadData()
    {
    }

public:
    int &_tickets;           // 所有的线程,最后都会引用同一个全局的g_tickets
    std::string _name;       // 进程的名字
    int _total;              // 这个进程抢了多少票
    pthread_mutex_t &_mutex; // 传一个动态锁过来,因为是引用,所以都是同一个锁
};

int g_tickets = 1000; // 共享资源,没有保护的,  临界资源
const int num = 4;    // 线程数量

// pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;

void route(ThreadData *td)
{
    while (true)
    {
        // pthread_mutex_lock(&gmutex); // 要在这里进行加锁,让一次只有一个线程(竞争力强的那个)能进里面

        pthread_mutex_lock(&td->_mutex);
        if (td->_tickets > 0) // 每个线程内部,访问临界资源的代码,就叫做临界区
        {
            usleep(1000);
            printf("%s running, get tickets: %dn", td->_name.c_str(), td->_tickets);
            td->_tickets--;
            pthread_mutex_unlock(&td->_mutex);
            td->_total  ;
        }
        else
        {
            pthread_mutex_unlock(&td->_mutex);
            break;
        }
    }
}

int main()
{
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex, nullptr); // 进行初始化

    std::vector<Thread<ThreadData *>> threads; // 所有的线程存在一个数组里
    std::vector<ThreadData *> datas;           // 所有的数据也是
    // 1. 创建一批线程
    for (int i = 0; i < num; i  )
    {
        std::string name = "thread-00"   std::to_string(i   1);
        ThreadData *td = new ThreadData(g_tickets, name, mutex);
        threads.emplace_back(route, td, name);
        datas.emplace_back(td); // 创建完后,都插入
    }
    for (auto &e : threads)
    {
        e.Start();
    }
    for (auto &e : threads)
    {
        e.Join();
    }
    pthread_mutex_destroy(&mutex);
    return 0;
}

3.互斥量/互斥锁实现原理探究

先来复习一下线程的状态 除了正在执行(running)和挂起(blocked/sleeping/waiting)状态外,还有几种常见的线程状态:

  1. 就绪(ready)状态:线程已经准备好执行,但是还没有被分配 CPU 时间,等待系统调度分配 CPU 时间给它。这种状态通常发生在线程被唤醒后,但还未开始执行时。
  2. 终止(terminated)状态:线程已经执行完成,或者被强制终止,处于不再执行的状态。在这种状态下,线程的资源已经被回收,不再占用系统资源。
  3. 等待(waiting)状态:线程正在等待某个条件发生,比如等待某个事件的触发、等待某个线程的结束、等待 I/O 操作完成等。在等待状态下,线程暂时放弃 CPU 的执行,直到等待的条件满足时才会重新进入就绪状态。
  4. 被中断(interrupted)状态:线程被外部中断打断,比如收到了一个信号,操作系统会中断线程的执行,执行相应的中断处理程序,然后根据中断处理程序的逻辑可能会让线程继续执行或者进入其他状态。

在操作系统中,挂起、等待和阻塞是相关但不完全相同的概念:

  1. 挂起(Suspended):指的是暂时停止进程或线程的执行,使其处于非活动状态。挂起的进程或线程不参与 CPU 的调度,不执行任何指令,直到被唤醒。挂起通常是由于外部事件触发的,比如收到特定信号、调用了挂起函数等。被挂起的进程或线程可以在稍后的某个时间点被恢复执行。
  2. 等待(Waiting):指的是进程或线程在等待某些事件发生时暂时停止执行。等待可能是主动的(比如调用等待函数)也可能是被动的(比如等待 I/O 操作完成)。在等待期间,进程或线程可能处于阻塞状态(被阻塞)或者挂起状态,取决于等待的具体条件。
  3. 阻塞(Blocked):指的是进程或线程由于等待某些事件的发生而暂时停止执行。在阻塞状态下,进程或线程不会被分配 CPU 时间,因为它们无法继续执行,直到等待的事件发生。与挂起类似,阻塞状态可能是由于等待 I/O 操作、等待资源、等待锁等原因造成的。

为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令(汇编指令),该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性。 现在我们把lock和unlock的伪代码改一下

代码语言:javascript复制
lock:
    movb $0, al        ; 将值0加载到al寄存器中
    xchgb al, mutex    ; 将al寄存器的值和mutex的值进行交换
    cmpb $0, al        ; 比较al寄存器的值和0
    jne wait           ; 如果al寄存器的值不等于0,则跳转到等待(wait)标签
    ret                ; 返回,表示加锁成功,会去执行下面的代码

wait:
    suspend            ; 挂起线程等待
    jmp lock           ; 跳转到lock标签,重新尝试加锁

unlock:
    movb $1, mutex     ; 将值1写入mutex,表示解锁
    wakeup             ; 唤醒等待mutex的线程
    ret                ; 返回,表示解锁成功

本来我们定义的mutex是在内存中的。数据在内存里,所有线程都能访问,属未共享的。但是如果转移到CPU内部寄存器中,就属于一个线程私有 当线程1竞争成功时,1被交换到寄存器内,也就是线程1的上下文中。CPU寄存器硬件只有一套,但是CPU寄存器内的是数据线程的硬件上下文 而且我们执行的是交换,不是拷贝,这保证了mutex只有一个。加之交换是原子的,即便线程被切换的时机是随时的,发生了切换,但是那时mutex已经到了某个线程的上下文中了,凭借这个值,就能执行下方代码,而其他线程就阻塞了

那现在还有个问题:在临界区内部,正在访问临界区的线程可以被OS切换调度吗?——答案是可以的。正在执行的线程是可以被操作系统(OS)切换调度的。即使一个线程已经获取了锁并进入了临界区,仍然有可能被操作系统暂时挂起

现在假设有一个线程 A 正在访问临界区(已经获取了锁),而其他线程 B、C、D 正在等待获取这个锁。在这种情况下,int pthread_mutex_lock(pthread_mutex_t *mutex);这条语句对于其他线程只有两种情况是有意义的(锁被释放,或者没线程申请到了锁):

  1. 锁被释放:当线程 A 完成了对临界区的访问,释放了锁,其他线程 B、C、D 中的某一个将会获取到这个锁,然后进入临界区执行代码。
  2. 没有线程申请到锁:没有线程申请到锁,所以其他进程能接着进行申请

临界区的代码对于其他线程是原子的,因为只有一个线程能够同时访问临界区。其他线程在等待获取锁的过程中不会执行临界区的代码,从而确保了临界区操作的原子性和线程安全性。


4.可重入与线程安全

概念

  • 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数

线程安全是针对线程执行时,各个线程的相互关系。而重入是属于函数的特点

常见的线程不安全的情况

  1. 不保护共享变量的函数:
  • 当多个线程同时访问并修改同一个共享变量时,如果没有适当的同步机制(如互斥锁、信号量等),就会导致竞态条件,造成数据的不一致性。
  • 例如:多个线程同时对一个计数器进行增减操作,如果没有加锁保护,可能会导致计数器的值出现错误。
  1. 函数状态随着被调用,状态发生变化的函数:
  • 如果一个函数在调用过程中的状态会发生变化,且同时被多个线程调用,在无法保证原子性的情况下可能导致竞态条件。
  • 例如:一个函数在内部维护了一个静态变量作为状态,多个线程同时调用这个函数可能会导致状态的混乱。
  1. 返回指向静态变量指针的函数:
  • 如果一个函数返回一个指向静态变量的指针,那么多个线程调用该函数可能会导致竞态条件,因为静态变量在所有线程间共享
  • 例如:一个函数返回一个静态字符数组的指针,如果多个线程同时对这个指针所指向的数据进行操作,可能会出现数据不一致的情况。
  1. 调用线程不安全函数的函数:
  • 如果一个函数内部调用了一个线程不安全的函数,而该函数被多个线程同时调用,可能会导致整个调用链上的线程不安全。
  • 例如:一个函数内部调用了 strtok 函数(线程不安全),如果该函数被多个线程同时调用,可能会导致出现奇怪的结果。

常见的线程安全的情况

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
  • 类或者接口对于线程来说都是原子操作
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

  1. 调用了malloc/free函数:因为malloc函数通常使用全局链表或数据结构来管理堆内存的分配和释放,如果在多线程环境下同时调用malloc/free,可能会导致竞争条件和数据不一致问题,从而使得malloc/free不可重入。
  2. 调用了标准I/O库函数:标准I/O库的很多实现都是以不可重入的方式使用全局数据结构来管理文件描述符等资源,例如stdio中FILE结构体就是一个全局数据结构。如果在多线程环境下调用标准I/O库函数,会导致数据竞争和不确定性,造成不可重入。
  3. 可重入函数体内使用了静态的数据结构:静态数据结构会被多个线程访问时存在竞争条件和数据共享问题,导致函数不再是可重入的。因为静态变量在内存中只有一份拷贝,被多个线程共享,如果多个线程同时修改静态变量,可能导致数据不一致性。

常见可重入的情况

  1. 使用函数内数据:函数内部使用的所有数据都是函数本地的局部变量,不涉及全局变量或静态变量。这样每次函数调用时,线程内都会有独立的数据副本,不会受到其他线程的干扰,从而实现了可重入。
  2. 通过制作全局数据的本地拷贝:如果函数需要使用全局数据,可以在函数内部将全局数据复制到函数的局部变量中进行操作,这样可以保护全局数据不受其他线程的影响,从而保证函数的可重入性。
  3. 使用线程局部存储(Thread-local storage):对于需要保持状态的情况,可以使用线程局部存储来存储线程特有的数据,每个线程有自己独立的数据副本,不会受其他线程的影响,从而实现函数的可重入。
  4. 使用信号量或互斥锁:在需要访问共享资源的情况下,可以使用信号量或互斥锁来保护临界区,确保同一时间只有一个线程可以访问共享资源,避免数据竞争。
  5. 避免调用不可重入函数:在函数内部避免调用不可重入的函数,尤其是那些使用全局或静态变量的函数,避免引入不可重入性。

5.死锁

死锁是指在并发系统中的一种状态,其中每个进程都在等待系统资源,但这些资源被其他进程占用,导致所有进程都无法继续执行,形成一种互相等待的僵局状态。 死锁是多线程对锁不合理的使用,导致代码不会继续向后正常推进

死锁是在并发系统中常见的一种问题,指的是多个进程或线程因竞争系统资源而陷入无限等待对方释放资源的状态,导致所有进程都无法继续执行,形成一种僵局。死锁的发生通常总是伴随着系统资源的互相占用和互相等待。

死锁发生的必要条件通常包括:

  1. 互斥条件:某些资源只能被一个进程或线程持有,其他进程无法同时访问该资源。 当某些资源只能被一个进程或线程持有时,如果多个进程同时请求这些资源,就有可能造成资源竞争和互斥性冲突
  2. 请求与保持条件:进程持有至少一个资源,同时又请求其他资源造成阻塞的情况。(我不光要你的,自己的还不放)
  3. 不可抢占条件:已经分配给一个进程或线程的资源不能被其他进程强制剥夺,只能由占有者自行释放
  4. 循环等待条件:多个进程之间形成一个资源循环等待的关系,每个进程都在等待其他进程所持有的资源。

当满足以上四个条件时,就会发生死锁。

避免死锁的最有效方式是破坏死锁的四个必要条件

  1. 破坏互斥条件:非必要不加锁。
  2. 破坏请求与保持条件:如果申请锁失败,那就释放掉自己有的锁。能破坏保持条件
  3. 破坏不可抢占条件:对于某些资源,如果可以抢占,可以将资源设置为可抢占的,当其他进程请求资源时,可以主动收回已分配的资源。这样可以避免出现一个进程一直占用资源导致其他进程等待而无法继续执行。
  4. 破坏循环等待条件:如果线程申请多把锁,每个线程申请锁的顺序一致

还可以采取以下具体措施来避免死锁:

  1. 避免锁未释放的场景:确保线程在使用完资源后及时释放资源,不要出现某个线程一直占用资源而不释放的情况,这样可以减少死锁的发生。
  2. 资源一次性分配:尽量在开始时分配给线程所有需要的资源,而不是分配一部分然后再逐步分配。减少加锁的次数

6.线程同步

在了解线程同步之前先明确几个概念:串行、并发和并行。描述了多任务处理的不同方式。

  • 串行:在串行处理中,任务按顺序逐个执行,一个任务执行完毕后才会执行下一个任务。这意味着同一时间只有一个任务在执行,其他任务需要等待前一个任务完成后才能执行。
  • 并发:并发是指多个任务之间存在时间重叠,多个任务在同一时间间隔内启动、执行和完成。在并发处理中,虽然多个任务可能同时执行,但实际上处理器会快速地在不同任务间进行切换,以模拟多个任务同时执行的情况。
  • 并行:在并行处理中,多个任务同时执行,每个任务由独立的处理器核心或处理单元处理。这意味着在同一时刻,多个任务真正同时在不同的处理器核心上运行,从而提高了整体的处理能力。

在多核处理器中,可以实现并行处理,即同时在多个核心上执行不同的任务,以提高整体系统的执行效率。而并发则更多指的是在单个处理器上通过快速切换实现多任务间的交替执行

线程同步是指多个线程之间协调和控制其执行顺序,以避免出现竞态条件(Race Condition)和数据竞争(Data Race)等问题。

在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。由于多个线程的操作顺序不确定或不对称而导致的错误结果或异常情况。当多个线程在对共享资源进行读写操作时,如果它们的操作顺序不正确,可能会导致程序出现意外的结果

6.1条件变量(Condition Variable)

条件变量是一种线程同步的高级机制,它允许线程在某个特定条件下等待。条件变量通常与互斥锁一起使用,用于线程之间的协调和通信。条件变量允许一个线程在某个条件不满足时等待,当条件满足时,其他线程可以通知等待的线程继续执行

6.2接口介绍

条件变量是多线程编程中用于线程间协调和通信的一种机制。它通常与互斥锁一起使用,用于等待某个条件的发生并在条件满足时唤醒等待的线程。条件变量的接口函数包括初始化、销毁、等待条件满足和唤醒等待等操作。

初始化条件变量

静态初始化条件变量

代码语言:javascript复制
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

上述代码使用了宏PTHREAD_COND_INITIALIZER来进行静态初始化,这样就可以在定义条件变量时直接初始化,无需调用pthread_cond_init函数。这种方式适用于条件变量的属性使用默认值的情况。

注意事项:

  • 静态初始化的条件变量在定义时就已经被初始化,因此无需再调用pthread_cond_init函数。
  • 静态初始化的条件变量不需要再调用pthread_cond_destroy函数来销毁,因为它们不会分配额外的资源,只是简单的初始化。
  • 静态初始化的条件变量只能在定义时初始化,不能在后续的代码中重新初始化。

动态初始化

代码语言:javascript复制
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
  • 参数:
    • cond:要初始化的条件变量
    • attr:条件变量的属性,通常为NULL表示使用默认属性

销毁条件变量

代码语言:javascript复制
int pthread_cond_destroy(pthread_cond_t *cond);
  • 参数:
    • cond:要销毁的条件变量

等待条件满足

使当前线程等待在指定的条件变量上,直到条件满足或被其他线程唤醒。

当线程调用 pthread_cond_wait() 时,它会暂时离开临界区,因为 pthread_cond_wait() 会自动释放传递给它的互斥锁。这是为了允许其他线程能够访问和修改与条件变量相关联的共享数据,同时避免死锁。 具体来说,当线程调用 pthread_cond_wait() 时,会发生以下步骤:

  1. 线程首先会检查它持有的互斥锁(在这个例子中是 &gmutex),确保它是锁定的。
  2. 然后,线程释放这个互斥锁。
  3. 接着,线程进入阻塞状态,等待条件变量(在这个例子中是 &gcond)被其他线程触发。
  4. 当条件变量被触发时(即 pthread_cond_signal()pthread_cond_broadcast() 被调用),线程会被唤醒。
  5. 在线程被唤醒之后,pthread_cond_wait() 会自动重新竞争之前释放的互斥锁。
  6. 此时,线程重新进入临界区,并可以继续执行 pthread_cond_wait() 调用之后的代码。

因此,在调用 pthread_cond_wait() 时,线程会短暂地离开临界区,等待条件变量被触发,然后再重新进入临界区。这种机制确保了线程在访问共享数据时能够正确地同步,并避免了竞态条件和其他并发问题。

代码语言:javascript复制
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
  • 参数:
    • cond:要在这个条件变量上等待
    • mutex:与条件变量关联的互斥量,用于在等待条件变量之前解锁,等待结束后再次上锁

在调用pthread_cond_wait函数时需要传入一个互斥锁(mutex),这是因为条件变量(condition variable)通常与互斥锁一起使用,以确保线程在等待条件时能够正确同步和避免竞态条件(race condition) 在使用条件变量时,通常会遵循以下步骤:

  1. 调用pthread_mutex_lock函数对互斥锁进行加锁,以确保对共享资源的访问是互斥的,避免多个线程同时访问共享资源
  2. 在加锁和解锁之间使用条件变量等待条件的变化。在调用pthread_cond_wait函数时,会先释放互斥锁,然后等待在条件变量上的信号。
  3. 当条件变量的信号到达时,线程会被唤醒,然后重新获取之前释放的互斥锁,继续执行后续操作

所以就是:线程A得到锁,执行等待条件->释放锁,等条件变化 - -> 另一个线程又申请到锁,又在等条件变化…… 最后所有线程都在条件那里等着 在使用条件变量时,线程在等待条件变化时会先释放之前获取的互斥锁,然后等待在条件变量上的信号。当条件满足时,线程被唤醒后需要重新获取之前释放的互斥锁,这是因为在等待条件变化时释放互斥锁是条件变量机制的一部分。先释放再获取的 具体原因包括:

  1. 等待条件变化时释放互斥锁是为了让其他线程有机会获取互斥锁并修改共享资源,进而满足条件。如果线程在等待条件变化时仍然持有互斥锁,其他线程无法访问共享资源,可能导致条件永远无法满足。
  2. 重新获取互斥锁是为了保证线程在继续执行后续操作时能够正确访问共享资源。只有重新获取互斥锁后,线程才能安全地访问共享资源,避免出现并发访问问题。

因此,在使用条件变量时,线程需要在等待条件变化时释放互斥锁,等待条件满足后重新获取互斥锁,以确保线程能够正确同步共享资源的访问。这样可以避免竞争条件和确保线程安全地访问共享资源。

  1. 最后,调用pthread_mutex_unlock函数对互斥锁进行解锁,释放资源(释放的是互斥锁的相关资源)。

唤醒等待

  • pthread_cond_broadcast:唤醒所有等待在指定条件变量上的线程。
  • pthread_cond_signal:唤醒等待在指定条件变量上的一个线程(如果有多个线程等待,则唤醒其中一个)
代码语言:javascript复制
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
  • 参数:
    • cond:要唤醒等待的条件变量
代码语言:javascript复制
#include <iostream>
#include <string>
#include <cstring>
#include <vector>
#include <pthread.h>
#include <unistd.h>

void *Master(void *args)
{
    std::string name = static_cast<char *>(args);
    while (true)
    {
        std::cout << name << std::endl;
        sleep(1);
    }
}

void StartMaster(std::vector<pthread_t> *tids)
{
    pthread_t tid;
    int n = pthread_create(&tid, nullptr, Master, (void *)"Master Thread");
    if (n == 0)
    {
        std::cout << "create master success" << std::endl;
    }
    tids->emplace_back(tid);
}

void *Slaver(void *args)
{
    std::string name = static_cast<char *>(args);
    while (true)
    {
        std::cout << name << std::endl;
        sleep(1);
    }
}

void StartSlaver(std::vector<pthread_t> *tids, int num)
{
    for (int i = 0; i < num; i  )
    {
        pthread_t tid;
        char *name = new char[20];
        snprintf(name, 20, "salver-00%d", i   1);
        int n = pthread_create(&tid, nullptr, Slaver, name);
        if (n == 0)
        {
            std::cout << "create success: " << name << std::endl;
            tids->emplace_back(tid);
        }
    }
}

void WaitThread(std::vector<pthread_t> tids)
{
    for (auto &tid : tids)
    {
        pthread_join(tid, nullptr);
    }
}

int main()
{
    std::vector<pthread_t> tids; // 这里放所有的线程的tid
    StartMaster(&tids);          // 启动主线程
    StartSlaver(&tids, 5);       // 启动新线程
    WaitThread(tids);            // 等待新线程
    return 0;
}

我们写了这样的一份代码,会发现最一开始输出是乱的

这是因为,所有的进行都向一个文件进行写入(标准输出流),那么此时标准输出流就是共享资源,是临界资源

使用条件变量来解决

代码语言:javascript复制
#include <iostream>
#include <string>
#include <cstring>
#include <vector>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; // 创建一个锁和条件变量

void *Master(void *args) // 我们选择在主线程里面进行条件的唤醒
{
    std::string name = static_cast<char *>(args);
    while (true)
    {
        // std::cout << name << std::endl;
        sleep(1);
        pthread_cond_signal(&gcond); // 唤醒其中一个队列首部的线程
        // pthread_cond_broadcast(&gcond); // 唤醒队列中所有的线程
        std::cout << "master 唤醒一个线程..." << std::endl;
    }
}

void StartMaster(std::vector<pthread_t> *tids)
{
    pthread_t tid;
    int n = pthread_create(&tid, nullptr, Master, (void *)"Master Thread");
    if (n == 0)
    {
        std::cout << "create master success" << std::endl;
    }
    tids->emplace_back(tid);
}

void *Slaver(void *args)
{
    std::string name = static_cast<char *>(args);
    while (true)
    {
        // 1. 加锁
        pthread_mutex_lock(&gmutex);
        // 2. 一般条件变量是在加锁和解锁之间使用的
        pthread_cond_wait(&gcond, &gmutex); // gmutex:这个是,是用来被释放互斥锁的
        std::cout << name << std::endl;
        sleep(1);
        pthread_mutex_unlock(&gmutex);
        // 3.解锁
    }
}

void StartSlaver(std::vector<pthread_t> *tids, int num)
{
    for (int i = 0; i < num; i  )
    {
        pthread_t tid;
        char *name = new char[20];
        snprintf(name, 20, "salver-00%d", i   1);
        int n = pthread_create(&tid, nullptr, Slaver, name);
        if (n == 0)
        {
            std::cout << "create success: " << name << std::endl;
            tids->emplace_back(tid);
        }
    }
}

void WaitThread(std::vector<pthread_t> tids)
{
    for (auto &tid : tids)
    {
        pthread_join(tid, nullptr);
    }
}

int main()
{
    std::vector<pthread_t> tids; // 这里放所有的线程的tid
    StartMaster(&tids);          // 启动主线程
    StartSlaver(&tids, 5);       // 启动新线程
    WaitThread(tids);            // 等待新线程
    return 0;
}

就是在slave thread的执行函数里进行加锁和条件等待 在master thread的执行函数里进行唤醒


7.生产者消费者模型

超市(交易场所):

  • 定义:超市是数据“交易”的场所,即共享资源或临界资源的存储空间(也可以叫缓冲区)。在多线程编程中,这通常是一个数据结构(如队列、缓冲区等),用于临时存储数据,供生产者和消费者线程进行访问。 一般我们使用阻塞队列作为缓冲区
  • 功能:作为生产者和消费者之间数据传递的桥梁。生产者线程在此处添加(生产)数据,消费者线程在此处取走(消费)数据。

生产者(Producer):

  • 定义:生产者线程负责生成数据并将其放入超市(共享资源)中。
  • 并发度:生产者线程可以并发地运行,以提高数据的生成速度。但需要注意同步和互斥问题,以避免多个生产者同时写入数据导致的冲突。
  • 生产者之间都是互斥的:不能多个生产者同时都在往共享资源里面写

消费者(Consumer):

  • 定义:消费者线程负责从超市(共享资源)中取出数据并进行处理。
  • 并发度:消费者线程也可以并发地运行,以提高数据的处理速度。同样需要注意同步和互斥问题
  • 消费者之间都是互斥的:不能多个消费者同时都在从共享资源里面拿数据

3种关系:

生产者 vs 生产者 — 互斥

多个生产者线程可能同时试图向共享缓冲区(如队列或数组)中写入数据。为了防止数据竞争和不一致,我们需要使用互斥机制来确保同一时间只有一个生产者线程能够访问共享资源。 互斥通常通过互斥锁(Mutex)来实现。当一个生产者线程获得互斥锁时,其他生产者线程将被阻塞,直到锁被释放。这样,每个生产者线程在写入缓冲区时都能独占资源,从而避免了数据竞争。

消费者 vs 消费者 — 互斥

多个消费者线程可能同时试图从共享缓冲区中读取数据。为了确保数据的正确性和一致性,我们同样需要使用互斥机制来防止多个消费者线程同时访问缓冲区。 互斥锁在这里同样起到关键作用。当一个消费者线程获得互斥锁时,其他消费者线程将被阻塞,直到锁被释放。这样,每个消费者线程在读取缓冲区时都能独占资源,避免了潜在的冲突和不一致。

生产者 vs 消费者 — 互斥 && 同步

生产者线程和消费者线程需要共享一个缓冲区。这要求我们使用互斥机制来确保同一时间只有一个线程(生产者或消费者)能够访问缓冲区,以避免数据竞争和不一致。 但是,仅仅互斥是不够的。我们还需要使用同步机制来确保生产者和消费者之间的协调。例如,当缓冲区为空时,消费者线程应该被阻塞,直到生产者线程向其中添加了数据。同样地,当缓冲区满时,生产者线程也应该被阻塞,直到消费者线程从中取走了数据。 同步通常通过条件变量(Condition Variables)来实现。生产者线程在添加数据到缓冲区后,会向条件变量发送信号(signal),以唤醒等待的消费者线程。类似地,消费者线程在取走数据后,也会向条件变量发送信号,以唤醒等待的生产者线程。通过这种方式,生产者和消费者线程能够协调地工作,确保缓冲区的有效使用和数据的一致性。

优点:

  1. 解耦:由于引入了一个缓冲区作为中介,生产者和消费者之间并不直接相互调用,从而降低了它们之间的耦合度。这使得生产者和消费者的代码发生变化时,不会对对方产生直接影响,提高了系统的灵活性和可维护性。
  2. 支持并发:生产者和消费者是两个独立的并发体,它们之间通过缓冲区进行通信。生产者只需将数据放入缓冲区,就可以继续生产下一个数据;消费者只需从缓冲区中取出数据,就可以继续处理。这种并发处理的方式可以避免因生产者和消费者速度不匹配而导致的阻塞问题
  3. 支持忙闲不均:在生产者和消费者模型中,生产者和消费者的速度可以不相同。当生产者生产数据的速度过快,而消费者处理数据的速度较慢时,未处理的数据可以暂时存储在缓冲区中,等待消费者处理。这种机制可以平衡生产者和消费者之间的速度差异,避免资源的浪费和瓶颈的产生。

阻塞队列(BlockingQueue)

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。 其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

这里有个疑问,明明我们放任务和拿任务时都是串行的(加了锁,一次只有一个线程),为什么生产消费模型优点还是并发性呢?

  • 这里的并发是指,生产者放任务之前的生产过程和消费者拿走任务后的执行过程是并发的
  • 消费者们之间的任务处理也是并发的
  • 生产者之间的任务产生也是并发

我们来尝试实现一个BQ

代码语言:javascript复制
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

template <class T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }

public:
    BlockQueue(int cap) : _cap(cap)
    {
        _consum_wait_num = 0;
        _product_wait_num = 0;
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consum_cond, nullptr);
    }

    void Enqueue(T &in) // 生产者用的接口
    {
        pthread_mutex_lock(&_mutex);
        while (IsFull())
        {
            // 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
            // 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
            _product_wait_num  ;
            pthread_cond_wait(&_product_cond, &_mutex);
            _product_wait_num--;
        }
        // 进行生产
        _block_queue.push(in);
        // 通知消费者来消费
        if (_consum_wait_num > 0)
        {
            pthread_cond_signal(&_consum_cond);
        }
        pthread_mutex_unlock(&_mutex); // 其实解锁和唤醒条件顺序无所谓,先唤醒后那边等着,解锁后直接竞争
        // 如果先解锁,后唤醒:先解锁没任何效果,因为都在wait那里等,一唤醒就直接得到锁
    }

    void Pop(T *out) // 消费者用的接口
    {
        pthread_mutex_lock(&_mutex);
        while (IsEmpty())
        {
            // 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
            // 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁——
            _consum_wait_num  ;
            pthread_cond_wait(&_consum_cond, &_mutex);
            _consum_wait_num--;
        }

        // 进行消费
        *out = _block_queue.front();
        _block_queue.pop();
        // 通知生产者来生产
        if (_product_wait_num > 0)
        {
            pthread_cond_signal(&_product_cond);
        }
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consum_cond);
    }

private:
    std::queue<T> _block_queue;   // 阻塞队列
    int _cap;                     // 总上限
    pthread_mutex_t _mutex;       // 保护_block_queue的锁
    pthread_cond_t _product_cond; // 专门给生产者提供的条件变量
    pthread_cond_t _consum_cond;  // 专门给消费者提供的条件变量
    int _product_wait_num;        // 等待的生产者数量
    int _consum_wait_num;         // 等待的消费者数量
};

#endif

一个实际应用的例子

  • BlockQueue.hpp:封装的阻塞队列
  • Main.cc:程序的主题
  • Thread.hpp:自己封装的Thread
  • Task.hpp:任务类(这里只是进行一个加法)

Thread.hpp与BlockQueue.hpp我们上面已经进行展示了,接下来只进行剩下二者

Task.hpp

代码语言:javascript复制
#pragma once

#include <iostream>
#include <string>

class Task
{
public:
    Task() {}
    Task(int a, int b) : _a(a), _b(b), _result(0)
    {
    }
    void Excute()
    {
        _result = _a   _b;
    }
    std::string ResultToString()
    {
        return std::to_string(_a)   " "   std::to_string(_b)   "="   std::to_string(_result);
    }
    std::string DebugToString()//测试的时候使用
    {
        return std::to_string(_a)   " "   std::to_string(_b)   "=?";
    }

private:
    int _a;
    int _b;
    int _result;
};

Main.cc

代码语言:javascript复制
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>

using namespace ThreadModule;
int a = 10;

using blockqueue_t = BlockQueue<Task>;

void Consumer(blockqueue_t &bq)
{
    while (true)
    {
        // 1.从Blockqueue里面取出任务
        Task t;
        bq.Pop(&t);
        // 2.开始执行任务
        t.Excute();
        std::cout << "Consumer Consum result is : " << t.ResultToString() << std::endl;
        sleep(2);
    }
}

void Productor(blockqueue_t &bq)
{
    int cnt = 1;
    srand(time(nullptr));
    while (true)
    {
        int a = rand() % 10;
        int b = rand() % 5;
        Task t(a, b);
        bq.Enqueue(t);
        cnt  ;
    }
}

void StartComm(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq, func_t<blockqueue_t> func)
{
    for (int i = 0; i < num; i  )
    {
        std::string name = "thread-"   std::to_string(i   1);
        threads->emplace_back(func, bq, name);
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
    StartComm(threads, num, bq, Productor);
}

void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{
    for (auto &thread : threads)
    {
        thread.Join();
    }
}

int main()
{
    blockqueue_t *bq = new blockqueue_t(5);
    std::vector<Thread<blockqueue_t>> threads;

    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);

    WaitAllThread(threads);

    return 0;
}

今天也是到这里啦!!!

0 人点赞