第4章 并发操作的同步
4.1 等待事件或等待其他条件
如果线程甲需要等待线程乙完成任务,可以使用C 标准库的条件变量来等待事件发生。<condition_variable>中提供了condition_variable和condition_variable_any,前者只能配合mutex使用,而后者可以与任意符合互斥标准的类型使用,会产生额外开销。主要使用成员函数wait、notify_one、notify_all。
例如可以实现一个生产者消费者模型,通过队列来传递数据,一端准备数据另一端处理数据,其中条件变量的作用是消费者线程取出数据前检查队列是否非空,否则释放锁并等待生产者线程准备数据。
代码语言:javascript复制std::mutex mut;
std::queue<Widget> data_queue;
std::condition_variable data_cond;
void data_preparation_thread() {
while (...) {
const Widget data = prepare_data();
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
}
// 通知消费者线程
data_cond.notify_one();
}
}
void data_processing_thread() {
while (...) {
// 需要多次加锁解锁,所以用unique_lock
std::unique_lock<std::mutex> lk(mut);
// wait首先判断lambda,成立则返回,否则解锁互斥进入阻塞
// 每次被notify后解除阻塞并获取锁,重复上述过程
data_cond.wait(lk, [] { return !data_queue.empty(); });
Widget data = data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
}
}
也可以实现一个简略的线程安全的队列:
代码语言:javascript复制template <typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
value = data_queue.front();
data_queue.pop();
}
};
4.2 使用future等待一次性事件发生
若线程需等待某一次性事件,可以以适当方式取得一个代表目标事件的future,此后线程就可以一边执行其他任务一边在future上等待。一旦目标事件发生,future就进入就绪状态,无法重置。
<future>中提供了两种类模板future和shared_future,同一事件仅可关联一个future实例,但可关联多个shared_future,并且目标事件发生后关联的所有shared_future实例都就绪。future本身不提供同步,多线程时需要用同步方式进行保护。
4.2.1 从后台任务返回值
并不急需某任务的返回值时,可以用async异步地启动任务,获得一个future对象;对后者调用get会阻塞当前线程,等待future准备完并返回该值。
代码语言:javascript复制int f() { ... }
std::future<int> answer = std::async(f);
...
std::cout << answer.get();
给async的任务函数传递参数类似给thread传递参数。
代码语言:javascript复制// 调用成员函数的情况
// p=&x; p->foo("hello")
auto f1 = std::async(&X::foo, &x, "hello");
// tmpx=x; tmpx.foo("hello")
auto f2 = std::async(&X::foo, x, "hello");
// 调用仿函数的情况
// tmpy=y; tmpy(3.14)
auto f3 = std::async(Y(), 3.14);
// y(3.14)
auto f4 = std::async(std::ref(y), 3.14);
// 函数形参为引用的情况
X baz(X&);
auto f5 = std::async(baz, std::ref(x));
可以给async传递参数指定运行方式,deferred代表直到在future上调用wait/get才执行任务函数,async代表开启专属线程来执行;默认为deferred|async。
代码语言:javascript复制auto f = std::async(std::launch::async, Y(), 1.2);
4.2.2 关联future实例与任务
类模板packaged_task把任务包装起来,可作为任务调度器、线程池的构建单元,其模板参数是函数签名,例如int(int,double*)。它具备函数调用操作符,参数取决于上述模板参数,调用时将参数传递给任务函数,通过get_future获取future对象,异步运行得到结果后保存到该对象。
例如图形用户界面需要接收其他线程的消息来更新界面。
代码语言:javascript复制std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
// 图形用户界面的线程函数
void gui_thread() {
while (...) {
get_and_process_gui_message();
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if (tasks.empty()) continue;
task = std::move(tasks.front());
tasks.pop_front();
}
task();
}
}
// 其他线程通过该函数传递消息
template <typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
std::packaged_task<void()> task(f);
std::future<void> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}
4.2.3 创建std::promise
有些任务无法以简单的函数调用表达,或者执行结果来自多个部分的代码,那么就需要使用std::promise显式地异步求值。
promise通过get_future获取关联的future对象,等待数据的线程在future上阻塞,提供数据的线程通过set_value设置数据,设置完后future即就绪。若promise销毁时仍未set_value,则传递异常。
下面是单线程处理多个连接的例子。这里假设传入的数据包含有ID与荷载数据,接收后将ID与promise对应,将相关值设为荷载数据。对于传出的数据而言,promise的相关值是代表是否成功的bool。
代码语言:javascript复制void process_connections(connection_set& connections) {
while (...) {
for (connection_iterator connection = ...) {
if (connection->has_incoming_data()) {
data_packet data = connection->incoming();
std::promise<payload_type>& p = connection->get_promise(data.id);
p.set_value(data.payload);
}
if (connection->has_outgoing_data()) {
outgoing_packet data = connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true);
}
}
}
}
async与packaged_task运行的函数抛出异常时会保存在future对象中,调用get时再次抛出。对于promise而言,应用set_exception保存异常
代码语言:javascript复制some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo")));
4.2.4 多个线程一起等待
shared_future可以让多个线程等待同一个目标事件。每个线程复制一份shared_future副本,成为各线程独有的局部变量;通过该局部变量访问将由标准库自动同步,可以安全地访问。
代码语言:javascript复制std::promise<int> p1;
auto f1 = p1.get_future();
assert(f1.valid());
std::shared_future<int> sf1 = std::move(f1);
assert(!f1.valid());
assert(sf1.valid());
std::promise<int> p2;
auto sf2 = p2.get_future().share();
4.3 限时等待
之前介绍的所有可能阻塞的调用,其阻塞都可能漫无止境。为此可以采用一些超时机制:延迟超时表示等待一定时间,后缀为for,绝对超时表示等待到某时间点,后缀为until。
std::chrono库中时钟是时间信息的来源,每个时钟类都提供当前时刻now、时间值的类型time_point、计时单元的长度ratio<>、计时速率是否恒定is_steady。常用时钟类包括system_clock,steady_clock,high_resolution_clock。
时长类duration<>,其模板参数有两个,第一个指采用何种类型表示计时单元的数量,第二个指每个计时单元代表多少秒。例如std::chrono::duration<double,std::ratio<1,1000>>代表采用double值计数的毫秒时长类。
代码语言:javascript复制auto f = std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==
std::future_status::ready){
process(f.get());
}
时间点类time_point<>,模板参数有两个,第一个指参考时钟,第二个指计时单元,即特化的duration。
代码语言:javascript复制std::condition_variable cv;
bool done;
std::mutex m;
bool wait_loop() {
auto const timeout =
std::chrono::steady_clock::now() std::chrono::milliseconds(500);
std::unique_lock<std::mutex> lk(m);
while (!done) {
if (cv.wait_until(lk, timeout) == std::cv_status::timeout) break;
}
return done;
}
4.4 运用同步操作简化代码
在并发实战中可以使用贴近函数式编程的风格,函数调用的结果完全取决于参数而非任何外部状态。线程间不会直接共享数据,而是由各任务分别预先准备妥自己所需的数据,随后通过future将结果发送到其他有需要的线程。
例如可以实现并行的快排:
代码语言:javascript复制template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.empty()) {
return input;
}
// 将input的开头剪切到result
// 以此为分界值,将input分为两段
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) { return t < pivot; });
// 异步处理较小的一段
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
std::future<std::list<T> > new_lower(std::async(¶llel_quick_sort<T>, std::move(lower_part)));
// 本线程处理较大的一段
auto new_higher(parallel_quick_sort(std::move(input)));
// 汇合所有结果
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}
除了函数式编程,CSP(通信式串行线程)也有同样特性,其中线程完全隔离,没有共享数据,通过管道传递消息。具体代码这里不再演示。
C 20中还提出两个新特性:latch和barrier。latch是一个同步对象,内含计数器,减到0时就绪。
代码语言:javascript复制void foo() {
unsigned const thread_count = ...;
latch done(thread_count);
my_data data[thread_count];
std::vector<std::future<void> > threads;
for (unsigned i = 0; i < thread_count; i)
threads.push_back(std::async(std::launch::async, [&, i] {
data[i] = make_data(i);
done.count_down();
...
}));
done.wait();
process_data(data, thread_count);
}
而barrier针对一组给定的线程,每个线程运行到barrier处就阻塞,直到同组的所有线程都抵达才释放。
代码语言:javascript复制void process_data(data_source &source, data_sink &sink) {
unsigned const num_threads = ...
barrier sync(num_threads);
std::vector<joining_thread> threads(num_threads);
std::vector<data_chunk> chunks;
result_block result;
for (unsigned i = 0; i < num_threads; i) {
threads[i] = joining_thread([&, i] {
while (...) {
if (!i) {
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
sync.arrive_and_wait();
result.set_chunk(i, num_threads, process(chunks[i]));
sync.arrive_and_wait();
if (!i) {
sink.write_data(std::move(result));
}
}
});
}
}