什么是生产者-消费者模型?
假设有两个线程(进程)A、B和一个容器
线程A生产完数据之后不用等待线程B消费处理,直接将生产的数据放到这个容器当中;消费者线程B也不用找生产者线程A索要数据,而是直接监听容器有无数据,有数据就取出消费。容器就类似于一个缓冲区,平衡了生产者和消费者的处理能力。
该模型的关键在于
消费者不会在缓冲区无数据时消耗数据。
若是容器有上限也要保证生产者不会在缓冲区满时加入数据。
以下是通过stl的queue队列做容器实现一个简单的生产者-消费者模型
代码实现
Task.h头文件
- 首先我们需要一个类简单的模拟平常的任务需求
- 通过new一个对象生成任务填充进待消费的队列
- 运行Run以后视为消费了,delete掉之前new的对象
#pragma once
#include <iostream>
#include <functional>
class Task
{
private:
int id;
public:
Task(int id) :id(id)
{
std::cout << "生产:" << id << std::endl;
}
int GetId() { return id; }
void SetId(int k) {
id = k;
}
void Run()
{
std::cout << "消费:" << id << std::endl;
}
};
main.cpp
该代码中生产者线程产生一系列 Task 对象并将其放入共享队列中,消费者线程从队列中取出这些任务并处理它们。
- 预先定义的一些变量
- #define QueueMaxBuffer 5 ,用于模拟容器容量达到上限时的情景,当达到上限时生产者等待一秒后再继续生产数据
- #define ProduceMax 8 , 用于模拟生产者的最大生产任务量,当达到最大生产任务量退出循环
- g_DataBuffer用于存储生产者产生的任务,并供消费者消费。
- g_DataBufferMutex;用于保护对 g_DataBuffer 的访问
- g_CondVar 一个条件变量,可以使消费者线程等待队列不为空,而生产者线程会在向队列添加新任务后通知等待的消费者。
- g_ProducerDone 用于标记生产者是否完成了所有的任务生产。
- 生产者线程函数:
- 任务满了就等待一秒,再继续生产
- 每生产一个任务通过notify_one() 方法来通知一个等待的消费者线程
- 每100毫秒生产一个任务
- 消费者线程函数:
- 用 g_CondVar.wait() 使线程等待,直到队列非空或 生产者生产完成既g_ProducerDone 为 true。
- 每400秒消费一个任务
- 当消费完成,既队列为空且生产者生产完成时退出
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include "Task.h"
#define QueueMaxBuffer 5 // 队列最大容量
#define ProduceMax 8 //生产者最大生产任务量
std::queue<Task*> g_DataBuffer;
std::mutex g_DataBufferMutex;
std::condition_variable g_CondVar;
std::atomic<bool> g_ProducerDone(false); // 生产者完成标志
void ProduceData()
{
int value = 0;
while (true)
{
// 这个线程等待一百毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(g_DataBufferMutex);
// 等待一秒直到队列未满
if (g_DataBuffer.size() < QueueMaxBuffer);
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Task* t = new Task(value );
g_DataBuffer.push(t);
// 通知消费者数据准备好了
g_CondVar.notify_one();
// 可以在这里改动退出逻辑
if (value > ProduceMax)
{
std::cout << "Producer done, produced " << value - 1 << " tasks." << std::endl;
g_ProducerDone.store(true); // 设置生产者完成标志
break;
}
}
}
void ConsumeData()
{
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::unique_lock<std::mutex> lock(g_DataBufferMutex);
g_CondVar.wait(lock, [] { return !g_DataBuffer.empty() || g_ProducerDone.load(); });
if (g_DataBuffer.empty() && g_ProducerDone.load())
break; // 如果队列为空且生产者已完成,则退出
Task* data = g_DataBuffer.front();
data->Run();
g_DataBuffer.pop();
delete data;
std::cout << "Consumed one task, remaining tasks in queue: " << g_DataBuffer.size() << std::endl;
}
}
int main()
{
std::thread p(ProduceData);
std::thread c(ConsumeData);
// 等待生产者线程和消费者线程完成
p.join();
c.join();
return 0;
}
运行截图
总结
写这篇文章时,碰了不少壁,线程之间的时序问题相当不熟练以至于出现bug时,会导致我难以定位,最后只能妥协用一些较为简单的代码代替感觉有问题的地方。
虽然简单的实现了一个多线程的生产者消费者模型,但缺点不少,等以后碰到具体的应用场景时,再来完善其内容吧。