首先,先简单介绍,线程池的工作原理。
1.他自身拥有一定数量的线程数组 threads,处于等待状态,等待唤醒(通过条件变量)
2.拥有一个任务队列 m_tasks,存储用户的任务,有新任务以后,唤醒线程,取出任务,通过回调函数的方式调用任务,执行完以后继续等待。
使用情况:线程池,适用于会话简短的情况下,http访问可以使用线程池,如需要长时间保持通讯的,如会话,就不要用线程池了。
本例子,采用单例模式,线程安全。
公开接口两个:
static CMyThreadPool * getInstance(); bool start(Task fun);
用户的函数 fun 的参数,可通过,bind来传递,不过要注意,如果传的是指针,需要注意他的生存周期,如果传的是 new,处理完以后,要自己 delete.
代码语言:javascript复制void showTicket(mutex* m){
lock_guard<std::mutex> l(*m);
cout <<" show ticket: " << ticket << endl;
}
pool->start(bind(showTicket, m));
头文件:
代码语言:javascript复制//定义一个函数对象类型
typedef std::function<void()> Task;
class CMyThreadPool
{
private:
int max_thread; // max thread;
int max_task; // max task;
// thread array:
vector<thread> threads;
// task queue:
queue<Task> m_tasks;
// lock:
mutex m_lock;
// condition:
condition_variable has_task;
bool running_flag;
public:
~CMyThreadPool(void);
//获取线程池对象指针
static CMyThreadPool * getInstance();
//添加任务,成功返回true,失败返回false
bool start(Task fun);
private:
CMyThreadPool(void);
bool InitThread();
void DestroyPool();
//工作线程
void WorkFun();
static CMyThreadPool * m_pool;
static std::mutex *singal_mutex;
};
实现:
代码语言:javascript复制#include "MyThreadPool.h"
CMyThreadPool * CMyThreadPool::m_pool = NULL;
mutex* CMyThreadPool::singal_mutex = new mutex();
CMyThreadPool::CMyThreadPool(void):max_thread(default_max_thread),
max_task(default_max_task),running_flag(true)
{
}
CMyThreadPool::~CMyThreadPool(void)
{
DestroyPool();
}
CMyThreadPool * CMyThreadPool::getInstance()
{
if( NULL == m_pool){
//lock();
std::lock_guard<std::mutex> l(*singal_mutex);
if( NULL == m_pool){
m_pool = new CMyThreadPool();
}
//unlock();
}
return m_pool;
}
bool CMyThreadPool::start( Task fun )
{
//判断是否第一次,延缓线程初始化
{
if( threads.size() == 0){
unique_lock<mutex> l(m_lock);
if( threads.size() == 0){
//初始化线程
if(!InitThread()){
return false;
}
}
}
}
//判断工作队列是否已满,没满则加入工作队列
{
unique_lock<mutex> l(m_lock);
if( (unsigned int)max_task > m_tasks.size()){
m_tasks.push(fun);
}else{
return false;
}
}
//唤醒一个线程
has_task.notify_one();
return true;
}
//已经上着锁了
bool CMyThreadPool::InitThread()
{
for (int i = 0; i != max_thread; i ){
threads.push_back(thread(&CMyThreadPool::WorkFun, this));
}
return true;
}
void CMyThreadPool::WorkFun()
{
while(running_flag || !m_tasks.empty()){
Task t;
//获取task
{
unique_lock<mutex> l(m_lock);
while( m_tasks.empty())
has_task.wait(l);
t = m_tasks.front();
m_tasks.pop();
}
//执行task
t();
}
}
void CMyThreadPool::DestroyPool()
{
{
unique_lock<mutex> u_lock(m_lock);
running_flag = false;
}
has_task.notify_all();
for( auto &t : threads){
t.join();
}
threads.clear();
}
测试用例:
代码语言:javascript复制#include <iostream>
#include "MyThreadPool.h"
#include <memory>
#define _CRTDBG_MAP_ALLOC
#include <crtdbg.h>
#include <Windows.h>
using namespace std;
int ticket = 0;
void showTicket(mutex* m){
lock_guard<std::mutex> l(*m);
#ifdef WIN32
//打印当前线程号
cout << "Thread id: " << GetCurrentThreadId();
#endif
cout <<" show ticket: " << ticket << endl;
}
int main(){
mutex *m = new mutex;
int sum = 0;
{
std::shared_ptr<CMyThreadPool> pool(CMyThreadPool::getInstance());
for(int i = 0; i < 100;i ){
if(!pool->start(bind(showTicket, m))){
sum ;
}
}
}
cout << "not use task : "<< sum << endl;
delete m;
_CrtDumpMemoryLeaks();
system("pause");
return 0;
}