C++线程池看这篇就够了,支持不同优先级,支持带返回值

2024-07-18 13:11:35 浏览数 (1)

如果知道我会死在哪里,那我将永远不去那个地方 -查理 芒格

引言

随着多核处理器的普及,并发编程在提高应用程序性能方面变得越来越重要。C 标准库提供了多线程支持,但直接使用std::thread进行大规模并发编程无疑增加了线程创建、销毁的开销。

线程池作为一种高效管理线程的机制,具有如下的有点(1)通过重用已存在的线程,减少对象的创建、销毁的开销,提升性能;(2)通过重复利用已创建的线程降低线程创建和销毁造成的消耗,防止消耗过多的内存或系统资源;(3)当任务到达时,任务可以不需要等待线程创建就能立即执行,消除了线程创建所带来的延迟,使应用程序响应更快;(4)线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。线程池可以进行统一的分配、调优和监控,提高线程的可管理性。

本文将深入探讨C 线程池的原理、实现以及最佳实践。

原理

简言之,线程池的原理为多个线程从一个任务队列中取任务,如果取到任务便执行任务,未取到任务则等待新的任务到来,直到将所有任务取完。

由以上可知,线程池需要一个任务队列、一个线程队列,同时,为了保证取任务、添加任务的原子性,需要配套的控制变量(互斥锁、条件变量),具体详述如下:

(1)线程池初始化:线程池在创建时,会预先创建一组线程并保存在池中。这些线程通常处于休眠状态,等待任务的到来;

(2)任务队列:当有新任务到达时,它会被放入一个任务队列中。线程池中的线程会等待新任务到来的通知;

(3)线程复用:一旦线程执行完一个任务,它不会立即被销毁,而是一直在池内等待新任务的到来

(4)线程管理:线程池还负责管理线程的生命周期。例如,如果所有线程都在忙碌状态,并且队列中还有新的任务等待处理,线程池可能会选择创建新的线程来处理这些任务。

依据场景的不同,存在但不限于如下两种场景

1. 任务有不同的优先级,优先级高的任务希望能够先被执行,优先级低的任务可以延后执行;

2. 针对需要执行的任务,有的任务需要结果,有的任务不需要结果。

结合如上的场景,本文实现了可以支持如上需求的线程池。

实现

如下源码可直接拿走运行,也可以后台回复“线程池”获取源码下载链接。

代码语言:javascript复制
//header 
#pragma once
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>

enum class TaskPriority:int
{
  kTP_Highest,
  kTP_Normal
};


class TaskEvent final
{
public:
  TaskEvent(std::function<void()>f, TaskPriority p=TaskPriority::kTP_Highest):
  m_event(f),
  m_priority(p)
  {}

  const TaskPriority priority()const noexcept
{
    return m_priority;
  }

  void operator()()
{
    m_event();
  }

private:
    std::function<void()> m_event{nullptr};
    TaskPriority m_priority;
};

struct Compare {
  bool operator()(const std::shared_ptr <TaskEvent>& left, const std::shared_ptr <TaskEvent>& right) 
{   
    return left->priority() > right->priority(); 
  }
};

class ThreadPool
{
public:
  ThreadPool(int thread_num=std::thread::hardware_concurrency(), int max_thread_num= 4* std::thread::hardware_concurrency());
  ~ThreadPool();

  void AddTask(std::function<void()> t);
  void AddTask(std::shared_ptr<TaskEvent> te);

  template<typename F, typename... Args>
  auto AddHasResultTask(F&& f, Args&&... args) -> std::shared_future<decltype(f(args...))>//shared_futrue for saved into vector or queue
{
    using returnType = decltype(f(args...));
    auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::shared_future<returnType> result = task->get_future().share();

    auto  t = std::make_shared<TaskEvent>([task]() {          (*task)();       });
    AddTask(t);
    return result;
  }

private:
  static void threadWorkFunction(void* p);

private:
  int m_current_thread_num{0};
  int m_max_thread_num{0};
  std::atomic_bool m_thread_pool_runing{true};

  //about threads
  std::vector<std::thread> m_threads;
  //about threads end

