基于BlockingQueue的生产者消费者模型

2024-08-24 13:49:13 浏览数 (1)

文章目录

  • 引言
  • 理解生产者消费者模型
  • 基于BlockingQueue的生产者消费者模型
    • 单生产,单消费模型
    • 多生产、多消费模型

引言

生产者消费者模型一般可以在超市中听到,例如如下是一个专门卖方便面的超市,这个超市有自己供应商,也有客户来买,客户称之为消费者。超市起到一个缓存作用,供应商放假的时候,短时间内超市依然有对应的商品,消费者依然可以消费;相同的,如果短时间内消费者不来买东西,供应商依然可以供应给超市。也就是说,供应商生产产品比较慢,可以先生成一批产品放在超市中;供应商如果供应比较快,可以等消费者消费一段时间再去供应产品,协调忙线不均。现实生活中,在人口密集的地方肯定会有超市,生产者消费者模型效率高,有了超市这个巨大的缓存,可以使得消费者和生产者并发起来。

个别消费者不想买方便面不会影响到供应商,个别供应商出现了问题,不会影响消费者买方便面,这就做到了生产者和消费者的解耦

理解生产者消费者模型

上述例子对应到计算机中,供应商和消费者就是线程,超市是一段内存空间,方便面是数据。生产线程将数据交到一段内存空间中,消费线程从内存空间中将数据拿走。

“321原则”:

  1. 一个交易场所(特定数据结构的形式存在的一段内存空间)
  2. 两种角色:生产者、消费者,也就是生产线程和消费线程
  3. 三种关系:生产和生产(互斥关系)、消费和消费(互斥关系)、生产和消费(互斥关系、同步关系)

实现生产者消费者模型本质就是通过代码实现“321原则”,用锁和条件变量(或者其他形式)来实现三种关系。

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

单生产,单消费模型

代码语言:javascript复制
//BlockQueue.hpp
#pragma once

#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>

const static int defaultcap=5;

template<typename T>
class BlockQueue
{
private:
    bool isFull()
    {
        return _block_queue.size()==_max_cap;
    }
    
    bool isEmpty()
    {
        return _block_queue.empty();
    }

public:
    BlockQueue(int cap= defaultcap):_max_cap(cap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_P_cond,nullptr);
        pthread_cond_init(&_C_cond,nullptr);
    }
    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);

        while(isEmpty())
        {
            pthread_cond_wait(&_C_cond,&_mutex);
        }

        *out=_block_queue.front();
        _block_queue.pop();

        pthread_mutex_unlock(&_mutex);

        pthread_cond_signal(&_P_cond);  //唤醒生产者
    }
    void Equeue(const T &in)
    {
        pthread_mutex_lock(&_mutex);

        while(isFull()) //阻塞队列满
        {
            //满了生产者不能再生产,必须等待
            pthread_cond_wait(&_P_cond,&_mutex); //被调用的时候,除了让自己继续排队等待,还会释放自己传递的锁
            //函数返回时,会返回在临界区,必须先参与锁的竞争,重新加上锁,该函数才会返回,依然是持有锁的状态

        }
        
        //阻塞队列未满或者被唤醒
        _block_queue.push(in);  //生产数据到阻塞队列
        
        pthread_mutex_unlock(&_mutex);

        pthread_cond_signal(&_C_cond);  //唤醒消费者
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_P_cond);
        pthread_cond_destroy(&_C_cond);
    }

private:
    std::queue<T> _block_queue;  //临界资源
    int _max_cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _P_cond;  //生产者条件变量
    pthread_cond_t _C_cond;  //消费者条件变量
};

Pop 函数:从队列中取出元素,并将其存储在 out 指针指向的地址中。步骤如下:

  • 锁定互斥量:通过 pthread_mutex_lock(&_mutex) 确保对队列的操作是线程安全的。
  • 等待条件变量:如果队列为空,使用 pthread_cond_wait(&_C_cond, &_mutex) 等待消费者条件变量被信号唤醒。
  • 取出元素:从队列中取出前面的元素,并将其弹出。
  • 解锁互斥量:通过 pthread_mutex_unlock(&_mutex) 解锁。
  • 唤醒生产者:使用 pthread_cond_signal(&_P_cond) 唤醒可能被阻塞的生产者线程。

Equeue 函数:将元素 in 插入队列。步骤如下:

  • 锁定互斥量:通过 pthread_mutex_lock(&_mutex) 确保对队列的操作是线程安全的。
  • 等待条件变量:如果队列已满,使用 pthread_cond_wait(&_P_cond, &_mutex) 等待生产者条件变量被信号唤醒。
  • 插入元素:将新元素插入到队列中。
  • 解锁互斥量:通过 pthread_mutex_unlock(&_mutex) 解锁。
  • 唤醒消费者:使用 pthread_cond_signal(&_C_cond) 唤醒可能被阻塞的消费者线程。

