C++ 实现多线程生产者消费者模式

2023-03-24 09:08:39 浏览数 (1)

之前介绍过 生产者、消费者模式,是一种常用的多线程并发设计模式,本文记录 C 实现的过程。

生产者消费者模式

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。

该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

根据生产者和消费者数量的多少,程序复杂程度也不同,可以分为 :单生产者-单消费者模型单生产者-多消费者模型多生产者-单消费者模型多生产者-多消费者模型

单生产者-单消费者模型

单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。

  • C 11 实现单生产者单消费者模型的代码如下:
代码语言:javascript复制
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
 
static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目
 
std::mutex mtx;//互斥量,保护产品缓冲区
 
std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
 
int item_buffer[repository_size];
 
static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置
 
std::chrono::seconds t(1);//a  new feature of c   11 standard
 
void produce_item(int i)
{
	std::unique_lock<std::mutex> lck(mtx);
	while (((write_position   1) % repository_size) == read_position)
	{
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
	}                            //当缓冲区满了之后我们就不能添加产品了
 
	item_buffer[write_position] = i;//写入产品
	write_position  ;
 
	if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
	{
		write_position = 0;
	}
 
	repo_not_empty.notify_all();//通知消费者产品库不为空
 
	//lck.unlock();//解锁
}
 
int consume_item()
{
	int data;
	std::unique_lock<std::mutex> lck(mtx);
	while (write_position == read_position)
	{
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
	}                            
 
	data = item_buffer[read_position];//读取产品
	read_position  ;
 
	if (read_position >= repository_size)
	{
		read_position = 0;
	}
		
	repo_not_full.notify_all();//通知产品库不满
	//lck.unlock();
 
	return data;
}
 
void Producer_thread()
{
	for (int i = 1; i <= item_total;   i)
	{
		//std::this_thread::sleep_for(t);
		std::cout << "生产者生产第" << i  << "个产品" << std::endl;
		produce_item(i);
	}
}
 
void Consumer_thread()
{
	static int cnt = 0;
	while (1)
	{
		//std::this_thread::sleep_for(t);
		int item = consume_item();
		std::cout << "消费者消费第" << item << "个产品" << std::endl;
 
		if (  cnt == item_total)
			break;
	}
}
 
int main()
{
	std::thread producer(Producer_thread); // 创建生产者线程.
	std::thread consumer(Consumer_thread); // 创建消费之线程.
	producer.join();
	consumer.join();
}

单生产者-多消费者模型

与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器。

  • 代码如下:
代码语言:javascript复制
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目
 
std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex mtx_counter;//互斥量,保护产品计数器
 
std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
 
int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
 
static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置
 
static std::size_t item_counter = 0;//消费者消费产品计数器
 
std::chrono::seconds t(1);//a  new feature of c   11 standard
 
void produce_item(int i)
{
	std::unique_lock<std::mutex> lck(mtx);
	//item buffer is full, just wait here.
	while (((write_position   1) % repository_size) == read_position)
	{
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
	}                            //当缓冲区满了之后我们就不能添加产品了
 
	item_buffer[write_position] = i;//写入产品
	write_position  ;
 
	if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
	{
		write_position = 0;
	}
 
	repo_not_empty.notify_all();//通知消费者产品库不为空
 
	lck.unlock();//解锁
}
 
int consume_item()
{
	int data;
	std::unique_lock<std::mutex> lck(mtx);
	// item buffer is empty, just wait here.
	while (write_position == read_position)
	{
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
	}
 
	data = item_buffer[read_position];//读取产品
	read_position  ;
 
	if (read_position >= repository_size)
	{
		read_position = 0;
	}
 
	repo_not_full.notify_all();//通知产品库不满
	lck.unlock();
 
	return data;
}
 
void Producer_thread()
{
	for (int i = 1; i <= item_total;   i)
	{
		//std::this_thread::sleep_for(t);
		std::cout << "生产者生产第" << i << "个产品" << std::endl;
		produce_item(i);
	}
}
 
void Consumer_thread()
{
	bool read_to_exit = false;
	while (1)
	{
		std::this_thread::sleep_for(t);
		std::unique_lock<std::mutex> lck(mtx_counter);
		if (item_counter < item_total)
		{
			int item = consume_item();
			  item_counter;
			std::cout << "消费者线程" << std::this_thread::get_id()
				<< "消费第" << item << "个产品" << std::endl;
		}
		else
		{
			read_to_exit = true;
		}
		if (read_to_exit == true)
			break;
	}
 
	std::cout << "Consumer thread " << std::this_thread::get_id()
		<< " is exiting..." << std::endl;
 
}
 
int main()
{
	std::thread producer(Producer_thread); // 创建生产者线程.
	std::vector<std::thread> thread_vector;
	for (int i = 0; i != 5;   i)
	{
		thread_vector.push_back(std::thread(Consumer_thread));// 创建消费者线程.
	}
	
	producer.join();
	for (auto &thr : thread_vector)
	{
		thr.join();
	}
 
}

多生产者-单消费者模型

与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器。

  • 代码如下:
代码语言:javascript复制
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
 
static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目
 
std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex mtx_counter;
 
std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
 
int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
 
static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置
 
static std::size_t item_counter = 0;//计数器
 
std::chrono::seconds t(1);//a  new feature of c   11 standard
 
void produce_item(int i)
{
	std::unique_lock<std::mutex> lck(mtx);
	// item buffer is full, just wait here.
	while (((write_position   1) % repository_size) == read_position)
	{
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
	}                            //当缓冲区满了之后我们就不能添加产品了
 
	item_buffer[write_position] = i;//写入产品
	write_position  ;
 
	if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
	{
		write_position = 0;
	}
 
	repo_not_empty.notify_all();//通知消费者产品库不为空
 
	lck.unlock();//解锁
}
 
