【Linux】线程间同步实践 —— 生产消费模型

2024-08-21 14:20:36 浏览数 (2)

1 知识回顾

线程互斥的场景中,我们使用锁来保证一次只能一个线程访问临界区,保证了线程安全。但是,仅仅凭借一把锁是很难保证线程访问的顺序性。 就比如:学校有一个vip自习室(1人间),门口存放着钥匙,来到自习室的人可以拿着钥匙进入自习室,并带走钥匙,离开时将钥匙放回原处。而这一天小明从自习室离开后,将钥匙放在了原处,刚走一步,立马又想再进行学一会儿,因为他距离钥匙非常近,所以小明再次拿到钥匙进入了自习室。这样的过程反复了好几次,其他同学都心生不满!

这样的场景就是仅仅凭借一把锁是不能保证线程运行的顺序性的,所以要进行同步。也就是保证所有人访问自习室,未来是安全的并且还有一定顺序性。线程也是如此!通过条件变量我们可以进行线程间的同步!

条件变量

  1. int pthread_cond_init(pthread_cond_t *restrict cond , const pthread_condattr_t *restrictattr);:初始化接口
  2. int pthread_cond_destroy(pthread_cond_t *cond):销毁接口
  3. int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);:在条件不满足时阻塞等待
  4. int pthread_cond_broadcast(pthread_cond_t *cond);:条件满足,唤醒所有线程,开始竞争。
  5. int pthread_cond_signal(pthread_cond_t *cond);:条件满足,唤醒一个线程。

条件变量需要一个线程队列和相应的通知机制,才能保证线程同步!

2 生产消费模型

2.1 什么是生产消费模型

生产消费模型可以通过一个非常接地气的例子来进行讲述:

  1. 产家: 作为方便面的生产者进行生产,生产有一定限制,生产量满足需求不在生产。
  2. 平台:作为方便面的销售平台,从产家中获得方便面,销售给买家。作为中间人,调控方便面数量需求。
  3. 买家:作为方便面的消费者,从平台获取方便面。

通过平台的中转,可以实现生产与消费的解耦,通过中间平台可以快速将产品给到消费者,又可以在库存不足时通知产家进行生产。

当然,产家可能有多个,买家可能有多个。这就会产生竞争关系,通过线程同步(锁与条件变量)来协调,也就支持并发处理!

总结,生产消费模型有"321"原则:

  1. 一个交易场所(特定数据结构形式存在的一段内存空间)
  2. 两种角色(生产角色,消费角色):生产线程,消费线程
  3. 三种关系:生产与生产(互斥关系) , 消费与消费(互斥关系),生产与消费。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

2.2 为何要使用生产消费模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

3 实践生产消费模型 — 阻塞队列

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

3.1 框架搭建

阻塞队列的本质还是队列,所以底层需要一个队列来储存数据(使用模版来适配各种类型数据)。按照实际需求,这个队列不能储存过多数据,需要有一个边界值限定。此外为了保证临界区操作的安全,要使用锁来保护。

代码语言:javascript复制
#pragma once

#include <mutex>
#include <queue>
#include <pthread.h>
#include <unistd.h>

const int num = 5;
template <class T>
class BlockQueue
{
private:
	//判断函数
    bool Full()
    {
        return _bq.size() >= _max_cp;
    }
    bool Empty()
    {
        return _bq.size() == 0;
    }
public:
	//构造函数
    BlockQueue()
        : _max_cp(num)
    {
        pthread_mutex_init(&_mtx , nullptr);
    }
    //析构函数
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
    }
    void Pop(T* data)
    {
    }
    void Equeue(const T data)
    {
    }
private:
	//队列来储存数据
    std::queue<T> _bq;
   	//加锁
    pthread_mutex_t _mtx;
    int _max_cp;
};

我们主要要实现的就是两个接口:

  1. Pop:消费者使用,用来获取一个数据
  2. Equeue:生产者使用,用来存入数据

3.2 Pop与Equeue

