《C++并发编程实战》读书笔记(1):并发、线程管控

2023-08-10 08:19:27 浏览数 (1)

第1章 你好,C 并发世界

计算机系统中的并发包括任务切换与硬件并发,往往同时存在,关键因素是硬件支持的线程数。不论何种,本书谈论的技术都适用。

采用并发的理由主要是分离关注点与提升性能。但并发使得代码复杂、难懂、易错,不值得时无需采用并发。

并发的方式包括多进程与多线程。前者采用多个进程,每个进程只含一个线程,开销更大,通过昂贵的进程间通信来传递信息,但更安全并且可利用网络连接在不同计算机上并发。后者采用单一进程,内含多个线程,额外开销更低,但难以驾驭,往往暗含隐患。本书专攻多线程并发。

并发与并行都指可调配的硬件资源同时运行多个任务,但并行更强调性能,而并发更强调分离关注点或相应能力。

以一个简单的例子开启本书:

代码语言:javascript复制
#include <iostream>
#include <thread>

void hello() { std::cout << "Hello Concurrent Worldn"; }

int main() {
    std::thread t(hello);
    // join令主线程等待子线程
    t.join();
}

第2章 线程管控


2.1 线程的基本管控

每个C 程序都含有至少一个线程,即main函数所在线程。随后,程序可通过std::thread启动更多线程;它需要<thread>头文件,可以通过任何可调用类型(函数、伪函数、lambda等)发起线程。

代码语言:javascript复制
void do_some_work();
std::thread my_thread(do_some_work);

启动线程后需要决定是与之汇合(join)还是与之分离(detach)。如果线程销毁时还没决定,那么线程会调用std::terminate终止整个程序。只有存在关联的执行线程时,即t.joinable()返回true,才能调用join/detach。

detach成员函数表示程序不等待线程结束,令线程在后台运行,归属权与控制权转交给C 运行时库。使用detach需确保所访问的外部数据始终正确有效,避免持有主线程的局部变量的指针/引用,否则主线程退出后该线程可能持有空悬指针/空悬引用。解决办法是将数据复制到新线程内部而非共享,或者使用join而非detach。

join成员函数的作用是等待线程的执行结束并回收线程资源;只能调用一次,之后就不再joinable。为了防止抛出异常时跳过join,导致程序崩溃有,可以实现一个RAII类,在析构函数中保证已经汇合过。

代码语言:javascript复制
class thread_guard {
    std::thread& t_;

   public:
    explicit thread_guard(std::thread& t) : t_(t) {}
    ~thread_guard() {
        if (t_.joinable()) {
            t_.join();
        }
    }
    thread_guard(thread_guard const&) = delete;
    thread_guard& operator=(thread_guard const&) = delete;
};

2、向线程函数传递参数

直接向std::thread的构造函数添加更多参数即可给线程函数传递参数。不过参数是先按默认方式复制到线程内部存储空间,再被当成临时变量以右值形式传给线程函数。

例如下面的字符串字面量hello,先以const char*形式传入,再转化为std::string类型。

代码语言:javascript复制
void f(const std::string &);
std::thread t(f,"hello");

但如果实参是指针,那么传入指针后构造string时,指针可能已经空悬。解决办法是传参时直接转换为string。

代码语言:javascript复制
std::thread t(f,std::string(buffer));

如果线程函数的形参是左值引用,直接传入实参会被转化为右值再传入,导致错误。解决办法是用std::ref加以包装。

代码语言:javascript复制
void f(int &i) { std::cout << i; }

int main() {
    int i = 3;
    std::thread t(f, std::ref(i));
}

想要使用成员函数作为线程函数的话,还需传入对象指针。例如下面的线程函数实际上调用w.f(i)。

代码语言:javascript复制
class Widget {
   public:
    void f(int i) { cout << i; }
};

int main() {
    Widget w;
    int i = 4;
    std::thread t(&Widget::f, &w, i);
    t.join();
}

对于只能移动不能拷贝的参数,例如unique_ptr,若实参是临时变量则自动移动,若实参是具名变量则需使用move。

代码语言:javascript复制
void f(std::unique_ptr<Widget>);

auto p = make_unique<Widget>();
std::thread t(f,std::move(p));

2.3 移交线程归属权

thread掌握资源,像unique_ptr一样只能移动不能拷贝;此外当thread关联一个线程时向其移动赋值会导致程序终止。支持移动操作的容器,例如vector,可以装载std::thread对象。

可以改进前文的thread_guard,使其支持构建并掌管线程,确保离开所在作用域前线程已完结。

代码语言:javascript复制
class scoped_thread {
    std::thread t;

   public:
    explicit scoped_thread(std::thread t_) : t(std::move(t_)) {
        if (!t.joinable()) throw std::logic_error("No thread");
    }
    ~scoped_thread() { t.join(); }
    scoped_thread(scoped_thread const&) = delete;
    scoped_thread& operator=(scoped_thread const&) = delete;
};

// 使用统一初始化避免被解析为函数声明
scoped_thread t{std::thread(f)}; 

2.4 在运行时选择线程数量、线程ID

可以通过std::thread::hardware_concurrency()来获取可真正并发的线程数量,硬件信息无法获取时返回0。当用多线程分解任务时,该值是有用的指标。

以下是并行版accumulate的简易实现,根据硬件线程数计算实际需要运算的线程数,随后将任务分解到各个线程处理,最后汇总得到结果。

