基于信号量和环形队列的生产者消费者模型

2024-08-29 08:20:48 浏览数 (3)

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。信号量的本质是一个计数器。

信号量本身是一个判断条件,是资源的预定机制,预定在外部,可以不判断资源是否满足,就可以知道内部资源的情况。

信号量接口

初始化信号量

代码语言:javascript复制
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

Link with -pthread.

pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值

销毁信号量

代码语言:javascript复制
#include <semaphore.h>

int sem_destroy(sem_t *sem);

Link with -pthread.

等待信号量

等待信号量,会将信号量的值减1

代码语言:javascript复制
 int sem_wait(sem_t *sem);

等待成功继续往后执行,资源不足,阻塞在信号量这里

发布信号量

代码语言:javascript复制
int sem_post(sem_t *sem);

发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

基于环形队列的生产者消费者模型

环形队列在物理结构上是一个线性结构,逻辑结构我们可以认为是一个环形结构。

环形队列有一头一为,头部进行取数据,尾部进行放数据,最开始指向同一个位置。

现在进行只插入数据操作,end一直进行 操作,此时end又和head相遇,也就是说,当队列为空或者为满的时候,headend是相等的。

这样就出现了歧义,head==end无法判断队列的状态,因此引入了计数器或牺牲掉一个空位置(head==end 1表示队列满了)。

上面已经了解了信号量,因此队列空和满不再是本节需要关注的问题,需要关注的是多线程如何在环形队列中进行生产和消费。

当队列为空的时候,理论上只能让生产者先生产;当队列为满的时候,只能让消费者先消费,这就保证在访问的时候有一定的顺序性和互斥特点。 环形队列不为空也不为满时,生产和消费的下标,一定指的不是同一个位置(head!=end),此时允许生产和消费同时进行。

结论:

  1. 不能让生产者把消费者套一个圈
  2. 不能让消费者超过生产者

通过信号量来完成上述要求,实现同步和互斥。

消费者最关心的是数据资源,生产者最关心的是空间资源。

定义两个信号量:

  1. sem_t data_sem = 0数据信号量
  2. sem_t space_sem = N空间信号量

作为生产者需要申请空间,执行P操作:P(space_sem) ,生产数据后,空间被占了,执行V操作:V(data_sem) 作为消费者需要申请资源,执行P操作:P(sem_data),一旦将数据拿走,空间就多出来了,再执行一个V操作:V(spce_sem) 因此生产者和消费者是申请自己的资源,释放对方的资源。

单生产单消费

先将环形队列生产满,然后消费一个,生产一个,体现同步特性:

代码语言:javascript复制
//RingQueue.hpp
#pragma once
#include<iostream>
#include<string>
#include<vector>
#include<semaphore.h>

template<typename T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }

    void V(sem_t &s)
    {
        sem_post(&s);
    }

public:
    RingQueue(int max_cap)
    :_ringqueue(max_cap)
    ,_max_cap(max_cap)
    ,_c_step(0)
    ,_p_step(0)
    {
        sem_init(&_data_sem,0,0);
        sem_init(&_space_sem,0,_max_cap);
    }

    //生产者
    void Push(const T &in)
    {
        P(_space_sem);
        _ringqueue[_p_step]=in;
        _p_step  ;
        _p_step%=_max_cap;
        V(_data_sem);
    }
    //消费者
    void Pop(T *out)
    {
        P(_data_sem);
        *out=_ringqueue[_c_step];
        _c_step  ;
        _c_step%=_max_cap;
        V(_space_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);
    }

private:
    std::vector<T> _ringqueue;
    int _max_cap;

    int _c_step;
    int _p_step;

    sem_t _data_sem;  //消费者信号量
    sem_t _space_sem;  //生产者消费量
};
代码语言:javascript复制
//Main.cc
#include"RingQueue.hpp"
#include<iostream>
#include<pthread.h>
#include<ctime>
#include<unistd.h>

void *Consumer(void *argc)
{
    RingQueue<int> *rq=static_cast<RingQueue<int> *>(argc);

    while(true)
    {
        sleep(1);
        int data=0;
        //1.消费
        rq->Pop(&data);

        std::cout<<"Consumer -> "<<data<<std::endl;
    }
}

