线程安全的单例模式 | 可重入 | 线程安全 |死锁(理论)

2024-10-06 08:03:35 浏览数 (3)

单例模式概述

某些类, 只应该具有一个对象(实例), 就称之为单例。 例如一个男人只能有一个媳妇。

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中。此时往往要用一个单例的类来管理这些数据。

饿汉实现方式和懒汉实现方式

如何理解饿汉方式和懒汉方式? 饿汉方式:吃完饭,直接洗完,下一次吃饭的时候就可以直接使用; 懒汉方式:吃完饭,先放着,等下一顿吃饭的时候再去洗碗。

懒汉方式最核心的思想是 “延时加载”,从而能够优化服务器的启动速度。

懒汉方式实现

在单线程场景中
代码语言:javascript复制
//ThreadPool.hpp
#pragma once

#include<iostream>
#include<unistd.h>
#include<string>
#include<vector>
#include<queue>
#include<functional>
#include"Thread.hpp"
#include"Log.hpp"

using namespace threadModel;
using namespace log_ns;

static const int gdefaultnum=5;

void test()
{
    while(true)
    {
        std::cout<<"hello world"<<std::endl;
        sleep(1);
    }
}

template<typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    void Sleep()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }

    bool IsEmpty()
    {
        return _task_queue.empty();
    }

    void HandlerTask(const std::string& name)  // this
    {
        while (true)
        {
            LockQueue();
            //如果队列为空(有任务)
            while(IsEmpty()&&_isrunning) //线程没有任务,但是在工作,继续休眠
            {
                _sleep_thread_num  ;
                LOG(INFO,"%s thread sleep begin!n",name.c_str());

                Sleep();

                LOG(INFO,"%s thread wakeup!n",name.c_str());

                _sleep_thread_num--;

            }

            if(IsEmpty()&&!_isrunning) // 任务是空的,并且线程退出工作
            {
                UnlockQueue();
                LOG(INFO,"%s quitn",name.c_str());
                break;
            }

            // 队列不为空,有任务 或者 队列被唤醒
            // 取任务
            T t=_task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 此处任务已经不在任务队列中,任务已经被拿走,处理任务和临界资源是两码事

            t(); // 处理任务,不能不用也不能在临界区中处理
            LOG(DEBUG,"hander task done, task is: n%s",t.result().c_str());
        }
    }

    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i  )
        {
            std::string threadname = "thread-"   std::to_string(i   1);
            _threads.emplace_back(threadname, func);
            LOG(DEBUG, "construct thread %s done, init success.n", threadname.c_str());
        }
    }

    void Start()
    {
        _isrunning = true;
        for (auto &thread : _threads)
        {
            LOG(DEBUG, "Start thread %s done.n", thread.Name().c_str());
            thread.Start();
        }
    }

    ThreadPool(int thread_num = gdefaultnum)
        : _thread_num(thread_num)
        , _isrunning(false) // 刚开始线程没有使用
        ,_sleep_thread_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ThreadPool(const ThreadPool<T> &)=delete;
    void operator=(const ThreadPool<T> &)=delete;

public:
    void Stop()
    {
        LockQueue();
        _isrunning=false;
        WakeupAll();
        UnlockQueue();
        LOG(INFO,"Thread Pool Stop Success!n");
    }

    static ThreadPool<T> *GetInstance()
    {
        if(_tp==nullptr)
        {
            LOG(INFO,"create threadpooln");
            _tp=new ThreadPool();
            _tp->Init();
            _tp->Start();
        }
        else
        {
            LOG(INFO,"get threadpooln");
            
        }   

        return _tp;
    }

    void Equeue(const T &in)
    {
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(in);
            // 如果当前有线程在等待,需要唤醒
            if(_sleep_thread_num>0)
            {
                Wakeup();
            }
        }

        UnlockQueue();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _thread_num;
    std::vector<Thread> _threads;  // 管理多个线程
    std::queue<T> _task_queue; // 任务队列
    bool _isrunning; //当前线程是否在工作
    int _sleep_thread_num;   //计数器:休眠的线程个数

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    //单例程模式
    static ThreadPool<T>* _tp;

};


//静态指针初始化必须在类外初始化
template<typename T>
ThreadPool<T> *ThreadPool<T>:: _tp=nullptr;

定义了一个静态成员函数 GetInstance(),用于实现线程池的单例模式:

  • 单例模式: 这个函数的目的是确保 ThreadPool 类只有一个实例存在。它利用静态指针 _tp 来检查是否已经创建了一个实例。
  • 实例化逻辑: 空指针检查: if (_tp == nullptr):检查静态指针 _tp 是否为空。如果为空,表示尚未创建线程池实例。 创建实例: 在指针为空的情况下,会记录日志(LOG(INFO, "create threadpooln");),然后使用new关键字创建一个新的 ThreadPool 实例。接着调用 Init() 方法进行初始化,可能用于设置线程池的初始状态。然后调用 Start() 方法启动线程池,以便开始处理任务。 获取现有实例: 如果 _tp 不为空,说明线程池实例已存在,则记录另一条日志(LOG(INFO, "get threadpooln");)以指示已经获取到现有实例。

