1.POSIX信号量
1.1引入
代码语言:javascript复制上次我们使用了阻塞队列的生产消费模型,在先前的生产者-消费者模型代码中,当一个线程想要操作临界资源时,必须确保临界资源处于满足条件的状态才能进行修改;否则无法修改。例如,在
Enqueue
接口中,当队列已满时,临界资源处于条件不可用的状态,无法继续进行push
操作。此时,线程应该进入条件变量队列cond
中等待。如果队列未满,即临界资源条件已准备好,那么可以继续push
,调用队列_q
的push
接口。 观察代码可以看到,在判断临界资源是否就绪之前,必须先获取锁,因为判断临界资源实质上就是对临界资源的访问,而访问临界资源自然需要加锁以保护。因此,代码通常会先获取锁,然后手动检查临界资源的就绪状态,根据状态判断是等待还是直接操作临界资源。 但是如果事先知道临界资源的状态是否就绪,则无需一上来就加锁。一旦提前知道临界资源的就绪状态,便不再需要手动检查资源状态。在这种情况下,若有一个计数器来表示临界资源中小块资源的数量(如队列中每个空间),线程在访问临界资源前会先请求该计数器。若计数器大于0,则表明队列中有空余位置,可以直接向队列push
数据;若计数器等于0,则说明队列已满,不能继续push
数据,应该阻塞等待,直至计数器再次大于0,方可继续向队列push
数据。
void Enqueue(T &in) // 生产者用的接口
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
_product_wait_num ;
pthread_cond_wait(&_product_cond, &_mutex);
_product_wait_num--;
}
// 进行生产
_block_queue.push(in);
// 通知消费者来消费
if (_consum_wait_num > 0)
{
pthread_cond_signal(&_consum_cond);
}
pthread_mutex_unlock(&_mutex); // 其实解锁和唤醒条件顺序无所谓,先唤醒后那边等着,解锁后直接竞争
// 如果先解锁,后唤醒:先解锁没任何效果,因为都在wait那里等,一唤醒就直接得到锁
}
1.2回顾加深理解信号量
信号量是一种用于进程间通信和同步的机制。它本质上是一个计数器,用于衡量系统中的资源可用数量。通过信号量,可以实现对临界资源的访问控制,确保多个进程或线程能够安全地共享资源而不发生冲突。
在访问临界资源之前,程序可以通过申请信号量来获取对资源的访问权限。如果信号量的值大于0,表示资源可用,程序可以继续访问资源;如果信号量的值等于0,表示资源已被占用,程序需要等待,直到资源可用为止。
信号量并不仅仅是简单的计数器,它是通过原子操作实现的,确保信号量的操作是线程安全的。常用的信号量操作包括P操作(等待操作)和V操作(释放操作),也称为PV操作。P操作会将信号量的值减1,用于占用资源;V操作会将信号量的值加1,用于释放资源。
通过合理地使用信号量和PV操作,可以实现多线程或多进程之间的同步和互斥,避免资源竞争和死锁等并发问题。信号量是操作系统中重要的同步工具,广泛应用于进程间通信、资源管理、线程同步等场景。
system信号量和POSIX信号量都是用于进程间通信和同步的机制,但它们之间存在一些区别。
- 系统信号量:
- 系统信号量是Linux中的一种系统调用,用于进程间通信和同步。
- 系统信号量是以系统级资源的形式存在,可以跨越进程边界,不仅可以用于线程之间的同步,也可以用于进程之间的同步。
- 系统信号量是一个全局的计数器,可以通过系统调用函数来创建、初始化、P操作(等待操作)和V操作(释放操作)等。
- 系统信号量的操作是通过系统调用函数来实现的,如semget、semop等。
- POSIX信号量:
- POSIX信号量是基于POSIX标准的一种同步机制
- POSIX信号量与系统信号量类似,但是在接口和使用上有些许差异。
- POSIX信号量允许用于进程间通信和线程间同步。
- POSIX信号量通过调用相关的POSIX函数来创建、初始化、等待和释放,如sem_open、sem_wait、sem_post等。
系统信号量是Linux系统提供的一种进程间通信和同步机制,而POSIX信号量是基于POSIX标准的一种同步机制,二者都可以实现进程或线程间的同步和互斥操作
1.3信号量的操作接口
初始化信号量:
使用sem_init
函数可以初始化信号量,给定的value
值会成为信号量的初始值。如果信号量是线程间共享的,可以被多个线程同时使用;如果是进程间共享的,可以被多个进程使用
#include <semaphore.h>//下面的函数都这此头文件
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem
: 指向要初始化的信号量的指针(我们使用sem_t 类型直接定义)pshared
: 0 表示该信号量为线程间共享;非零值表示信号量为进程间共享value
: 信号量的初始值- 若成功,返回值为0,表示初始化信号量成功。
- 若出现错误,返回值为-1,表示初始化失败,并设置errno来指示具体错误。(下面都是一样的)
销毁信号量:
使用sem_destroy
函数可以销毁之前初始化的信号量。在销毁信号量之前,要确保所有线程或进程都已经停止使用该信号量。
int sem_trywait(sem_t *sem);
sem
: 要销毁的信号量的指针
等待信号量:(P操作- -)
使用sem_wait
函数可以等待信号量,即执行P操作。如果信号量的值大于0,则将其减1并立即返回,否则线程(或进程)会阻塞等待信号量变为大于0。
int sem_wait(sem_t *sem);
sem
: 要等待的信号量的指针
发布信号量:(V操作 )
使用sem_post
函数可以发布(释放)信号量,即执行V操作。对信号量执行V操作会将其值加1,并唤醒可能正在等待该信号量的线程(或进程)。
int sem_post(sem_t *sem);
sem
: 要发布的信号量的指针
2.基于循环队列的生产消费模型
2.1循环队列
代码语言:javascript复制之前在阻塞队列里,我们不能实现出队列与入队列的同时进行。现在因为是循环队列我们使用了两个索引,而两个索引不同时可以同时进行出和入 当为空时或者满时,二者只能有一个开始执行。然后就不再相等了,也是能分开进行了
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
template <typename T>
class RingQueue
{
public:
RingQueue(int cap) : _ringqueue(cap - 1), _cap(cap), _productor_index(0), _consumer_index(0) // vector初始化大小为cap个0
{
sem_init(&_room_sem, 0, _cap); // 这个是生产者的(能用的空间),一开始大小是整个空间的
sem_init(&_data_sem, 0, 0); // 这个是消费者的(能用的数据),一开始是0
pthread_mutex_init(&_productor_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr); // 锁的初始化
}
// P V保证了消费与生产的互斥与同步
// 加锁和解锁保证了之间的互斥
// 我们采取先预定资源,再竞争锁
void Enqueue(const T &in) // 入队列
{
P(_room_sem); // p操作--
Lock(_productor_mutex);
// 到这里就说明一定有空间
_ringqueue[_productor_index ] = in;
_productor_index %= _cap; // 保证循环
Unlock(_productor_mutex);
V(_data_sem); // data
}
void Pop(T *out) // 出队列 输出型参数
{
// 消费行为
P(_data_sem);
Lock(_consumer_mutex);
*out = _ringqueue[_consumer_index ];
_consumer_index %= _cap;
Unlock(_consumer_mutex);
V(_room_sem);
}
~RingQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem); // 处理信号量
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
void P(sem_t &sem) // 预定空间
{
sem_wait(&sem);
}
void V(sem_t &sem) // 还东西
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
private:
std::vector<T> _ringqueue; // 底层是一个数组
int _cap; // 容量上限
int _productor_index;
int _consumer_index; // 生产和消费的下标
sem_t _room_sem; // 生产者关心
sem_t _data_sem; // 消费者关心
// 定义锁,维护多生产多消费之间的互斥关系
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
2.2整个项目
- RingQueue.hpp:封装的循环队列
- Main.cc:程序的主体
- Thread.hpp:自己封装的Thread
- Task.hpp:任务类(这里只是一个function包装器)
Tash.hpp
代码语言:javascript复制#pragma once
#include <functional>
#include <iostream>
using Task = std::function<void()>;
void Test()
{
std::cout << "This is the Test Funtion" << std::endl;
}
Thread.hpp
代码语言:javascript复制#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template <typename T>
using func_t = std::function<void(T &, std::string name)>;
// typedef std::function<void(const T&)> func_t;
template <typename T>
class Thread
{
public:
void Excute()
{
_func(_data, _threadname);
}
public:
Thread(func_t<T> func, T &data, const std::string &name = "none-name")
: _func(func), _data(data), _threadname(name), _stop(true)
{
}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if (!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if (!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T &_data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
} // namespace ThreadModule
#endif
Main.cc
代码语言:javascript复制#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
using namespace ThreadModule;
int a = 10;
using ringqueue_t = RingQueue<Task>;
void Consumer(ringqueue_t &rq, std::string name)
{
while (true)
{
Task t;
rq.Pop(&t);
std::cout << "Consumer :" << " NAME" << name << std::endl;
t();
sleep(2);
}
}
void Productor(ringqueue_t &rq, std::string name)
{
int cnt = 1;
srand(time(nullptr));
while (true)
{
rq.Enqueue(Test);
std::cout << "Productor is : " << cnt << " NAME" << name << std::endl;
// sleep(2);
cnt ;
}
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func, std::string prename)
{
for (int i = 0; i < num; i )
{
std::string name = prename "thread-00" std::to_string(i 1);
threads->emplace_back(func, rq, name);
}
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Consumer, "Cons ");
}
void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Productor, "Prod ");
}
void StartAll(std::vector<Thread<ringqueue_t>> &threads)
{
for (auto &thread : threads)
{
std::cout << "start: " << thread.name() << std::endl;
thread.Start();
}
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads)
{
for (auto &thread : threads)
{
thread.Join();
}
}
int main()
{
ringqueue_t *rq = new ringqueue_t(5);
std::vector<Thread<ringqueue_t>> threads;
InitProductor(&threads, 1, *rq);
InitConsumer(&threads, 1, *rq);
StartAll(threads);
WaitAllThread(threads);
return 0;
}
3.线程池
可变参数的处理
<stdarg.h>
头文件中定义了一些宏,用于处理 C 语言中的可变参数函数
#define va_start(ap, param) ap = (va_list)¶m
#define va_arg(ap, type) (*(type*)(ap ))
#define va_end(ap) ap = NULL
va_list
:va_list
是一个类型,它用来声明一个变量,这个变量将被用来依次访问可变参数列表中的参数。va_start
:va_start
宏用于初始化va_list
变量。它接受两个参数:第一个参数是一个va_list
类型的变量,用来指向参数列表;第二个参数是最后一个确定的参数的后一个参数,即可变参数列表中已知参数的后一个参数。这样就能让va_list
从可变参数列表的第一个参数开始遍历。va_arg
:va_arg
宏用于先返回参数的值,再访问va_list
中的下一个参数。它接受两个参数:第一个参数是va_list
类型的变量;第二个参数是要获取的参数的类型。va_arg
的作用是逐个遍历可变参数列表,返回对应类型的参数值,并将va_list
向后移动到下一个参数。va_end
:va_end
宏用于清理va_list
变量,释放资源。一般来说,va_end
应该与对应的va_start
成对出现,用来正确终止可变参数的处理。va_copy
:va_copy
宏用于将一个va_list
类型的变量的值复制给另一个va_list
类型的变量,以便在后续代码中再次访问相同的可变参数列表。va_copy
函数的原型类似于va_copy(va_list dest, va_list src)
,通过将源va_list
复制给目标va_list
,使得目标va_list
在后续代码中可以重新访问相同的可变参数列表。
void Test(int num, ...)
{
va_list arg;
va_start(arg, num);
while (num)
{
int data = va_arg(arg, int);
std::cout << "data: " << data << std::endl;
num--;
}
va_end(arg); // arg = NULL
}
int main()
{
Test(3, 11, 22, 33);
return 0;
}
__VA_ARGS__
是 C/C 中的预定义宏,用于表示宏定义中的可变参数部分。在宏定义中,如果我们希望定义一个参数个数不确定的宏,就可以使用 __VA_ARGS__
来代表可变参数的部分。
使用方法
在宏定义中,__VA_ARGS__
常用于定义具有可变参数的宏
#define LOG(format, ...) printf(format, __VA_ARGS__)
在上面的示例中,LOG
宏定义了一个可变参数的输出日志功能。format
是格式化字符串,__VA_ARGS__
表示可变参数部分,当宏被调用时,实际参数会替换 __VA_ARGS__
部分。
工作原理
- 当宏被调用时,
__VA_ARGS__
会被替换为实际参数列表。 - 编译器会将实际参数列表直接展开到宏定义中,作为可变参数的位置。
- 这样,就可以实现宏的可变参数功能。
使用 ##
连接 format
和 __VA_ARGS__
,以确保在 __VA_ARGS__
为空时,不会产生额外的逗号(一般都会加上)()
项目内容
- Log.hpp:
- 定义了日志输出的相关功能,包括日志级别的枚举
Level
、输出日志到文件的函数、获取时间字符串、打印日志消息等。 - 定义了宏
LOG
,用于方便打印日志信息。
- Main.cc:
- 主程序文件,包含了
main
函数,创建了一个线程池ThreadPool
实例,并向线程池添加任务。 - 在添加任务的过程中会记录日志信息。
- ThreadPool.hpp:
- 实现了线程池的功能,包括任务队列管理、线程的启动和停止、任务处理等。
- 包括了线程池的初始化、启动、等待、添加任务、停止等操作。
- Task.hpp:定义了任务类
Task
,包含了任务的执行、结果转换为字符串等功能。 - Thread.hpp:定义了线程类
Thread
,包含了线程的执行函数、启动、分离、等待、停止等功能。
整体流程:在主程序中创建线程池并添加任务,线程池中的线程会从任务队列中获取任务并执行,执行过程中会记录日志信息。日志功能会将信息输出到屏幕或者保存到文件中,日志级别由枚举 Level
定义。
Log.hpp
代码语言:javascript复制#pragma once
#include <string>
#include <cstdio>
#include <time.h> //time函数和localtime函数
#include <iostream>
#include <sys/types.h>
#include <unistd.h> //getpid
#include <pthread.h>
#include <stdarg.h>
#include <fstream>
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL // 从上到下,程度依次增大
};
bool isSave = false; // 用来判断日志信息是否需要保存到文件中
std::string file_name = "log.txt";
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
std::string GetTimeString()
{
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time); // format:格式
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", // snprintf 会确保在目标字符数组的末尾添加 null 结尾符 '