如果知道我会死在哪里,那我将永远不去那个地方 -查理 芒格
引言
随着多核处理器的普及,并发编程在提高应用程序性能方面变得越来越重要。C 标准库提供了多线程支持,但直接使用std::thread进行大规模并发编程无疑增加了线程创建、销毁的开销。
线程池作为一种高效管理线程的机制,具有如下的有点(1)通过重用已存在的线程,减少对象的创建、销毁的开销,提升性能;(2)通过重复利用已创建的线程降低线程创建和销毁造成的消耗,防止消耗过多的内存或系统资源;(3)当任务到达时,任务可以不需要等待线程创建就能立即执行,消除了线程创建所带来的延迟,使应用程序响应更快;(4)线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。线程池可以进行统一的分配、调优和监控,提高线程的可管理性。
本文将深入探讨C 线程池的原理、实现以及最佳实践。
原理
简言之,线程池的原理为多个线程从一个任务队列中取任务,如果取到任务便执行任务,未取到任务则等待新的任务到来,直到将所有任务取完。
由以上可知,线程池需要一个任务队列、一个线程队列,同时,为了保证取任务、添加任务的原子性,需要配套的控制变量(互斥锁、条件变量),具体详述如下:
(1)线程池初始化:线程池在创建时,会预先创建一组线程并保存在池中。这些线程通常处于休眠状态,等待任务的到来;
(2)任务队列:当有新任务到达时,它会被放入一个任务队列中。线程池中的线程会等待新任务到来的通知;
(3)线程复用:一旦线程执行完一个任务,它不会立即被销毁,而是一直在池内等待新任务的到来
(4)线程管理:线程池还负责管理线程的生命周期。例如,如果所有线程都在忙碌状态,并且队列中还有新的任务等待处理,线程池可能会选择创建新的线程来处理这些任务。
依据场景的不同,存在但不限于如下两种场景:
1. 任务有不同的优先级,优先级高的任务希望能够先被执行,优先级低的任务可以延后执行;
2. 针对需要执行的任务,有的任务需要结果,有的任务不需要结果。
结合如上的场景,本文实现了可以支持如上需求的线程池。
实现
如下源码可直接拿走运行,也可以后台回复“线程池”获取源码下载链接。
代码语言:javascript复制//header
#pragma once
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>
enum class TaskPriority:int
{
kTP_Highest,
kTP_Normal
};
class TaskEvent final
{
public:
TaskEvent(std::function<void()>f, TaskPriority p=TaskPriority::kTP_Highest):
m_event(f),
m_priority(p)
{}
const TaskPriority priority()const noexcept
{
return m_priority;
}
void operator()()
{
m_event();
}
private:
std::function<void()> m_event{nullptr};
TaskPriority m_priority;
};
struct Compare {
bool operator()(const std::shared_ptr <TaskEvent>& left, const std::shared_ptr <TaskEvent>& right)
{
return left->priority() > right->priority();
}
};
class ThreadPool
{
public:
ThreadPool(int thread_num=std::thread::hardware_concurrency(), int max_thread_num= 4* std::thread::hardware_concurrency());
~ThreadPool();
void AddTask(std::function<void()> t);
void AddTask(std::shared_ptr<TaskEvent> te);
template<typename F, typename... Args>
auto AddHasResultTask(F&& f, Args&&... args) -> std::shared_future<decltype(f(args...))>//shared_futrue for saved into vector or queue
{
using returnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::shared_future<returnType> result = task->get_future().share();
auto t = std::make_shared<TaskEvent>([task]() { (*task)(); });
AddTask(t);
return result;
}
private:
static void threadWorkFunction(void* p);
private:
int m_current_thread_num{0};
int m_max_thread_num{0};
std::atomic_bool m_thread_pool_runing{true};
//about threads
std::vector<std::thread> m_threads;
//about threads end
//about tasks
std::priority_queue< std::shared_ptr<TaskEvent>,std::vector<std::shared_ptr<TaskEvent>>,Compare>m_tasks_queue;//priority queue
std::mutex m_tasks_mtx;
std::condition_variable m_tasks_cv;
//about tasks end
};
//source file
#include "ThreadPool.h"
ThreadPool::ThreadPool(int thread_num, int max_thread_num):
m_current_thread_num(thread_num),
m_max_thread_num(max_thread_num)
{
for (int i =0; i< m_current_thread_num; i )
{
m_threads.emplace_back(&ThreadPool::threadWorkFunction, this);
}
}
ThreadPool::~ThreadPool()
{
m_thread_pool_runing.store(false);
m_tasks_cv.notify_all();
for (auto& t : m_threads)
{
if (t.joinable())
{
t.join();
}
}
}
void ThreadPool::AddTask(std::function<void()> t)
{
AddTask(std::make_shared<TaskEvent>(t));
}
void ThreadPool::AddTask(std::shared_ptr<TaskEvent> te)
{
if (!m_thread_pool_runing)
{
return;//do not add new task
}
std::unique_lock<std::mutex> lk(m_tasks_mtx);
m_tasks_queue.push(te);
int size = m_tasks_queue.size();
lk.unlock();
if (size> m_current_thread_num && size<m_max_thread_num)
{
m_threads.emplace_back(&ThreadPool::threadWorkFunction, this);
}
m_tasks_cv.notify_all();
}
void ThreadPool::threadWorkFunction(void* p)
{
if (!p)
{
return;
}
auto pool = static_cast<ThreadPool*>(p);
if (!pool)
{
return;
}
//stop tasks and tasks queue is empty, will exit thread
while (pool->m_thread_pool_runing.load()
|| pool->m_tasks_queue.size()>0)
{
std::unique_lock<std::mutex> lk(pool->m_tasks_mtx);
pool->m_tasks_cv.wait(lk, [p_lambda = pool ]()->bool{
return !p_lambda->m_tasks_queue.empty()||!p_lambda->m_thread_pool_runing.load();
});
if (!pool->m_tasks_queue.empty())
{
auto task = pool->m_tasks_queue.top();
pool->m_tasks_queue.pop();
(*task)();
}
}
}
//test file
int a = 100;
std::mutex g_mtx;
void use_thread_pool()
{
std::shared_ptr<ThreadPool> pool=std::make_shared<ThreadPool>();
for (int i = 0; i<10; i )
{
pool->AddTask([]() {
std::unique_lock<std::mutex> lk(g_mtx);
a = 100;
std::cout<<"thread id="<< std::this_thread::get_id()<<" a="<<a<<"n";
//printf("thread id=%s, a=%d n",ss.str(),a);
});
}
std::this_thread::sleep_for(std::chrono::seconds(5));
pool.reset();
}
const int data_size = 3;
const int calculate_time= 10;
void use_thread_pool_with_result()
{
float* result_data = (float*)malloc(data_size* sizeof(float));
memset(result_data,0, data_size * sizeof(float));
std::vector<std::shared_future<float*>> sf_vec;
sf_vec.reserve(calculate_time);
std::shared_ptr<ThreadPool> pool = std::make_shared<ThreadPool>();
for (int i = 0; i < calculate_time; i )
{
auto sf = pool->AddHasResultTask([]() ->float*{
float* t = (float*)malloc(data_size * sizeof(float));
t[0]=1;
t[1]=2;
t[2]=3;
return t;
});
sf_vec.push_back(sf);
}
for (auto sf: sf_vec)
{
auto t = sf.get();
if (!t)
{
continue;
}
for (int i =0;i<data_size;i )
{
result_data[i] =t[i];
}
free(t);
}
for (int i = 0; i < data_size; i )
{
std::cout<<result_data[i] <<"t";
}
std::cout<<"n";
free(result_data);
}