为了体现阻塞队列的特点,分别设计了两种测试代码:

  1. 生产一个,消费一个
代码语言:javascript复制
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        //获取数据
        int data=0;
        bq->Pop(&data);
        //处理数据
        std::cout<<"Coumer -> "<<data<<std::endl;
    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        sleep(2);
        //构建数据
        int data=rand() 1;  // [1,10]
        //生产数据
        bq->Equeue(data);
        std::cout<<"Productor -> "<<data<<std::endl;
    }
}

int main()
{
    BlockQueue<int> *bq=new BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}
  1. 先生产一批数据,直到队列开始阻塞,然后消费一个,生产一个
代码语言:javascript复制
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        sleep(2);
        //获取数据
        int data=0;
        bq->Pop(&data);
        //处理数据
        std::cout<<"Coumer -> "<<data<<std::endl;
    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        //构建数据
        int data=rand() 1;  // [1,10]
        //生产数据
        bq->Equeue(data);
        std::cout<<"Productor -> "<<data<<std::endl;
    }
}

int main()
{
    BlockQueue<int> *bq=new BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

上述测试代码是传递一个int类型的数据到阻塞队列中,也可以传递其他类型,在传递struct或者class类型时,可以封装成一个个的任务传递到阻塞队列中。

  • 传递任务:
代码语言:javascript复制
//Task.hpp
#pragma once
#include<iostream>
#include<string>

class Task
{

public:
    Task()
    {}
    Task(int x,int y):_x(x),_y(y)
    {}

    void Excute()
    {
        _result=_x _y;
    }
    std::string debug()
    {
        std::string msg=std::to_string(_x) " " std::to_string(_y) "=?";
        return msg;
    }
    std::string result()
    {
        std::string msg=std::to_string(_x) " " std::to_string(_y) "=" std::to_string(_result);
        return msg;
    }

private:
    int _x;
    int _y;
    int _result;
};
代码语言:javascript复制
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);
    while(true)
    {
        sleep(2);
        //获取数据
        Task t;
        bq->Pop(&t);
        // bq->Pop(&data);
        //处理数据
        t.Excute();
        std::cout<<"Coumer -> "<<t.result()<<std::endl;
    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);
    while(true)
    {
        //构建数据
        int x=rand() 1;
        usleep(x*1000);
        int y=rand() 1;
        Task t(x,y);
        //生产数据
        bq->Equeue(t);
        std::cout<<"Productor -> "<<t.debug()<<std::endl;
    }
}

int main()
{
    BlockQueue<Task> *bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}
  • 传递函数任务:
代码语言:javascript复制
//Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>

using task_t=std::function<void()>;

void Download()
{
    std::cout<<"I am Download task"<<std::endl;
}
代码语言:javascript复制
//main.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        sleep(2);
        //获取数据
        task_t t;
        bq->Pop(&t);
        //处理数据
        t();

    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        bq->Equeue(Download);
        std::cout<<"Productor -> Download "<<std::endl;
    }
}

int main()
{
    BlockQueue<task_t> *bq=new BlockQueue<task_t>();
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

多生产、多消费模型

创建两个消费者线程 c1c2,它们会并行地从队列中取出任务并处理。创建三个生产者线程 p1p2p3,它们会并行地将任务放入队列中。

代码语言:javascript复制
//main.cc

#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>

void *Consumer(void *args)
{
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        //获取数据
        task_t t;
        bq->Pop(&t);
        t();

    }
}

void *Productor(void *args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);
    while(true)
    {
        bq->Equeue(Download);
        std::cout<<"Productor -> Download "<<std::endl;
        sleep(1);

    }
}

int main()
{
    BlockQueue<task_t> *bq=new BlockQueue<task_t>();
    pthread_t c1,c2,p1,p2,p3;
    pthread_create(&c1,nullptr,Consumer,bq);
    pthread_create(&c2,nullptr,Consumer,bq);
    pthread_create(&p1,nullptr,Productor,bq);
    pthread_create(&p1,nullptr,Productor,bq);
    pthread_create(&p3,nullptr,Productor,bq);

    pthread_join(c1,nullptr);
    pthread_join(c2,nullptr);
    pthread_join(p1,nullptr);
    pthread_join(p2,nullptr);
    pthread_join(p3,nullptr);

    return 0;
}

0 人点赞