C++ 线程池的简易实现

2020-10-10 16:39:29 浏览数 (1)

首先,先简单介绍,线程池的工作原理。

1.他自身拥有一定数量的线程数组 threads,处于等待状态,等待唤醒(通过条件变量)

2.拥有一个任务队列 m_tasks,存储用户的任务,有新任务以后,唤醒线程,取出任务,通过回调函数的方式调用任务,执行完以后继续等待。

使用情况:线程池,适用于会话简短的情况下,http访问可以使用线程池,如需要长时间保持通讯的,如会话,就不要用线程池了。

本例子,采用单例模式,线程安全。

公开接口两个:

static CMyThreadPool * getInstance(); bool start(Task fun);

用户的函数 fun 的参数,可通过,bind来传递,不过要注意,如果传的是指针,需要注意他的生存周期,如果传的是 new,处理完以后,要自己 delete.

代码语言:javascript复制
void showTicket(mutex* m){
            lock_guard<std::mutex> l(*m);
            cout <<" show ticket: " << ticket   << endl;
    
}

pool->start(bind(showTicket, m));

头文件:

代码语言:javascript复制
//定义一个函数对象类型
typedef std::function<void()> Task;

class CMyThreadPool
{
private:
    int max_thread;                // max thread;
    int max_task;                // max task;
    // thread array:
    vector<thread> threads;
    // task queue:
    queue<Task> m_tasks;
    // lock:
    mutex m_lock;
    // condition:
    condition_variable has_task;
    bool running_flag;
public:
    ~CMyThreadPool(void);
  //获取线程池对象指针
     static CMyThreadPool * getInstance();
  //添加任务,成功返回true,失败返回false
     bool start(Task fun);
private:
    CMyThreadPool(void);
    bool InitThread();
    void DestroyPool();
  //工作线程
    void WorkFun();
    static CMyThreadPool * m_pool;
    static std::mutex *singal_mutex;
};

实现:

代码语言:javascript复制
#include "MyThreadPool.h"

CMyThreadPool * CMyThreadPool::m_pool = NULL;
mutex* CMyThreadPool::singal_mutex = new mutex();

CMyThreadPool::CMyThreadPool(void):max_thread(default_max_thread),
    max_task(default_max_task),running_flag(true)
{
}


CMyThreadPool::~CMyThreadPool(void)
{
    DestroyPool();
}

CMyThreadPool * CMyThreadPool::getInstance()
{
    if( NULL == m_pool){
        //lock();
        std::lock_guard<std::mutex> l(*singal_mutex);
        if( NULL == m_pool){
            m_pool = new CMyThreadPool();
        }
        //unlock();
    }
    return m_pool;
}

bool CMyThreadPool::start( Task fun )
{
    //判断是否第一次,延缓线程初始化
    {
        if( threads.size() == 0){
            unique_lock<mutex> l(m_lock);
            if( threads.size() == 0){
                //初始化线程
                if(!InitThread()){
                    return false;
                }
            }
        }
    }
    //判断工作队列是否已满,没满则加入工作队列
    {
        unique_lock<mutex> l(m_lock);
        if( (unsigned int)max_task > m_tasks.size()){
            m_tasks.push(fun);
        }else{
            return false;
        }
    }
    //唤醒一个线程
    has_task.notify_one();
    return true;
}


//已经上着锁了
bool CMyThreadPool::InitThread()
{
    for (int i = 0; i != max_thread; i  ){
        threads.push_back(thread(&CMyThreadPool::WorkFun, this));
    }
    return true;
}

void CMyThreadPool::WorkFun()
{
    while(running_flag || !m_tasks.empty()){
        Task t;
        //获取task
        {
            unique_lock<mutex> l(m_lock);
            while( m_tasks.empty())
                has_task.wait(l);
            t = m_tasks.front();
            m_tasks.pop();
        }
        //执行task
        t();
    }
}

void CMyThreadPool::DestroyPool()
{
    {
        unique_lock<mutex> u_lock(m_lock);
        running_flag = false;
    }
    has_task.notify_all();

    for( auto &t : threads){
        t.join();
    }
    threads.clear();
}

测试用例:

代码语言:javascript复制
#include <iostream>
#include "MyThreadPool.h"
#include <memory>
#define _CRTDBG_MAP_ALLOC
#include <crtdbg.h>
#include <Windows.h>

using namespace std;
int ticket = 0;

void showTicket(mutex* m){
            lock_guard<std::mutex> l(*m);
#ifdef WIN32
            //打印当前线程号
            cout << "Thread id: " << GetCurrentThreadId();

#endif
            cout <<" show ticket: " << ticket   << endl;
    
}

int main(){
    mutex *m = new mutex;
    int sum = 0;
    {
    std::shared_ptr<CMyThreadPool> pool(CMyThreadPool::getInstance());
    for(int i = 0; i < 100;i  ){
        if(!pool->start(bind(showTicket, m))){
            sum  ;
        }
    }
    }
    cout << "not use task : "<< sum << endl;
    delete m;
    _CrtDumpMemoryLeaks();
    system("pause");
    return 0;
}

0 人点赞