1.关于模型的理解
消费者定期去超市买东西,买完在拿回来,即消费行为 供货商作为生产者,由供货商把商品生产到超市
为什么会存在超市?
消费者有可能去购买时,供货商当前并没有进行生产活动 假设要一根火腿肠,供货商不可能将机器全启动进行生产
消费者需求特别零散,供货商生产能力很强,但要考虑成本问题 所以需要超市这种零售行业,超市的存在使生产者和消费者的效率提高了
供货商可以集中生产的一大批的商品 放到超市中,让消费者随时随地来买,供货商就不生产了
因为超市的存在,允许生产和消费步调不一致
在计算机中,生产者和消费者代表线程 超市可以看作是 特定的缓冲区 生产者把自己的数据交给超市,再由消费者把数据取走 ,这种工作模式即 生产者 消费者模型
基于 生产者 消费者模型,来完成线程之间的通信
想要使用交易场所,前提是交易场所必须先被生产者和消费者线程看到 注定了 ,交易场所一定是会被多线程并发访问的公共区域, 多线程一定要保护共享资源的安全,要维护线程互斥与同步的关系
如何维护线程互斥与同步?
生产消费模型 角色之间的关系
1.生产者和生产者 生产者和生产者 为互斥关系 假设两者都要生产火腿肠,当生产者1正在生产时,生产者2也要生产就不可以
2.消费者和消费者 消费者和消费者 为 互斥关系 v假设超市货架上只有一根火腿肠了,有两个人都看上了这根火腿肠,此时两者就为竞争关系
3.生产者和消费者
生产和消费 拥有 同步关系 需要生产是先生产,需要消费是先消费 如:若超市火腿肠满了,就应该让消费者先消费,若超市没有火腿肠了,就应该让生产者先生产
生产和消费 拥有 互斥关系 假设你想在超市买一根火腿肠,正好来一个供货商 你想要在货架上拿火腿肠,供货商想要把火腿肠放到货架上,两者处于竞争状态
2. 交易场所的设计
基于阻塞队列的生产者消费者模型
当队列为空时,从队列获取的元素的操作就会被阻塞,直到队列中被放入元素 当队列满时,队列里存放元素的操作也会被阻塞,直到元素被从队列中取出
具体实现
主函数的实现
交易场所为 阻塞队列,将模板参数定义为int,并且在堆上开辟一块空间 创建两个线程,分别为生产者和消费者, 通过调用自定义函数 consumer 执行消费任务,调用自定义函数 productor 执行生产任务
通过pthread_create ,将bq作为回调函数的参数 args ,使生产者和消费者线程看到同一个阻塞队列
productor 执行生产任务,先从某种渠道获取数据,这里使用随机数作为数据 再把数据放入 blockqueue交易场所中 ,调用blockqueue中的push
consumer 执行消费任务,先把数据从blockqueue中获取,调用blockqueue的pop 再结合业务逻辑,处理数据
BlockQueue类的实现
阻塞队列作为交易场所,有可能被多线程并发访问, 所以为了保证共享资源的安全,所以在内部添加锁
若队列中没有数据存在,则不该让消费者消费,若队列中数据满了,不该让生产者进行生产 但是并不知道什么时候队列为空,什么时候阻塞队列为满,从而产生饥饿问题 (不断加锁 解锁 使别人无法申请锁 ,进而无法访问临界资源) 所以也要加上条件变量
为了保证生产者和消费者互相等待,所以设置两个条件变量 consumercond 作为消费者对应的条件变量,当队列为空时,进行等待 productorcond 作为生产者对应的条件变量,当队列为满时,进行等待
push ——生产
将数据推送到lblockqueue中,调用对应BlockQueue类中的push
通过条件判断,由于队列满了,就需要当前线程进行等待 ,并自动释放锁 若队列不为满,则插入数据 关于 为什么要在申请锁之后判断 以及 wait函数第二个参数要带锁 上一篇文章都有提到,点击查看:条件变量的理解
发生休眠实际上就是线程切换,当线程从休眠状态被唤醒时,因为是从临界区被切走的,所以继续从临界区内部执行 被唤醒时,pthread_cond_wait函数处向后执行,同时又要重新申请锁,申请成功才会彻底返回
push后队列中至少有一个数据存在,所以唤醒消费者线程
pop——消费
从blockqueue中获取数据,调用对应BlockQueue类中的pop
通过条件判断,由于队列空了,就需要当前线程进行等待 ,并自动释放锁 若不为空,则删除队列数据
pop后队列中至少有一个位置为空,所以唤醒生产者
细节问题
误唤醒
假设有1个消费者以及5个的生产者 当消费者pop数据后节省出1个空间 ,错误的使用pthread_cond_broadcast 将生产者线程全部唤醒 就导致 5个生产者push 5个数据 , 但是push 需要5个空间,而现在只有1个空间,就会超过队列的容量上限
针对上述情况,将if判断改为while循环,每一次被唤醒都要被检测,只有当队列真的是不满的情况,才会进行push
效率高 体现在哪里?
由于是持有锁生产的,所以生产时是不能进行消费的
当消费者在交易场所拿到数据后正在处理时,生产者可以不断的把数据放到交易场所里 处理数据和生产行为 是 并行的
当消费者从交易场所拿数据时,生产者可能不断从网络或者系统中拿数据 生产者在拿数据的过程中,并不影响消费者进行消费 ,两者同样是并行的
完整代码
blockQueue.hpp
代码语言:javascript复制#pragma once
#include<iostream>
#include<pthread.h>
#include<queue>
using namespace std;
const int gcap=5;
template<class T>
class BlockQueue //阻塞队列
{
public:
BlockQueue(const int cap=gcap)
:_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_consumercond,nullptr);
pthread_cond_init(&_productorcond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumercond);
pthread_cond_destroy(&_productorcond);
}
void push(const T&in)
{
pthread_mutex_lock(&_mutex);
//若队列满了,就需要在条件变量中等待,并自动释放锁
while(_q.size()==_cap)
{
pthread_cond_wait(&_productorcond,&_mutex);
}
_q.push(in);
//队列中至少有一个数据 所以 唤醒消费者
pthread_cond_signal(&_consumercond);
pthread_mutex_unlock(&_mutex);
}
void pop( T*out)
{
pthread_mutex_lock(&_mutex);
//若队列为空,则需要在条件变量中等待,并自动释放锁
while(_q.empty())
{
pthread_cond_wait(&_consumercond,&_mutex);
}
*out=_q.front();
_q.pop();
//队列中至少有一个空位置 所以唤醒生产者
pthread_cond_signal(&_productorcond);
pthread_mutex_unlock(&_mutex);
}
private:
queue<T> _q;
int _cap;//容量
pthread_mutex_t _mutex;//锁 提供对队列的保护
pthread_cond_t _consumercond;//消费者对应的条件变量
pthread_cond_t _productorcond;//生产者对应的条件变量
};
makefile
代码语言:javascript复制cp:main.cc
g -o $@ $^ -std=c 11 -lpthread
./PHONY:clean
clean:
rm -f cp
main.cc
代码语言:javascript复制#include"blockQueue.hpp"
#include<unistd.h>
#include<ctime>
//消费者执行消费任务
void*consumer(void*args)
{
BlockQueue<int>*bq=(BlockQueue<int>*)args;
while(true)
{
sleep(1);
int data=0;
//1.将数据从blockqueue中获取 --获取到了数据
bq->pop(&data);
//2.结合某种业务逻辑,处理数据
cout<<"consumer data :"<<data<<endl;
}
}
//生产者执行生产任务
void*productor(void*args)
{
BlockQueue<int>*bq=(BlockQueue<int>*)args;
while(true)
{
//1.通过某种渠道获取数据
int data=rand() 1;//1-10
//2.将数据推送到blockqueue中 --- 完成生产的任务
bq->push(data);
cout<<"productor data :"<<data<<endl;
}
}
int main()
{
srand((uint64_t)time(nullptr)^ getpid());//随机数
BlockQueue<int> *bq= new BlockQueue<int>;
//单生产和单消费
pthread_t c,p;
pthread_create(&c,nullptr,consumer,bq);//消费者
pthread_create(&p,nullptr,productor,bq);//生产者
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}