《C++并发编程实战》读书笔记(6):高级线程管理、并行算法函数、测试与除错

2023-09-19 20:40:50 浏览数 (1)

第9章 高级线程管理

9.1 线程池

大多数程序中并不方便给每个任务分配单独的线程,但仍可通过线程池来充分利用可调配的并发算力:将可同时执行的任务提交到线程池,放入任务队列中等待,工作线程循环地领取并执行任务。

以下是一种实现,提交任务后返回future,提交者可通过future获取任务结果,任务先被包装成packaged_task再被包装成function,由工作线程来处理。

代码语言:javascript复制
class ThreadPool {
private:
    std::vector<std::thread> threads;
    ThreadsafeQueue<std::function<void()>> taskQueue;
    std::atomic<bool> stop;
    join_threads joiner;
public:
    ThreadPool(size_t numThreads = std::thread::hardware_concurrency()) 
        : stop(false),joiner(threads) {
        for (size_t i = 0; i < numThreads;   i) {
            threads.emplace_back([this]() {
                while (!stop) {
                    run_pending_task();
                }
            });
        }
    }
    
    // 避免所有线程都在等待其他线程完成任务
    void run_pending_task(){
        std::function<void()> task;               
        if (taskQueue.try_pop(task))
            task();
        else
            std::this_thread::yield();
    }
    
    ~ThreadPool() {
        stop = true;
    }

    template<class F, class... Args>
    auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using ReturnType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<ReturnType> result = task->get_future();
        taskQueue.push([task]() { (*task)(); });
        return result;
    }
};

例如可以实现基于线程池的快排:

代码语言:javascript复制
template <typename T>
struct sorter {
    ThreadPool pool;

    std::list<T> do_sort(std::list<T>& chunk_data) {
        if (chunk_data.empty()) return chunk_data;
        // 将原list分为大小两段
        std::list<T> result;
        result.splice(result.begin(), chunk_data, chunk_data.begin());
        T const& partition_val = *result.begin();
        auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(),[&](T const& val) { return val < partition_val; });
        // 两段分别处理
        std::list<T> new_lower_chunk;
        new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,chunk_data.begin(), divide_point);
        auto new_lower = pool.submit(std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));
        std::list<T> new_higher(do_sort(chunk_data));
        result.splice(result.end(), new_higher);
        // 避免所有线程彼此等待
        while (!new_lower.is_ready()) {
            pool.run_pending_task();
        }
        result.splice(result.begin(), new_lower.get());
        return result;
    }
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
    if (input.empty()) return input;
    sorter<T> s;
    return s.do_sort(input);
}

上述线程池仅具备一个全局的任务队列,即使使用无锁队列来优化仍然会有严重的缓存乒乓,导致性能浪费。可以为每个线程配备thread_local任务队列,仅当线程自身线程没有任务时才从全局队列领取任务。

此外,倘若某线程自身队列为空,而另一线程的队列为满,需支持窃取任务。首先实现支持这样操作的队列,仅用锁简单实现,一端用于push/pop,另一端用于steal。

代码语言:javascript复制
class work_stealing_queue {
private:
    typedef std::function<void()> data_type;
    std::deque<data_type> the_queue;
    mutable std::mutex the_mutex;

public:
    work_stealing_queue() {}

    work_stealing_queue(const work_stealing_queue& other) = delete;
    work_stealing_queue& operator=(const work_stealing_queue& other) = delete;

    void push(data_type data) {
        std::lock_guard<std::mutex> lock(the_mutex);
        the_queue.push_front(std::move(data));
    }

    bool try_pop(data_type& res) {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty()) return false;
        res = std::move(the_queue.front());
        the_queue.pop_front();
        return true;
    }

    bool try_steal(data_type& res) {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty()) return false;
        res = std::move(the_queue.back());
        the_queue.pop_back();
        return true;
    }
};

基于上面的结构,可以实现支持任务窃取的线程池:

代码语言:javascript复制
class thread_pool {
private:
    typedef std::function<void()> task_type;
    std::vector<std::thread> threads;
    join_threads joiner;
    std::atomic_bool done;
    // 全局任务队列
    thread_safe_queue<task_type> pool_work_queue;
    std::vector<std::unique_ptr<work_stealing_queue>> queues;
    // 指向线程独有的任务队列
    static thread_local work_stealing_queue* local_work_queue;
    // 线程编号
    static thread_local unsigned my_index;

    void worker_thread(unsigned my_index_) {
        my_index = my_index_;
        local_work_queue = queues[my_index].get();
        while (!done) {
            run_pending_task();
        }
    }

    bool pop_task_from_local_queue(task_type& task) {
        return local_work_queue && local_work_queue->try_pop(task);
    }

    bool pop_task_from_pool_queue(task_type& task) {
        return pool_work_queue.try_pop(task);
    }
    // 遍历,偷取任务
    bool pop_task_from_other_thread_queue(task_type& task) {
        for (unsigned i = 0; i < queues.size();   i) {
            unsigned const index = (my_index   i   1) % queues.size();
            if (queues[index]->try_steal(task)) {
                return true;
            }
        }
        return false;
    }

public:
    thread_pool() : joiner(threads), done(false) {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count;   i) {
                queues.push_back(std::unique_ptr<work_stealing_queue>(
                    new work_stealing_queue));
                threads.push_back(
                    std::thread(&thread_pool::worker_thread, this, i));
            }
        } catch (...) {
            done = true;
            throw;
        }
    }

    ~thread_pool() { done = true; }

    template <class F, class... Args>
    auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using ReturnType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<ReturnType> result = task->get_future();
        if (local_work_queue) {
            local_work_queue->push([task]() { (*task)(); });
        } else {
            pool_work_queue.push([task]() { (*task)(); });
        }
        return result;
    }

    void run_pending_task() {
        task_type task;
        if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) ||
            pop_task_from_other_thread_queue(task)) {
            task();
        } else {
            std::this_thread::yield();
        }
    }
};

