《C++并发编程实战》读书笔记(2):并发操作的同步

2023-08-10 08:20:15 浏览数 (1)

第4章 并发操作的同步

4.1 等待事件或等待其他条件

如果线程甲需要等待线程乙完成任务,可以使用C 标准库的条件变量来等待事件发生。<condition_variable>中提供了condition_variable和condition_variable_any,前者只能配合mutex使用,而后者可以与任意符合互斥标准的类型使用,会产生额外开销。主要使用成员函数wait、notify_one、notify_all。

例如可以实现一个生产者消费者模型,通过队列来传递数据,一端准备数据另一端处理数据,其中条件变量的作用是消费者线程取出数据前检查队列是否非空,否则释放锁并等待生产者线程准备数据。

代码语言:javascript复制
std::mutex mut;
std::queue<Widget> data_queue;
std::condition_variable data_cond;

void data_preparation_thread() {
    while (...) {
        const Widget data = prepare_data();
        {
          std::lock_guard<std::mutex> lk(mut);
          data_queue.push(data);
        }
        // 通知消费者线程
        data_cond.notify_one();
    }
}

void data_processing_thread() {
    while (...) {
        // 需要多次加锁解锁,所以用unique_lock
        std::unique_lock<std::mutex> lk(mut);
        // wait首先判断lambda,成立则返回,否则解锁互斥进入阻塞
        // 每次被notify后解除阻塞并获取锁,重复上述过程
        data_cond.wait(lk, [] { return !data_queue.empty(); });
        Widget data = data_queue.front();
        data_queue.pop();
        lk.unlock();
        process(data);
    }
}

也可以实现一个简略的线程安全的队列:

代码语言:javascript复制
template <typename T>
class threadsafe_queue {
   private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;

   public:
    threadsafe_queue() {}
    threadsafe_queue(threadsafe_queue const& other) {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }

    void push(T new_value) {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
};

4.2 使用future等待一次性事件发生

若线程需等待某一次性事件,可以以适当方式取得一个代表目标事件的future,此后线程就可以一边执行其他任务一边在future上等待。一旦目标事件发生,future就进入就绪状态,无法重置。

<future>中提供了两种类模板future和shared_future,同一事件仅可关联一个future实例,但可关联多个shared_future,并且目标事件发生后关联的所有shared_future实例都就绪。future本身不提供同步,多线程时需要用同步方式进行保护。


4.2.1 从后台任务返回值

并不急需某任务的返回值时,可以用async异步地启动任务,获得一个future对象;对后者调用get会阻塞当前线程,等待future准备完并返回该值。

代码语言:javascript复制
int f() { ... }

std::future<int> answer = std::async(f);
...
std::cout << answer.get();

给async的任务函数传递参数类似给thread传递参数。

代码语言:javascript复制
// 调用成员函数的情况
// p=&x; p->foo("hello")
auto f1 = std::async(&X::foo, &x, "hello");
// tmpx=x; tmpx.foo("hello")
auto f2 = std::async(&X::foo, x, "hello");

// 调用仿函数的情况
// tmpy=y; tmpy(3.14)
auto f3 = std::async(Y(), 3.14);
// y(3.14)
auto f4 = std::async(std::ref(y), 3.14);

// 函数形参为引用的情况
X baz(X&);
auto f5 = std::async(baz, std::ref(x));

可以给async传递参数指定运行方式,deferred代表直到在future上调用wait/get才执行任务函数,async代表开启专属线程来执行;默认为deferred|async。

代码语言:javascript复制
auto f = std::async(std::launch::async, Y(), 1.2);

4.2.2 关联future实例与任务

类模板packaged_task把任务包装起来,可作为任务调度器、线程池的构建单元,其模板参数是函数签名,例如int(int,double*)。它具备函数调用操作符,参数取决于上述模板参数,调用时将参数传递给任务函数,通过get_future获取future对象,异步运行得到结果后保存到该对象。

例如图形用户界面需要接收其他线程的消息来更新界面。

代码语言:javascript复制
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

// 图形用户界面的线程函数
void gui_thread() {
    while (...) {
        get_and_process_gui_message();
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lk(m);
            if (tasks.empty()) continue;
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task();
    }
}

// 其他线程通过该函数传递消息
template <typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
    std::packaged_task<void()> task(f);
    std::future<void> res = task.get_future();
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task));
    return res;
}

4.2.3 创建std::promise

有些任务无法以简单的函数调用表达,或者执行结果来自多个部分的代码,那么就需要使用std::promise显式地异步求值。

promise通过get_future获取关联的future对象,等待数据的线程在future上阻塞,提供数据的线程通过set_value设置数据,设置完后future即就绪。若promise销毁时仍未set_value,则传递异常。

下面是单线程处理多个连接的例子。这里假设传入的数据包含有ID与荷载数据,接收后将ID与promise对应,将相关值设为荷载数据。对于传出的数据而言,promise的相关值是代表是否成功的bool。

