第9章 高级线程管理
9.1 线程池
大多数程序中并不方便给每个任务分配单独的线程,但仍可通过线程池来充分利用可调配的并发算力:将可同时执行的任务提交到线程池,放入任务队列中等待,工作线程循环地领取并执行任务。
以下是一种实现,提交任务后返回future,提交者可通过future获取任务结果,任务先被包装成packaged_task再被包装成function,由工作线程来处理。
代码语言:javascript复制class ThreadPool {
private:
std::vector<std::thread> threads;
ThreadsafeQueue<std::function<void()>> taskQueue;
std::atomic<bool> stop;
join_threads joiner;
public:
ThreadPool(size_t numThreads = std::thread::hardware_concurrency())
: stop(false),joiner(threads) {
for (size_t i = 0; i < numThreads; i) {
threads.emplace_back([this]() {
while (!stop) {
run_pending_task();
}
});
}
}
// 避免所有线程都在等待其他线程完成任务
void run_pending_task(){
std::function<void()> task;
if (taskQueue.try_pop(task))
task();
else
std::this_thread::yield();
}
~ThreadPool() {
stop = true;
}
template<class F, class... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using ReturnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<ReturnType> result = task->get_future();
taskQueue.push([task]() { (*task)(); });
return result;
}
};
例如可以实现基于线程池的快排:
代码语言:javascript复制template <typename T>
struct sorter {
ThreadPool pool;
std::list<T> do_sort(std::list<T>& chunk_data) {
if (chunk_data.empty()) return chunk_data;
// 将原list分为大小两段
std::list<T> result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(),[&](T const& val) { return val < partition_val; });
// 两段分别处理
std::list<T> new_lower_chunk;
new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,chunk_data.begin(), divide_point);
auto new_lower = pool.submit(std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
// 避免所有线程彼此等待
while (!new_lower.is_ready()) {
pool.run_pending_task();
}
result.splice(result.begin(), new_lower.get());
return result;
}
};
template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.empty()) return input;
sorter<T> s;
return s.do_sort(input);
}
上述线程池仅具备一个全局的任务队列,即使使用无锁队列来优化仍然会有严重的缓存乒乓,导致性能浪费。可以为每个线程配备thread_local任务队列,仅当线程自身线程没有任务时才从全局队列领取任务。
此外,倘若某线程自身队列为空,而另一线程的队列为满,需支持窃取任务。首先实现支持这样操作的队列,仅用锁简单实现,一端用于push/pop,另一端用于steal。
代码语言:javascript复制class work_stealing_queue {
private:
typedef std::function<void()> data_type;
std::deque<data_type> the_queue;
mutable std::mutex the_mutex;
public:
work_stealing_queue() {}
work_stealing_queue(const work_stealing_queue& other) = delete;
work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
void push(data_type data) {
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool try_pop(data_type& res) {
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty()) return false;
res = std::move(the_queue.front());
the_queue.pop_front();
return true;
}
bool try_steal(data_type& res) {
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty()) return false;
res = std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
基于上面的结构,可以实现支持任务窃取的线程池:
代码语言:javascript复制class thread_pool {
private:
typedef std::function<void()> task_type;
std::vector<std::thread> threads;
join_threads joiner;
std::atomic_bool done;
// 全局任务队列
thread_safe_queue<task_type> pool_work_queue;
std::vector<std::unique_ptr<work_stealing_queue>> queues;
// 指向线程独有的任务队列
static thread_local work_stealing_queue* local_work_queue;
// 线程编号
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_) {
my_index = my_index_;
local_work_queue = queues[my_index].get();
while (!done) {
run_pending_task();
}
}
bool pop_task_from_local_queue(task_type& task) {
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type& task) {
return pool_work_queue.try_pop(task);
}
// 遍历,偷取任务
bool pop_task_from_other_thread_queue(task_type& task) {
for (unsigned i = 0; i < queues.size(); i) {
unsigned const index = (my_index i 1) % queues.size();
if (queues[index]->try_steal(task)) {
return true;
}
}
return false;
}
public:
thread_pool() : joiner(threads), done(false) {
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; i) {
queues.push_back(std::unique_ptr<work_stealing_queue>(
new work_stealing_queue));
threads.push_back(
std::thread(&thread_pool::worker_thread, this, i));
}
} catch (...) {
done = true;
throw;
}
}
~thread_pool() { done = true; }
template <class F, class... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using ReturnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<ReturnType> result = task->get_future();
if (local_work_queue) {
local_work_queue->push([task]() { (*task)(); });
} else {
pool_work_queue.push([task]() { (*task)(); });
}
return result;
}
void run_pending_task() {
task_type task;
if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) ||
pop_task_from_other_thread_queue(task)) {
task();
} else {
std::this_thread::yield();
}
}
};
9.2 中断线程
C 20中引入了能接收中断、自动join的jthread。但自己实现也不复杂。借助thread_local的interrupt_flag来辅助实现,通过interrupt成员函数来设置中断,并借此实现可中断的条件变量/future上的等待。
代码语言:javascript复制thread_local interrupt_flag this_thread_interrupt_flag;
class interruptible_thread {
std::thread internal_thread;
interrupt_flag* flag;
public:
template <typename FunctionType>
interruptible_thread(FunctionType f) {
std::promise<interrupt_flag*> p;
internal_thread = std::thread([f, &p] {
p.set_value(&this_thread_interrupt_flag);
try{
f();
}catch(...){}
});
flag = p.get_future().get();
}
// 设置中断
void interrupt() {
if (flag) {
flag->set();
}
}
};
// 如果已设置中断则抛出异常
void interruption_point() {
if (this_thread_interrupt_flag.is_set()) {
throw std::exception();
}
}
// 可中断的条件变量等待
template <typename Lockable>
void interruptible_wait(std::condition_variable_any& cv, Lockable& lk) {
this_thread_interrupt_flag.wait(cv, lk);
}
// 可中断的future等待
template <typename T, typename Lockable>
void interruptible_wait(std::future<T>& uf, Lockable& lk) {
while (!this_thread_interrupt_flag.is_set()) {
if (uf.wait_for(lk, 1ms) == std::future_status::ready) break;
}
}
其中,interrupt_flag的实现如下,基于condition_variable_any而非普通条件变量,set时(即设置中断时)唤醒条件变量,wait时多次检查是否设置中断。
代码语言:javascript复制class interrupt_flag {
std::atomic<bool> flag;
std::condition_variable_any* thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag() : thread_cond_any(nullptr) {}
void set() {
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if (thread_cond_any) {
thread_cond_any->notify_all();
}
}
bool is_set() const { return flag.load(std::memory_order_relaxed); }
template <typename Lockable>
void wait(std::condition_variable_any& cv, Lockable& lk) {
struct custom_lock {
interrupt_flag* self;
Lockable& lk;
custom_lock(interrupt_flag* self_,
std::condition_variable_any& cond, Lockable& lk_)
: self(self_), lk(lk_) {
self->set_clear_mutex.lock();
self->thread_cond_any = &cond;
}
void unlock() {
lk.unlock();
self->set_clear_mutex.unlock();
}
void lock() { std::lock(self->set_clear_mutex, lk); }
~custom_lock() { self->thread_cond_any = nullptr; }
};
custom_lock cl(this, cv, lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
};
可以用try/catch来捕获中断,按某种方式处理然后继续执行。中断线程在实际应用中的常见场景是运行程序前开启后台任务,程序运行完退出时中断后台任务。
第10章 并行算法函数
C 17向标准库加入了并行算法函数,在原有函数的参数列表前新增了执行策略参数。<execution>中定义了三种执行策略sequenced_policy、parallel_policy、parallel_unsequenced_policy,以及对应的传给并行算法函数的对象seq、par、par_unseq。
不同策略会影响算法函数的复杂度、抛出异常时的行为、何时何地何种方式执行。其中seq代表顺序策略,令算法函数在发起调用的线程上执行全部操作,没有内存次序限制;par代表并行策略,内部操作可能在发起调用的线程上也可能另外创建线程执行,涉及的变量绝不能引发数据竞争;par_unseq代表非顺序并行策略,并行化最高,涉及的变量不得以任何形式同步。
例如某网站有庞大的日志,需要逐行处理日志提炼各项信息,最后聚合结果,类似mapreduce。由于每行日志的处理都独立,只需最后总数正确,所以可以用transfrom_reduce来处理:
代码语言:javascript复制struct log_info {
std::string page;
time_t visit_time;
std::string browser;
};
extern log_info parse_log_line(std::string const &line);
using visit_map_type = std::unordered_map<std::string, unsigned long long>;
visit_map_type count_visits_per_page(
std::vector<std::string> const &log_lines) {
struct combine_visits {
visit_map_type operator()(visit_map_type lhs,
visit_map_type rhs) const {
if (lhs.size() < rhs.size()) std::swap(lhs, rhs);
for (auto const &entry : rhs) {
lhs[entry.first] = entry.second;
}
return lhs;
}
visit_map_type operator()(log_info log, visit_map_type map) const {
map[log.page];
return map;
}
visit_map_type operator()(visit_map_type map, log_info log) const {
map[log.page];
return map;
}
visit_map_type operator()(log_info log1, log_info log2) const {
visit_map_type map;
map[log1.page];
map[log2.page];
return map;
}
};
return std::transform_reduce(std::execution::par, log_lines.begin(),
log_lines.end(), visit_map_type(),
combine_visits(), parse_log_line);
}