第1章 你好,C 并发世界
计算机系统中的并发包括任务切换与硬件并发,往往同时存在,关键因素是硬件支持的线程数。不论何种,本书谈论的技术都适用。
采用并发的理由主要是分离关注点与提升性能。但并发使得代码复杂、难懂、易错,不值得时无需采用并发。
并发的方式包括多进程与多线程。前者采用多个进程,每个进程只含一个线程,开销更大,通过昂贵的进程间通信来传递信息,但更安全并且可利用网络连接在不同计算机上并发。后者采用单一进程,内含多个线程,额外开销更低,但难以驾驭,往往暗含隐患。本书专攻多线程并发。
并发与并行都指可调配的硬件资源同时运行多个任务,但并行更强调性能,而并发更强调分离关注点或相应能力。
以一个简单的例子开启本书:
代码语言:javascript复制#include <iostream>
#include <thread>
void hello() { std::cout << "Hello Concurrent Worldn"; }
int main() {
std::thread t(hello);
// join令主线程等待子线程
t.join();
}
第2章 线程管控
2.1 线程的基本管控
每个C 程序都含有至少一个线程,即main函数所在线程。随后,程序可通过std::thread启动更多线程;它需要<thread>头文件,可以通过任何可调用类型(函数、伪函数、lambda等)发起线程。
代码语言:javascript复制void do_some_work();
std::thread my_thread(do_some_work);
启动线程后需要决定是与之汇合(join)还是与之分离(detach)。如果线程销毁时还没决定,那么线程会调用std::terminate终止整个程序。只有存在关联的执行线程时,即t.joinable()返回true,才能调用join/detach。
detach成员函数表示程序不等待线程结束,令线程在后台运行,归属权与控制权转交给C 运行时库。使用detach需确保所访问的外部数据始终正确有效,避免持有主线程的局部变量的指针/引用,否则主线程退出后该线程可能持有空悬指针/空悬引用。解决办法是将数据复制到新线程内部而非共享,或者使用join而非detach。
join成员函数的作用是等待线程的执行结束并回收线程资源;只能调用一次,之后就不再joinable。为了防止抛出异常时跳过join,导致程序崩溃有,可以实现一个RAII类,在析构函数中保证已经汇合过。
代码语言:javascript复制class thread_guard {
std::thread& t_;
public:
explicit thread_guard(std::thread& t) : t_(t) {}
~thread_guard() {
if (t_.joinable()) {
t_.join();
}
}
thread_guard(thread_guard const&) = delete;
thread_guard& operator=(thread_guard const&) = delete;
};
2、向线程函数传递参数
直接向std::thread的构造函数添加更多参数即可给线程函数传递参数。不过参数是先按默认方式复制到线程内部存储空间,再被当成临时变量以右值形式传给线程函数。
例如下面的字符串字面量hello,先以const char*形式传入,再转化为std::string类型。
代码语言:javascript复制void f(const std::string &);
std::thread t(f,"hello");
但如果实参是指针,那么传入指针后构造string时,指针可能已经空悬。解决办法是传参时直接转换为string。
代码语言:javascript复制std::thread t(f,std::string(buffer));
如果线程函数的形参是左值引用,直接传入实参会被转化为右值再传入,导致错误。解决办法是用std::ref加以包装。
代码语言:javascript复制void f(int &i) { std::cout << i; }
int main() {
int i = 3;
std::thread t(f, std::ref(i));
}
想要使用成员函数作为线程函数的话,还需传入对象指针。例如下面的线程函数实际上调用w.f(i)。
代码语言:javascript复制class Widget {
public:
void f(int i) { cout << i; }
};
int main() {
Widget w;
int i = 4;
std::thread t(&Widget::f, &w, i);
t.join();
}
对于只能移动不能拷贝的参数,例如unique_ptr,若实参是临时变量则自动移动,若实参是具名变量则需使用move。
代码语言:javascript复制void f(std::unique_ptr<Widget>);
auto p = make_unique<Widget>();
std::thread t(f,std::move(p));
2.3 移交线程归属权
thread掌握资源,像unique_ptr一样只能移动不能拷贝;此外当thread关联一个线程时向其移动赋值会导致程序终止。支持移动操作的容器,例如vector,可以装载std::thread对象。
可以改进前文的thread_guard,使其支持构建并掌管线程,确保离开所在作用域前线程已完结。
代码语言:javascript复制class scoped_thread {
std::thread t;
public:
explicit scoped_thread(std::thread t_) : t(std::move(t_)) {
if (!t.joinable()) throw std::logic_error("No thread");
}
~scoped_thread() { t.join(); }
scoped_thread(scoped_thread const&) = delete;
scoped_thread& operator=(scoped_thread const&) = delete;
};
// 使用统一初始化避免被解析为函数声明
scoped_thread t{std::thread(f)};
2.4 在运行时选择线程数量、线程ID
可以通过std::thread::hardware_concurrency()来获取可真正并发的线程数量,硬件信息无法获取时返回0。当用多线程分解任务时,该值是有用的指标。
以下是并行版accumulate的简易实现,根据硬件线程数计算实际需要运算的线程数,随后将任务分解到各个线程处理,最后汇总得到结果。
代码语言:javascript复制// 每个线程运行的子任务
template <typename Iterator, typename T>
struct accumulate_block {
void operator()(Iterator first, Iterator last, T& result) {
result = std::accumulate(first, last, result);
}
};
template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length) return init;
// 每个线程至少处理25个元素
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
// 无法获取硬件线程数时设置为2
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<T> results(num_threads);
// 创建n-1个线程,因为本线程也进行运算任务
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(accumulate_block<Iterator, T>(), block_start,
block_end, std::ref(results[i]));
block_start = block_end;
}
accumulate_block<Iterator, T>()(block_start, last,
results[num_threads - 1]);
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
return std::accumulate(results.begin(), results.end(), init);
}
线程ID的类型是std::thread::id,可随意复制或比较。可以通过thread的get_id()成员函数获取,也可以通过std::this_thread::get_id()获取当前线程ID。
第3章 在线程间共享数据
3.1 线程间共享数据的问题
并发编程中操作由多个线程负责,争先让线程执行各自的操作,结果取决于它们执行的相对顺序,这就是条件竞争。恶性条件竞争会导致未定义行为。很经典的两个线程各自递增一个全局变量十万次的例子,理想情况下最后变量变为二十万,然而实际情况是这样:
3.2 用互斥保护共享数据
可以利用名为互斥的同步原语。C 线程库保证了一旦由线程锁住某个互斥,其他线程试图加锁时必须等待,直到原先加锁的线程将其解锁。注意应以合适的粒度加锁,仅在访问共享数据期间加锁,处理数据时尽可能解锁。
C 中通过构造std::mutex的实例来创建互斥,通过lock/unlock成员函数来加锁解锁。并不推荐直接调用成员函数,应使用其RAII类lock_guard,构造时加锁、析构时解锁。
代码语言:javascript复制// 使用互斥锁来保护some_list
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
//C 17支持类模板参数推导与scoped_lock
void add_to_list(int new_value)
{
std::scoped_lock guard(some_mutex);
some_list.push_back(new_value);
}
然而仍可能出现未被保护的指针/引用,或者成员函数调用了不受掌控的其他函数,因此不能向锁所在的作用域之外传递受保护数据的指针/引用。然而即使用互斥保护,有些接口仍存在固有的条件竞争。例如对于栈来说:线程1判断栈非空,随后线程2取出元素,栈空,随后线程1取出元素时出错。下面是一个解决办法的示例:
代码语言:javascript复制template <typename T>
class threadsafe_stack {
public:
std::shared_ptr<T> pop() {
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value) {
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
...
};
最后,死锁是指两个线程都需要锁住两个互斥锁才能继续运行,而目前都只锁住一个,并苦苦等待对方解锁。以下是一些防范死锁的准则:1、如果已经持有锁,就不要获取第二个锁;确实需要获取多个锁时使用std::lock来一次性获取所有锁。2、一旦持锁,避免调用用户提供的程序接口避免嵌套锁。3、依从固定顺序获取锁。4、按层级加锁。5、事实上任何同步机制的循环等待都会导致死锁。
例如swap函数需要同时获取双方的锁时:
代码语言:javascript复制class X {
public:
friend void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::lock(lhs.m, rhs.m);
// adopt_lock表示lhs.m已经上锁
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail);
}
// C 17中
friend void swap(X& lhs, X& rhs){
if (&lhs == &rhs) return;
std::scoped_lock guard(lhs.m,rhs.m);
swap(lhs.some_detail, rhs.some_detail);
}
};
unique_lock比lock_guard更灵活,不占有与之关联的互斥锁,但占用更多空间并且更慢。它提供了lock/try_lock/unlock成员函数;构造函数第二个参数传入adopt_lock表示互斥锁已上锁,传入defer_lock表示构造时无需上锁。unique_lock可移动不可复制,可以在不同作用域间转移互斥所有权,用途是让程序在同一个锁的保护下执行其他操作。
3.3 保护共享数据的其他工具
可以通过once_flag类和call_once函数来在初始化过程中保护共享数据。
代码语言:javascript复制std::once_flag resource_flag;
void init_resource(){ .. }
void run(){
std::call_once(resource_flag, init_resource);
...
}
C 11还规定了静态数据只会初始化一次。那么单例模板类可以这样实现:
代码语言:javascript复制template<class T>
class Singleton {
public:
static T& Instance() {
static T instance;
return instance;
}
protected:
Singleton() = default;
~Singleton() = default;
private:
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
};
// 使用方法:
class MyClass : public Singleton<MyClass> {
public:
...
private:
MyClass();
friend class Singleton<MyClass>;
};
对于读多写少的数据结构,C 14提供了shared_timed_mutex,C 17提供了功能更多的shared_mutex,那么写锁即lock_guard<shared_mutex>或unique_lock<shared_mutex>,读锁即shared_lock<shared_mutex>。
递归锁recursive_mutex允许同一线程对它多次加锁,释放所有锁后其他线程才可获取该锁。