生产者-消费者模型C++多线程简单实现

2024-07-29 09:22:44 浏览数 (1)

什么是生产者-消费者模型?

假设有两个线程(进程)A、B和一个容器

线程A生产完数据之后不用等待线程B消费处理,直接将生产的数据放到这个容器当中;消费者线程B也不用找生产者线程A索要数据,而是直接监听容器有无数据,有数据就取出消费。容器就类似于一个缓冲区,平衡了生产者和消费者的处理能力。

该模型的关键在于

消费者不会在缓冲区无数据时消耗数据。

若是容器有上限也要保证生产者不会在缓冲区满时加入数据。

以下是通过stl的queue队列做容器实现一个简单的生产者-消费者模型

代码实现

Task.h头文件

  • 首先我们需要一个类简单的模拟平常的任务需求
  • 通过new一个对象生成任务填充进待消费的队列
  • 运行Run以后视为消费了,delete掉之前new的对象
代码语言:cpp复制
#pragma once
#include <iostream>
#include <functional>
class Task
{
private:
	int id;
public:
	Task(int id) :id(id)
	{
		std::cout << "生产:" << id << std::endl;
	}
	int GetId() { return id; }
	void SetId(int k) {
		id = k;
	}
	void Run()
	{
		std::cout << "消费:" << id << std::endl;
	}
};

main.cpp

该代码中生产者线程产生一系列 Task 对象并将其放入共享队列中,消费者线程从队列中取出这些任务并处理它们。

  • 预先定义的一些变量
    • #define QueueMaxBuffer 5 ,用于模拟容器容量达到上限时的情景,当达到上限时生产者等待一秒后再继续生产数据
    • #define ProduceMax 8 , 用于模拟生产者的最大生产任务量,当达到最大生产任务量退出循环
    • g_DataBuffer用于存储生产者产生的任务,并供消费者消费。
    • g_DataBufferMutex;用于保护对 g_DataBuffer 的访问
    • g_CondVar 一个条件变量,可以使消费者线程等待队列不为空,而生产者线程会在向队列添加新任务后通知等待的消费者。
    • g_ProducerDone 用于标记生产者是否完成了所有的任务生产。
  • 生产者线程函数:
    • 任务满了就等待一秒,再继续生产
    • 每生产一个任务通过notify_one() 方法来通知一个等待的消费者线程
    • 每100毫秒生产一个任务
  • 消费者线程函数:
    • 用 g_CondVar.wait() 使线程等待,直到队列非空或 生产者生产完成既g_ProducerDone 为 true。
    • 每400秒消费一个任务
    • 当消费完成,既队列为空且生产者生产完成时退出
代码语言:cpp复制
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include "Task.h"

#define QueueMaxBuffer 5  // 队列最大容量
#define ProduceMax 8  //生产者最大生产任务量

std::queue<Task*> g_DataBuffer;
std::mutex g_DataBufferMutex;
std::condition_variable g_CondVar;
std::atomic<bool> g_ProducerDone(false);  // 生产者完成标志

void ProduceData()
{
	int value = 0;
	while (true)
	{
		// 这个线程等待一百毫秒
		std::this_thread::sleep_for(std::chrono::milliseconds(100));

		std::unique_lock<std::mutex> lock(g_DataBufferMutex);
		// 等待一秒直到队列未满
		if (g_DataBuffer.size() < QueueMaxBuffer);
		else
		{
			std::this_thread::sleep_for(std::chrono::milliseconds(1000));
		}

		Task* t = new Task(value  );
		g_DataBuffer.push(t);

		// 通知消费者数据准备好了
		g_CondVar.notify_one();

		// 可以在这里改动退出逻辑
		if (value > ProduceMax)
		{
			std::cout << "Producer done, produced " << value - 1 << " tasks." << std::endl;
			g_ProducerDone.store(true);  // 设置生产者完成标志
			break;
		}
	}
}

void ConsumeData()
{
	while (true)
	{
		std::this_thread::sleep_for(std::chrono::milliseconds(400));
		std::unique_lock<std::mutex> lock(g_DataBufferMutex);
		g_CondVar.wait(lock, [] { return !g_DataBuffer.empty() || g_ProducerDone.load(); });

		if (g_DataBuffer.empty() && g_ProducerDone.load())
			break;  // 如果队列为空且生产者已完成,则退出

		Task* data = g_DataBuffer.front();
		data->Run();
		g_DataBuffer.pop();
		delete data;

		std::cout << "Consumed one task, remaining tasks in queue: " << g_DataBuffer.size() << std::endl;
	}
}

int main()
{
	std::thread p(ProduceData);
	std::thread c(ConsumeData);

	// 等待生产者线程和消费者线程完成
	p.join();
	c.join();

	return 0;
}

运行截图

总结

写这篇文章时,碰了不少壁,线程之间的时序问题相当不熟练以至于出现bug时,会导致我难以定位,最后只能妥协用一些较为简单的代码代替感觉有问题的地方。

虽然简单的实现了一个多线程的生产者消费者模型,但缺点不少,等以后碰到具体的应用场景时,再来完善其内容吧。

0 人点赞