通过检查静态指针 _tp 的状态来实现线程池的单例模式。它在第一次调用时创建并初始化线程池实例,随后的调用将返回相同的实例,从而避免不必要的资源浪费和多重实例的问题。这就是按需加载。

多线程场景中
代码语言:javascript复制
//ThreadPool.hpp
#pragma once

#include<iostream>
#include<unistd.h>
#include<string>
#include<vector>
#include<queue>
#include<functional>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"

using namespace threadModel;
using namespace log_ns;

static const int gdefaultnum=5;

void test()
{
    while(true)
    {
        std::cout<<"hello world"<<std::endl;
        sleep(1);
    }
}

template<typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    void Sleep()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }

    bool IsEmpty()
    {
        return _task_queue.empty();
    }

    void HandlerTask(const std::string& name)  // this
    {
        while (true)
        {
            LockQueue();
            //如果队列为空(有任务)
            while(IsEmpty()&&_isrunning) //线程没有任务,但是在工作,继续休眠
            {
                _sleep_thread_num  ;
                LOG(INFO,"%s thread sleep begin!n",name.c_str());

                Sleep();

                LOG(INFO,"%s thread wakeup!n",name.c_str());

                _sleep_thread_num--;

            }

            if(IsEmpty()&&!_isrunning) // 任务是空的,并且线程退出工作
            {
                UnlockQueue();
                LOG(INFO,"%s quitn",name.c_str());
                break;
            }

            // 队列不为空,有任务 或者 队列被唤醒
            // 取任务
            T t=_task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 此处任务已经不在任务队列中,任务已经被拿走,处理任务和临界资源是两码事

            t(); // 处理任务,不能不用也不能在临界区中处理
            LOG(DEBUG,"hander task done, task is: n%s",t.result().c_str());
        }
    }

    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i  )
        {
            std::string threadname = "thread-"   std::to_string(i   1);
            _threads.emplace_back(threadname, func);
            LOG(DEBUG, "construct thread %s done, init success.n", threadname.c_str());
        }
    }

    void Start()
    {
        _isrunning = true;
        for (auto &thread : _threads)
        {
            LOG(DEBUG, "Start thread %s done.n", thread.Name().c_str());
            thread.Start();
        }
    }

    ThreadPool(int thread_num = gdefaultnum)
        : _thread_num(thread_num)
        , _isrunning(false) // 刚开始线程没有使用
        ,_sleep_thread_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ThreadPool(const ThreadPool<T> &)=delete;
    void operator=(const ThreadPool<T> &)=delete;

public:
    void Stop()
    {
        LockQueue();
        _isrunning=false;
        WakeupAll();
        UnlockQueue();
        LOG(INFO,"Thread Pool Stop Success!n");
    }

    static ThreadPool<T> *GetInstance()
    {
        if(_tp==nullptr)
        {
            LockGuard lockguard(&_sig_mutex);  //解决多线程场景

            if(_tp==nullptr)
            {
                LOG(INFO,"create threadpooln");
                _tp=new ThreadPool();
                _tp->Init();
                _tp->Start();
            }
            else
            {
                LOG(INFO,"get threadpooln");

            }   
        }

        return _tp;
    }

    void Equeue(const T &in)
    {
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(in);
            // 如果当前有线程在等待,需要唤醒
            if(_sleep_thread_num>0)
            {
                Wakeup();
            }
        }

        UnlockQueue();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _thread_num;
    std::vector<Thread> _threads;  // 管理多个线程
    std::queue<T> _task_queue; // 任务队列
    bool _isrunning; //当前线程是否在工作
    int _sleep_thread_num;   //计数器:休眠的线程个数

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    //单例程模式
    static ThreadPool<T>* _tp;
    static pthread_mutex_t _sig_mutex;

};


//静态指针初始化必须在类外初始化
template<typename T>
ThreadPool<T> *ThreadPool<T>:: _tp=nullptr;

template<typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex=PTHREAD_MUTEX_INITIALIZER;

多线程场景中,在GetInstance()内部,需要创建一个 LockGuard 对象以自动加锁 _sig_mutex 互斥锁。这确保在进入临界区时,只有一个线程可以访问此代码块,以避免多个线程同时创建实例。

可重入vs线程安全

  • 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

如果一个函数可重入,那么在多线程调用时一定是安全的;如果一个函数不可重入,那么这个函数可能不是线程安全的。

常见锁概念

死锁

死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

一个线程一把锁也可能出现死锁:当在给一个线程加锁的后,没有解锁而是继续加锁。

死锁四个必要条件

  • 互斥条件:一个资源每次只能被一个执行流使用
  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

避免死锁

  • 破坏死锁的四个必要条件
  • 加锁顺序一致
  • 避免锁未释放的场景
  • 资源一次性分配

避免死锁算法

  • 死锁检测算法(了解)
  • 银行家算法(了解)

STL、智能指针与线程安全

STL中的容器是否是线程安全的

不是.

原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响. 而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶). 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全

智能指针是否是线程安全的

对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题. 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.

其他常见的各种锁

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。

1 人点赞