文章目录
- 引言
- 理解生产者消费者模型
- 基于BlockingQueue的生产者消费者模型
- 单生产,单消费模型
- 多生产、多消费模型
引言
生产者消费者模型一般可以在超市中听到,例如如下是一个专门卖方便面的超市,这个超市有自己供应商,也有客户来买,客户称之为消费者。超市起到一个缓存作用,供应商放假的时候,短时间内超市依然有对应的商品,消费者依然可以消费;相同的,如果短时间内消费者不来买东西,供应商依然可以供应给超市。也就是说,供应商生产产品比较慢,可以先生成一批产品放在超市中;供应商如果供应比较快,可以等消费者消费一段时间再去供应产品,协调忙线不均。现实生活中,在人口密集的地方肯定会有超市,生产者消费者模型效率高,有了超市这个巨大的缓存,可以使得消费者和生产者并发起来。
个别消费者不想买方便面不会影响到供应商,个别供应商出现了问题,不会影响消费者买方便面,这就做到了生产者和消费者的解耦。
理解生产者消费者模型
上述例子对应到计算机中,供应商和消费者就是线程,超市是一段内存空间,方便面是数据。生产线程将数据交到一段内存空间中,消费线程从内存空间中将数据拿走。
“321原则”:
- 一个交易场所(特定数据结构的形式存在的一段内存空间)
- 两种角色:生产者、消费者,也就是生产线程和消费线程
- 三种关系:生产和生产(互斥关系)、消费和消费(互斥关系)、生产和消费(互斥关系、同步关系)
实现生产者消费者模型本质就是通过代码实现“321原则”,用锁和条件变量(或者其他形式)来实现三种关系。
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
单生产,单消费模型
代码语言:javascript复制//BlockQueue.hpp
#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>
const static int defaultcap=5;
template<typename T>
class BlockQueue
{
private:
bool isFull()
{
return _block_queue.size()==_max_cap;
}
bool isEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap= defaultcap):_max_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_P_cond,nullptr);
pthread_cond_init(&_C_cond,nullptr);
}
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
while(isEmpty())
{
pthread_cond_wait(&_C_cond,&_mutex);
}
*out=_block_queue.front();
_block_queue.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_P_cond); //唤醒生产者
}
void Equeue(const T &in)
{
pthread_mutex_lock(&_mutex);
while(isFull()) //阻塞队列满
{
//满了生产者不能再生产,必须等待
pthread_cond_wait(&_P_cond,&_mutex); //被调用的时候,除了让自己继续排队等待,还会释放自己传递的锁
//函数返回时,会返回在临界区,必须先参与锁的竞争,重新加上锁,该函数才会返回,依然是持有锁的状态
}
//阻塞队列未满或者被唤醒
_block_queue.push(in); //生产数据到阻塞队列
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_C_cond); //唤醒消费者
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_P_cond);
pthread_cond_destroy(&_C_cond);
}
private:
std::queue<T> _block_queue; //临界资源
int _max_cap;
pthread_mutex_t _mutex;
pthread_cond_t _P_cond; //生产者条件变量
pthread_cond_t _C_cond; //消费者条件变量
};
Pop
函数:从队列中取出元素,并将其存储在 out
指针指向的地址中。步骤如下:
- 锁定互斥量:通过
pthread_mutex_lock(&_mutex)
确保对队列的操作是线程安全的。 - 等待条件变量:如果队列为空,使用
pthread_cond_wait(&_C_cond, &_mutex)
等待消费者条件变量被信号唤醒。 - 取出元素:从队列中取出前面的元素,并将其弹出。
- 解锁互斥量:通过
pthread_mutex_unlock(&_mutex)
解锁。 - 唤醒生产者:使用
pthread_cond_signal(&_P_cond)
唤醒可能被阻塞的生产者线程。
Equeue
函数:将元素 in
插入队列。步骤如下:
- 锁定互斥量:通过
pthread_mutex_lock(&_mutex)
确保对队列的操作是线程安全的。 - 等待条件变量:如果队列已满,使用
pthread_cond_wait(&_P_cond, &_mutex)
等待生产者条件变量被信号唤醒。 - 插入元素:将新元素插入到队列中。
- 解锁互斥量:通过
pthread_mutex_unlock(&_mutex)
解锁。 - 唤醒消费者:使用
pthread_cond_signal(&_C_cond)
唤醒可能被阻塞的消费者线程。
为了体现阻塞队列的特点,分别设计了两种测试代码:
- 生产一个,消费一个
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>
void *Consumer(void *args)
{
BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
while(true)
{
//获取数据
int data=0;
bq->Pop(&data);
//处理数据
std::cout<<"Coumer -> "<<data<<std::endl;
}
}
void *Productor(void *args)
{
srand(time(nullptr)^getpid());
BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
while(true)
{
sleep(2);
//构建数据
int data=rand() 1; // [1,10]
//生产数据
bq->Equeue(data);
std::cout<<"Productor -> "<<data<<std::endl;
}
}
int main()
{
BlockQueue<int> *bq=new BlockQueue<int>();
pthread_t c,p;
pthread_create(&c,nullptr,Consumer,bq);
pthread_create(&p,nullptr,Productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
- 先生产一批数据,直到队列开始阻塞,然后消费一个,生产一个
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>
void *Consumer(void *args)
{
BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
while(true)
{
sleep(2);
//获取数据
int data=0;
bq->Pop(&data);
//处理数据
std::cout<<"Coumer -> "<<data<<std::endl;
}
}
void *Productor(void *args)
{
srand(time(nullptr)^getpid());
BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
while(true)
{
//构建数据
int data=rand() 1; // [1,10]
//生产数据
bq->Equeue(data);
std::cout<<"Productor -> "<<data<<std::endl;
}
}
int main()
{
BlockQueue<int> *bq=new BlockQueue<int>();
pthread_t c,p;
pthread_create(&c,nullptr,Consumer,bq);
pthread_create(&p,nullptr,Productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
上述测试代码是传递一个int
类型的数据到阻塞队列中,也可以传递其他类型,在传递struct
或者class
类型时,可以封装成一个个的任务传递到阻塞队列中。
- 传递任务:
//Task.hpp
#pragma once
#include<iostream>
#include<string>
class Task
{
public:
Task()
{}
Task(int x,int y):_x(x),_y(y)
{}
void Excute()
{
_result=_x _y;
}
std::string debug()
{
std::string msg=std::to_string(_x) " " std::to_string(_y) "=?";
return msg;
}
std::string result()
{
std::string msg=std::to_string(_x) " " std::to_string(_y) "=" std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
代码语言:javascript复制#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>
void *Consumer(void *args)
{
BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);
while(true)
{
sleep(2);
//获取数据
Task t;
bq->Pop(&t);
// bq->Pop(&data);
//处理数据
t.Excute();
std::cout<<"Coumer -> "<<t.result()<<std::endl;
}
}
void *Productor(void *args)
{
srand(time(nullptr)^getpid());
BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);
while(true)
{
//构建数据
int x=rand() 1;
usleep(x*1000);
int y=rand() 1;
Task t(x,y);
//生产数据
bq->Equeue(t);
std::cout<<"Productor -> "<<t.debug()<<std::endl;
}
}
int main()
{
BlockQueue<Task> *bq=new BlockQueue<Task>();
pthread_t c,p;
pthread_create(&c,nullptr,Consumer,bq);
pthread_create(&p,nullptr,Productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
- 传递函数任务:
//Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>
using task_t=std::function<void()>;
void Download()
{
std::cout<<"I am Download task"<<std::endl;
}
代码语言:javascript复制//main.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>
void *Consumer(void *args)
{
BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
while(true)
{
sleep(2);
//获取数据
task_t t;
bq->Pop(&t);
//处理数据
t();
}
}
void *Productor(void *args)
{
srand(time(nullptr)^getpid());
BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
while(true)
{
bq->Equeue(Download);
std::cout<<"Productor -> Download "<<std::endl;
}
}
int main()
{
BlockQueue<task_t> *bq=new BlockQueue<task_t>();
pthread_t c,p;
pthread_create(&c,nullptr,Consumer,bq);
pthread_create(&p,nullptr,Productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
多生产、多消费模型
创建两个消费者线程 c1
和 c2
,它们会并行地从队列中取出任务并处理。创建三个生产者线程 p1
、p2
和 p3
,它们会并行地将任务放入队列中。
//main.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>
void *Consumer(void *args)
{
BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
while(true)
{
//获取数据
task_t t;
bq->Pop(&t);
t();
}
}
void *Productor(void *args)
{
srand(time(nullptr)^getpid());
BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
while(true)
{
bq->Equeue(Download);
std::cout<<"Productor -> Download "<<std::endl;
sleep(1);
}
}
int main()
{
BlockQueue<task_t> *bq=new BlockQueue<task_t>();
pthread_t c1,c2,p1,p2,p3;
pthread_create(&c1,nullptr,Consumer,bq);
pthread_create(&c2,nullptr,Consumer,bq);
pthread_create(&p1,nullptr,Productor,bq);
pthread_create(&p1,nullptr,Productor,bq);
pthread_create(&p3,nullptr,Productor,bq);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
pthread_join(p3,nullptr);
return 0;
}