9.2 中断线程

C 20中引入了能接收中断、自动join的jthread。但自己实现也不复杂。借助thread_local的interrupt_flag来辅助实现,通过interrupt成员函数来设置中断,并借此实现可中断的条件变量/future上的等待。

代码语言:javascript复制
thread_local interrupt_flag this_thread_interrupt_flag;

class interruptible_thread {
    std::thread internal_thread;
    interrupt_flag* flag;

public:
    template <typename FunctionType>
    interruptible_thread(FunctionType f) {
        std::promise<interrupt_flag*> p;
        internal_thread = std::thread([f, &p] {
            p.set_value(&this_thread_interrupt_flag);
            try{
                f();
            }catch(...){}
        });
        flag = p.get_future().get();
    }
    // 设置中断
    void interrupt() {
        if (flag) {
            flag->set();
        }
    }
};
// 如果已设置中断则抛出异常
void interruption_point() {
    if (this_thread_interrupt_flag.is_set()) {
        throw std::exception();
    }
}
// 可中断的条件变量等待
template <typename Lockable>
void interruptible_wait(std::condition_variable_any& cv, Lockable& lk) {
    this_thread_interrupt_flag.wait(cv, lk);
}
// 可中断的future等待
template <typename T, typename Lockable>
void interruptible_wait(std::future<T>& uf, Lockable& lk) {
    while (!this_thread_interrupt_flag.is_set()) {
        if (uf.wait_for(lk, 1ms) == std::future_status::ready) break;
    }
}

其中,interrupt_flag的实现如下,基于condition_variable_any而非普通条件变量,set时(即设置中断时)唤醒条件变量,wait时多次检查是否设置中断。

代码语言:javascript复制
class interrupt_flag {
    std::atomic<bool> flag;
    std::condition_variable_any* thread_cond_any;
    std::mutex set_clear_mutex;

public:
    interrupt_flag() : thread_cond_any(nullptr) {}
    void set() {
        flag.store(true, std::memory_order_relaxed);
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        if (thread_cond_any) {
            thread_cond_any->notify_all();
        }
    }

    bool is_set() const { return flag.load(std::memory_order_relaxed); }

    template <typename Lockable>
    void wait(std::condition_variable_any& cv, Lockable& lk) {
        struct custom_lock {
            interrupt_flag* self;
            Lockable& lk;
            custom_lock(interrupt_flag* self_,
                        std::condition_variable_any& cond, Lockable& lk_)
                : self(self_), lk(lk_) {
                self->set_clear_mutex.lock();
                self->thread_cond_any = &cond;
            }
            void unlock() {
                lk.unlock();
                self->set_clear_mutex.unlock();
            }
            void lock() { std::lock(self->set_clear_mutex, lk); }
            ~custom_lock() { self->thread_cond_any = nullptr; }
        };
        custom_lock cl(this, cv, lk);
        interruption_point();
        cv.wait(cl);
        interruption_point();
    }
};

可以用try/catch来捕获中断,按某种方式处理然后继续执行。中断线程在实际应用中的常见场景是运行程序前开启后台任务,程序运行完退出时中断后台任务。


第10章 并行算法函数

C 17向标准库加入了并行算法函数,在原有函数的参数列表前新增了执行策略参数。<execution>中定义了三种执行策略sequenced_policy、parallel_policy、parallel_unsequenced_policy,以及对应的传给并行算法函数的对象seq、par、par_unseq。

不同策略会影响算法函数的复杂度、抛出异常时的行为、何时何地何种方式执行。其中seq代表顺序策略,令算法函数在发起调用的线程上执行全部操作,没有内存次序限制;par代表并行策略,内部操作可能在发起调用的线程上也可能另外创建线程执行,涉及的变量绝不能引发数据竞争;par_unseq代表非顺序并行策略,并行化最高,涉及的变量不得以任何形式同步。

例如某网站有庞大的日志,需要逐行处理日志提炼各项信息,最后聚合结果,类似mapreduce。由于每行日志的处理都独立,只需最后总数正确,所以可以用transfrom_reduce来处理:

代码语言:javascript复制
struct log_info {
    std::string page;
    time_t visit_time;
    std::string browser;
};

extern log_info parse_log_line(std::string const &line);

using visit_map_type = std::unordered_map<std::string, unsigned long long>;

visit_map_type count_visits_per_page(
    std::vector<std::string> const &log_lines) {
    struct combine_visits {
        visit_map_type operator()(visit_map_type lhs,
                                  visit_map_type rhs) const {
            if (lhs.size() < rhs.size()) std::swap(lhs, rhs);
            for (auto const &entry : rhs) {
                lhs[entry.first]  = entry.second;
            }
            return lhs;
        }

        visit_map_type operator()(log_info log, visit_map_type map) const {
              map[log.page];
            return map;
        }
        visit_map_type operator()(visit_map_type map, log_info log) const {
              map[log.page];
            return map;
        }
        visit_map_type operator()(log_info log1, log_info log2) const {
            visit_map_type map;
              map[log1.page];
              map[log2.page];
            return map;
        }
    };

    return std::transform_reduce(std::execution::par, log_lines.begin(),
                                 log_lines.end(), visit_map_type(),
                                 combine_visits(), parse_log_line);
}

0 人点赞