  //about tasks
  std::priority_queue< std::shared_ptr<TaskEvent>,std::vector<std::shared_ptr<TaskEvent>>,Compare>m_tasks_queue;//priority queue
  std::mutex m_tasks_mtx;
  std::condition_variable m_tasks_cv;
  //about tasks end
};


//source file
#include "ThreadPool.h"

ThreadPool::ThreadPool(int thread_num, int max_thread_num):
m_current_thread_num(thread_num),
m_max_thread_num(max_thread_num)
{
  for (int i =0; i< m_current_thread_num; i  )
  {
    m_threads.emplace_back(&ThreadPool::threadWorkFunction, this);
  }
}

ThreadPool::~ThreadPool()
{
  m_thread_pool_runing.store(false);
  m_tasks_cv.notify_all();
  for (auto& t : m_threads)
  {
    if (t.joinable())
    {

      t.join();
    }
  }
}

void ThreadPool::AddTask(std::function<void()> t)
{
  AddTask(std::make_shared<TaskEvent>(t));
}

void ThreadPool::AddTask(std::shared_ptr<TaskEvent> te)
{
  
  if (!m_thread_pool_runing)
  {
    return;//do not add new task
  }

  std::unique_lock<std::mutex> lk(m_tasks_mtx);
  m_tasks_queue.push(te);
  int size = m_tasks_queue.size();
  lk.unlock();

  if (size> m_current_thread_num && size<m_max_thread_num)
  {
    m_threads.emplace_back(&ThreadPool::threadWorkFunction, this);
  }
  m_tasks_cv.notify_all();
}

void ThreadPool::threadWorkFunction(void* p)
{
  if (!p)
  {
    return;
  }

  auto pool = static_cast<ThreadPool*>(p);
  if (!pool)
  {
    return;
  }

  //stop tasks and tasks queue is empty, will exit thread
  while (pool->m_thread_pool_runing.load()
    || pool->m_tasks_queue.size()>0)
  {
    std::unique_lock<std::mutex> lk(pool->m_tasks_mtx);
    pool->m_tasks_cv.wait(lk, [p_lambda = pool ]()->bool{
      return !p_lambda->m_tasks_queue.empty()||!p_lambda->m_thread_pool_runing.load();
    });

    if (!pool->m_tasks_queue.empty())
    {
      auto task = pool->m_tasks_queue.top();
      pool->m_tasks_queue.pop();

      (*task)();
    }
  }
}



//test file

int a = 100;
std::mutex g_mtx;

void use_thread_pool()
{
  std::shared_ptr<ThreadPool> pool=std::make_shared<ThreadPool>();

  for (int i = 0; i<10; i  )
  {
    pool->AddTask([]() {
      std::unique_lock<std::mutex> lk(g_mtx);
      a  = 100;
      std::cout<<"thread id="<< std::this_thread::get_id()<<" a="<<a<<"n";
      //printf("thread id=%s, a=%d n",ss.str(),a);
      });
  }
  std::this_thread::sleep_for(std::chrono::seconds(5));
  pool.reset();
}

const int data_size = 3;
const int calculate_time= 10;
void use_thread_pool_with_result()
{
  float* result_data = (float*)malloc(data_size* sizeof(float));
  memset(result_data,0, data_size * sizeof(float));

  std::vector<std::shared_future<float*>> sf_vec;
  sf_vec.reserve(calculate_time);
  std::shared_ptr<ThreadPool> pool = std::make_shared<ThreadPool>();

  for (int i = 0; i < calculate_time; i  )
  {
    auto sf = pool->AddHasResultTask([]() ->float*{
      float* t = (float*)malloc(data_size * sizeof(float));
      t[0]=1;
      t[1]=2;
      t[2]=3;
      return t;
      });
    sf_vec.push_back(sf);
  }

  for (auto sf: sf_vec)
  {
    auto t = sf.get();
    if (!t)
    {
      continue;
    }

    for (int i =0;i<data_size;i  )
    {
      result_data[i] =t[i];
    }
    free(t);
  }
  for (int i = 0; i < data_size; i  )
  {
    std::cout<<result_data[i] <<"t";
  }
  std::cout<<"n";
  free(result_data);
}

0 人点赞