在Pop与Equeue中进行的操作:

  1. 判断是否可以获取 / 插入 ,涉及判断就是非原子操作,需要加锁!不可以的情况下就要进行阻塞(阻塞后会自动解锁),等待被唤醒!因此需要加入两个条件变量来进行判断!
  2. 进行获取 / 插入
  3. 唤醒生产者 / 消费者,唤醒对应的条件变量即可!
代码语言:javascript复制
void Pop(T* data)
    {
        pthread_mutex_lock(&_mtx);
        //队列没有数据 阻塞等待
        //while保证代码的鲁棒性 
        //函数返回也要再次进行判断,保证不为空!
        while(Empty())
        {
            pthread_cond_wait(&_c_cond , &_mtx);
        }
        // 被唤醒了 || 队列中有数据
        //出队列
        *data = _bq.front();
        _bq.pop();
        pthread_mutex_unlock(&_mtx);
        //读取走了可以唤醒生产者
        pthread_cond_signal(&_p_cond);
    }
    void Equeue(const T data)
    {
        //插入和判断都是临界区操作 ,上锁保证线程安全
        pthread_mutex_lock(&_mtx);
        //如果容量满了 ,就要阻塞,等待消费者消费
        //函数返回也要再次进行判断,保证没有满!
        while(Full())
        {
        	//满了 生产者不能生产 必须等待
        	//这里是临界区!!! 阻塞后会自动解锁
        	//被调用的时候,除了让自己继续排队等待,还会释放传入的锁!
        	//函数返回时 ,回到临界区,会参与锁的竞争,获取到锁之后进行返回!
            pthread_cond_wait(&_p_cond , &_mtx);
        }
        //入队列
        _bq.push(data);
        pthread_mutex_unlock(&_mtx);
        //入队列 可以唤醒消费者
        pthread_cond_signal(&_c_cond);
    }

注意:

代码语言:javascript复制
 pthread_mutex_unlock(&_mtx);
	
 pthread_cond_signal(&_c_cond);

这两步操作是,分别是解锁和唤醒条件变量。这两个函数的顺序没有要求。因为唤醒一个线程,都要进行竞争锁,只有对应的阻塞的线程获得到锁才会返回。因此唤醒与解锁的顺序并不影响后续线程的运行!

3.3 测试运行

我们来测试一下使用两个线程来进行消费和生产的:

代码语言:javascript复制
#include"BlockQueue.hpp"
#include <stdlib.h>
#include<iostream>

void* Consumer(void* args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<int>* bq = static_cast<BlockQueue<int>* >(args);
    while(true)
    {
        //std::cout << "---开始读取--- " << std::endl;
        int data = 0;
        bq->Pop(&data);
        std::cout << "Consumer: " << data << std::endl;
        sleep(1);
    }
}
//生产者
void*  Productor(void* args)
{
    srand(time(nullptr)^getpid());
    BlockQueue<int>* bq = static_cast<BlockQueue<int>* >(args);

    while(true)
    {
        
        int data = rand() % 10;
        bq->Equeue(data);
        std::cout << "Productor: " << data << std::endl;
        usleep(100000);
    }

    
}

int main()
{
    
    BlockQueue<int> bq;
    pthread_t c, p;
    pthread_create(&c , nullptr  , Consumer , &bq );
    pthread_create(&p , nullptr  , Productor , &bq );

    pthread_join(c ,nullptr);
    pthread_join(p ,nullptr);

    return 0;
}

非常好!!!测试没有问题!

4 多生产与多消费

我们刚才测试的环境是单生产,单消费。但在实际场所中,我们尽量使用多生产,多消费,因为消费者获取到任务,以及生产者产生任务都是需要时间的。多执行流的模式可以保证最大程度的提高效率!并发运行保证效率最大程度运行高效,这里的并发并不是生产与消费的并发,而是生产与生产,消费与消费的并发运行!

我们所写代码天然的支持多生产多消费,因为对应的任务都有锁来保护,不会威胁线程安全!

0 人点赞