代码语言:javascript复制
// 每个线程运行的子任务
template <typename Iterator, typename T>
struct accumulate_block {
    void operator()(Iterator first, Iterator last, T& result) {
        result = std::accumulate(first, last, result);
    }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    unsigned long const length = std::distance(first, last);
    if (!length) return init;
    // 每个线程至少处理25个元素
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
        (length   min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads = std::thread::hardware_concurrency();
    // 无法获取硬件线程数时设置为2
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    
    std::vector<T> results(num_threads);
    // 创建n-1个线程,因为本线程也进行运算任务
    std::vector<std::thread> threads(num_threads - 1);

    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1);   i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        threads[i] = std::thread(accumulate_block<Iterator, T>(), block_start,
                                 block_end, std::ref(results[i]));
        block_start = block_end;
    }
    accumulate_block<Iterator, T>()(block_start, last,
                                    results[num_threads - 1]);

    std::for_each(threads.begin(), threads.end(),
                  std::mem_fn(&std::thread::join));

    return std::accumulate(results.begin(), results.end(), init);
}

线程ID的类型是std::thread::id,可随意复制或比较。可以通过thread的get_id()成员函数获取,也可以通过std::this_thread::get_id()获取当前线程ID。


第3章 在线程间共享数据

3.1 线程间共享数据的问题

并发编程中操作由多个线程负责,争先让线程执行各自的操作,结果取决于它们执行的相对顺序,这就是条件竞争。恶性条件竞争会导致未定义行为。很经典的两个线程各自递增一个全局变量十万次的例子,理想情况下最后变量变为二十万,然而实际情况是这样:


3.2 用互斥保护共享数据

可以利用名为互斥的同步原语。C 线程库保证了一旦由线程锁住某个互斥,其他线程试图加锁时必须等待,直到原先加锁的线程将其解锁。注意应以合适的粒度加锁,仅在访问共享数据期间加锁,处理数据时尽可能解锁。

C 中通过构造std::mutex的实例来创建互斥,通过lock/unlock成员函数来加锁解锁。并不推荐直接调用成员函数,应使用其RAII类lock_guard,构造时加锁、析构时解锁。

代码语言:javascript复制
// 使用互斥锁来保护some_list
std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    some_list.push_back(new_value);
}

//C  17支持类模板参数推导与scoped_lock
void add_to_list(int new_value)
{
    std::scoped_lock guard(some_mutex);
    some_list.push_back(new_value);
}

然而仍可能出现未被保护的指针/引用,或者成员函数调用了不受掌控的其他函数,因此不能向锁所在的作用域之外传递受保护数据的指针/引用。然而即使用互斥保护,有些接口仍存在固有的条件竞争。例如对于栈来说:线程1判断栈非空,随后线程2取出元素,栈空,随后线程1取出元素时出错。下面是一个解决办法的示例:‍

代码语言:javascript复制
template <typename T>
class threadsafe_stack {
   public:
    std::shared_ptr<T> pop() {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
        data.pop();
        return res;
    }
    void pop(T& value) {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        value = data.top();
        data.pop();
    }
    ...
};

最后,死锁是指两个线程都需要锁住两个互斥锁才能继续运行,而目前都只锁住一个,并苦苦等待对方解锁。以下是一些防范死锁的准则:1、如果已经持有锁,就不要获取第二个锁;确实需要获取多个锁时使用std::lock来一次性获取所有锁。2、一旦持锁,避免调用用户提供的程序接口避免嵌套锁。3、依从固定顺序获取锁。4、按层级加锁。5、事实上任何同步机制的循环等待都会导致死锁。

例如swap函数需要同时获取双方的锁时:

代码语言:javascript复制
class X {   
public:
    friend void swap(X& lhs, X& rhs) {
        if (&lhs == &rhs) return;
        std::lock(lhs.m, rhs.m);
    // adopt_lock表示lhs.m已经上锁
        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
        swap(lhs.some_detail, rhs.some_detail);
    }
    
    // C  17中
    friend void swap(X& lhs, X& rhs){
        if (&lhs == &rhs) return;
        std::scoped_lock guard(lhs.m,rhs.m);
        swap(lhs.some_detail, rhs.some_detail);
    }
};

unique_lock比lock_guard更灵活,不占有与之关联的互斥锁,但占用更多空间并且更慢。它提供了lock/try_lock/unlock成员函数;构造函数第二个参数传入adopt_lock表示互斥锁已上锁,传入defer_lock表示构造时无需上锁。unique_lock可移动不可复制,可以在不同作用域间转移互斥所有权,用途是让程序在同一个锁的保护下执行其他操作。


3.3 保护共享数据的其他工具

可以通过once_flag类和call_once函数来在初始化过程中保护共享数据。

代码语言:javascript复制
std::once_flag resource_flag;
void init_resource(){ .. }

void run(){
  std::call_once(resource_flag, init_resource);
  ...
}

C 11还规定了静态数据只会初始化一次。那么单例模板类可以这样实现:

代码语言:javascript复制
template<class T>
class Singleton {
public:
    static T& Instance() {
        static T instance;
        return instance;
    }

protected:
    Singleton() = default;
    ~Singleton() = default;

private:
    Singleton(const Singleton&) = delete;
    Singleton& operator=(const Singleton&) = delete;
};
// 使用方法:
class MyClass : public Singleton<MyClass> {
public:
    ...
private:
    MyClass();
    friend class Singleton<MyClass>;
};

对于读多写少的数据结构,C 14提供了shared_timed_mutex,C 17提供了功能更多的shared_mutex,那么写锁即lock_guard<shared_mutex>或unique_lock<shared_mutex>,读锁即shared_lock<shared_mutex>。

递归锁recursive_mutex允许同一线程对它多次加锁,释放所有锁后其他线程才可获取该锁。

0 人点赞