代码语言:javascript复制
void process_connections(connection_set& connections) {
    while (...) {
        for (connection_iterator connection = ...) {
            if (connection->has_incoming_data()) {
                data_packet data = connection->incoming();
                std::promise<payload_type>& p = connection->get_promise(data.id);
                p.set_value(data.payload);
            }
            if (connection->has_outgoing_data()) {
                outgoing_packet data = connection->top_of_outgoing_queue();
                connection->send(data.payload);
                data.promise.set_value(true);
            }
        }
    }
}

async与packaged_task运行的函数抛出异常时会保存在future对象中,调用get时再次抛出。对于promise而言,应用set_exception保存异常

代码语言:javascript复制
some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo")));

4.2.4 多个线程一起等待

shared_future可以让多个线程等待同一个目标事件。每个线程复制一份shared_future副本,成为各线程独有的局部变量;通过该局部变量访问将由标准库自动同步,可以安全地访问。

代码语言:javascript复制
std::promise<int> p1;
auto f1 = p1.get_future();
assert(f1.valid());
std::shared_future<int> sf1 = std::move(f1);
assert(!f1.valid());
assert(sf1.valid());

std::promise<int> p2;
auto sf2 = p2.get_future().share();

4.3 限时等待

之前介绍的所有可能阻塞的调用,其阻塞都可能漫无止境。为此可以采用一些超时机制:延迟超时表示等待一定时间,后缀为for,绝对超时表示等待到某时间点,后缀为until。

std::chrono库中时钟是时间信息的来源,每个时钟类都提供当前时刻now、时间值的类型time_point、计时单元的长度ratio<>、计时速率是否恒定is_steady。常用时钟类包括system_clock,steady_clock,high_resolution_clock。

时长类duration<>,其模板参数有两个,第一个指采用何种类型表示计时单元的数量,第二个指每个计时单元代表多少秒。例如std::chrono::duration<double,std::ratio<1,1000>>代表采用double值计数的毫秒时长类。

代码语言:javascript复制
auto f = std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==
    std::future_status::ready){    
    process(f.get());
}

时间点类time_point<>,模板参数有两个,第一个指参考时钟,第二个指计时单元,即特化的duration。

代码语言:javascript复制
std::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop() {
    auto const timeout =
        std::chrono::steady_clock::now()   std::chrono::milliseconds(500);
    std::unique_lock<std::mutex> lk(m);
    while (!done) {
        if (cv.wait_until(lk, timeout) == std::cv_status::timeout) break;
    }
    return done;
}

4.4 运用同步操作简化代码

在并发实战中可以使用贴近函数式编程的风格,函数调用的结果完全取决于参数而非任何外部状态。线程间不会直接共享数据,而是由各任务分别预先准备妥自己所需的数据,随后通过future将结果发送到其他有需要的线程。

例如可以实现并行的快排:

代码语言:javascript复制
template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
    if (input.empty()) {
        return input;
    }
    // 将input的开头剪切到result
    // 以此为分界值,将input分为两段
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();
    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t) { return t < pivot; });
    // 异步处理较小的一段
    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
    std::future<std::list<T> > new_lower(std::async(&parallel_quick_sort<T>, std::move(lower_part)));
    // 本线程处理较大的一段
    auto new_higher(parallel_quick_sort(std::move(input)));
    // 汇合所有结果
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}

除了函数式编程,CSP(通信式串行线程)也有同样特性,其中线程完全隔离,没有共享数据,通过管道传递消息。具体代码这里不再演示。

C 20中还提出两个新特性:latch和barrier。latch是一个同步对象,内含计数器,减到0时就绪。

代码语言:javascript复制
void foo() {
    unsigned const thread_count = ...;
    latch done(thread_count);
    my_data data[thread_count];
    std::vector<std::future<void> > threads;
    for (unsigned i = 0; i < thread_count;   i)
        threads.push_back(std::async(std::launch::async, [&, i] {
            data[i] = make_data(i);
            done.count_down();
            ...
        }));
    done.wait();
    process_data(data, thread_count);
}

而barrier针对一组给定的线程,每个线程运行到barrier处就阻塞,直到同组的所有线程都抵达才释放。

代码语言:javascript复制
void process_data(data_source &source, data_sink &sink) {
    unsigned const num_threads = ...
    barrier sync(num_threads);
    std::vector<joining_thread> threads(num_threads);
    
    std::vector<data_chunk> chunks;
    result_block result;

    for (unsigned i = 0; i < num_threads;   i) {
        threads[i] = joining_thread([&, i] {
            while (...) {
                if (!i) {
                    data_block current_block = source.get_next_data_block();
                    chunks = divide_into_chunks(current_block, num_threads);
                }
                sync.arrive_and_wait();
                result.set_chunk(i, num_threads, process(chunks[i]));
                sync.arrive_and_wait();
                if (!i) {
                    sink.write_data(std::move(result));
                }
            }
        });
    }
}

0 人点赞