void *Productor(void *argc)
{
    RingQueue<int> *rq=static_cast<RingQueue<int> *>(argc);
    while(true)
    {
        //1.构造数据
        int data=rand() 1;

        //2.生产
        rq->Push(data);
        std::cout<<"Productor -> "<<data<<std::endl;
    }
}

int main()
{
    srand(time(nullptr)^getpid());

    RingQueue<int> *rq=new RingQueue<int>(5);


    //单生产单消费
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,rq);
    pthread_create(&p,nullptr,Productor,rq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

运行结果:

生产一个消费一个,体现互斥特点:

代码语言:javascript复制
//Main.cc
#include"RingQueue.hpp"
#include<iostream>
#include<pthread.h>
#include<ctime>
#include<unistd.h>

void *Consumer(void *argc)
{
    RingQueue<int> *rq=static_cast<RingQueue<int> *>(argc);

    while(true)
    {
        int data=0;
        //1.消费
        rq->Pop(&data);

        std::cout<<"Consumer -> "<<data<<std::endl;
    }
}

void *Productor(void *argc)
{
    RingQueue<int> *rq=static_cast<RingQueue<int> *>(argc);
    while(true)
    {
        sleep(1);

        //1.构造数据
        int data=rand() 1;

        //2.生产
        rq->Push(data);
        std::cout<<"Productor -> "<<data<<std::endl;
    }
}

int main()
{
    srand(time(nullptr)^getpid());

    RingQueue<int> *rq=new RingQueue<int>(5);


    //单生产单消费
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,rq);
    pthread_create(&p,nullptr,Productor,rq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

上述测试代码是传递一个int类型的数据到阻塞队列中,也可以传递其他类型,在传递struct或者class类型时,可以封装成一个个的任务传递到环形中。

传递一个任务:

代码语言:javascript复制
//Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>

class Task
{

public:
    Task()
    {}
    Task(int x,int y):_x(x),_y(y)
    {}

    void Excute()
    {
        _result=_x _y;
    }

    void operator()()
    {
        Excute();
    }

    std::string debug()
    {
        std::string msg=std::to_string(_x) " " std::to_string(_y) "=?";
        return msg;
    }
    std::string result()
    {
        std::string msg=std::to_string(_x) " " std::to_string(_y) "=" std::to_string(_result);
        return msg;
    }

private:
    int _x;
    int _y;
    int _result;
};
代码语言:javascript复制
//Main.cc
#include"RingQueue.hpp"
#include"Task.hpp"
#include<iostream>
#include<pthread.h>
#include<ctime>
#include<unistd.h>
//传递任务
void *Consumer(void *argc)
{
    RingQueue<Task> *rq=static_cast<RingQueue<Task> *>(argc);

    while(true)
    {
        Task t;
        //1.消费
        rq->Pop(&t);
        t();
        std::cout<<"Consumer -> "<<t.result()<<std::endl;
    }
}

void *Productor(void *argc)
{
    RingQueue<Task> *rq=static_cast<RingQueue<Task> *>(argc);
    while(true)
    {
        sleep(1);

        //1.构造数据
        int x=rand() 1;
        usleep(x*1000);
        int y=rand() 1;
        Task t(x,y);

        //2.生产
        rq->Push(t);
        std::cout<<"Productor -> "<<t.debug()<<std::endl;
    }
}

int main()
{
    srand(time(nullptr)^getpid());

    RingQueue<Task> *rq=new RingQueue<Task>(5);


    //单生产单消费
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,rq);
    pthread_create(&p,nullptr,Productor,rq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}

运行结果:

多生产多消费

生产者和消费者的下标位置只有一个,在环形队列中,执行多生产多消费操作,一瞬间下标称为自己的临界资源,所以必须要加锁。多生产多消费是为了在处理放数据取数据有更好的并发度。

先申请锁合适还是先申请信号量合适? 如果先加锁,申请信号量的线程就是一个生产者,一旦解锁,线程又得重新申请信号量,效率地下,申请锁和申请信号量注定是串行的。如果是先申请信号量,先预定着,然后再去竞争,谁的优先级高谁就先申请到锁。这就类似于我们日常生活中现在手机上面购票,等电影快开始准备检票即可。**因此先申请信号量在加锁合适。**申请信号量本身是原子的,不会出错,先把可用的资源给线程瓜分,然后等待即可。

0 人点赞