int consume_item()
{
	int data;
	std::unique_lock<std::mutex> lck(mtx);
	// item buffer is empty, just wait here.
	while (write_position == read_position)
	{
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
	}
 
	data = item_buffer[read_position];//读取产品
	read_position  ;
 
	if (read_position >= repository_size)
	{
		read_position = 0;
	}
 
	repo_not_full.notify_all();//通知产品库不满
	lck.unlock();
 
	return data;
}
 
void Producer_thread()
{
	bool read_to_exit = false;
	while (1)
	{
		std::unique_lock<std::mutex> lck(mtx_counter);
		if (item_counter < item_total)
		{
			  item_counter;
			produce_item(item_counter);
			std::cout << "生产者线程 " << std::this_thread::get_id()
				<< "生产第  " << item_counter << "个产品" << std::endl;
		}
		else
		{
			read_to_exit = true;
		}
 
		if (read_to_exit == true)
			break;
	}
 
	std::cout << "Producer thread " << std::this_thread::get_id()
		<< " is exiting..." << std::endl;
 
}
 
void Consumer_thread()
{
	static int cnt = 0;
	while (1)
	{
		std::this_thread::sleep_for(t);
		int item = consume_item();
		std::cout << "消费者消费第" << item << "个产品" << std::endl;
 
		if (  cnt == item_total)
			break;
	}
}
 
int main()
{
	std::vector<std::thread> thread_vector;
	for (int i = 0; i != 5;   i)
	{
		thread_vector.push_back(std::thread(Producer_thread));// 创建消费者线程.
	}
 
	std::thread consumer(Consumer_thread); // 创建消费之线程.
 
	for (auto &thr : thread_vector)
	{
		thr.join();
	}
 
	consumer.join();
}

多生产者-多消费者模型

该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。

  • 示例代码:
代码语言:javascript复制
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
 
static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目
 
std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex producer_count_mtx;
std::mutex consumer_count_mtx;
 
std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
 
int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
 
static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置
 
static size_t produced_item_counter = 0;
static size_t consumed_item_counter = 0;
 
std::chrono::seconds t(1);//a  new feature of c   11 standard
std::chrono::microseconds t1(1000);
 
void produce_item(int i)
{
	std::unique_lock<std::mutex> lck(mtx);
	// item buffer is full, just wait here.
	while (((write_position   1) % repository_size) == read_position)
	{
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
	}                            //当缓冲区满了之后我们就不能添加产品了
 
	item_buffer[write_position] = i;//写入产品
	write_position  ;
 
	if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
	{
		write_position = 0;
	}
 
	repo_not_empty.notify_all();//通知消费者产品库不为空
 
	lck.unlock();//解锁
}
 
int consume_item()
{
	int data;
	std::unique_lock<std::mutex> lck(mtx);
	// item buffer is empty, just wait here.
	while (write_position == read_position)
	{
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
	}
 
	data = item_buffer[read_position];//读取产品
	read_position  ;
 
	if (read_position >= repository_size)
	{
		read_position = 0;
	}
 
	repo_not_full.notify_all();//通知产品库不满
	lck.unlock();
 
	return data;
}
 
void Producer_thread()
{
	bool ready_to_exit = false;
	while (1)
	{
		//std::this_thread::sleep_for(t);
		std::unique_lock<std::mutex> lock(producer_count_mtx);
		if (produced_item_counter < item_total)
		{
			  produced_item_counter;
			produce_item(produced_item_counter);
			std::cout << "生产者线程 " << std::this_thread::get_id()
				<< "生产第  " << produced_item_counter << "个产品" << std::endl;
		}
		else
		{
			ready_to_exit = true;
		}
 
		lock.unlock();
 
		if (ready_to_exit == true)
		{
			break;
		}
	}
 
	std::cout << "Producer thread " << std::this_thread::get_id()
		<< " is exiting..." << std::endl;
}
 
void Consumer_thread()
{
	bool read_to_exit = false;
	while (1)
	{
		std::this_thread::sleep_for(t1);
		std::unique_lock<std::mutex> lck(consumer_count_mtx);
		if (consumed_item_counter < item_total)
		{
			int item = consume_item();
			  consumed_item_counter;
			std::cout << "消费者线程" << std::this_thread::get_id()
				<< "消费第" << item << "个产品" << std::endl;
		}
		else
		{
			read_to_exit = true;
		}
 
		if (read_to_exit == true)
		{
			break;
		}
	}
 
	std::cout << "Consumer thread " << std::this_thread::get_id()
		<< " is exiting..." << std::endl;
}
 
int main()
{
	std::vector<std::thread> thread_vector1;
	std::vector<std::thread> thread_vector2;
	for (int i = 0; i != 5;   i)
	{
		thread_vector1.push_back(std::thread(Producer_thread));// 创建生产者线程.
		thread_vector2.push_back(std::thread(Consumer_thread));// 创建消费者线程.
 
	}
 
	for (auto &thr1 : thread_vector1)
	{
		thr1.join();
	}
 
	for (auto &thr2 : thread_vector2)
	{
		thr2.join();
	}
}

参考资料

  • https://www.zywvvd.com/notes/study/algorithm/queue/message-queue/message-queue/#生产者、消费者模式
  • https://blog.csdn.net/qq_45829112/article/details/123651735
  • https://blog.csdn.net/chenxun_2010/article/details/49848865

文章链接: https://cloud.tencent.com/developer/article/2245102

0 人点赞