使用 C 语言开发一个简单的线程池函数

2024-04-27 14:58:16 浏览数 (3)

线程池的概念

线程池顾名思义就是一个池子,里边放着很多的线程,那么这么做有什么好处。试想一下,如果我们家里有很多工具,比方说剪刀,斧头,如果我们每次用剪刀后都扔了。等到用的时候在买回非常麻烦,我们需要找个地方存起来,线程也是这样的,如果每来一个任务就创建一个线程,任务结束就销毁,那对服务器来说是非常麻烦的。所有我们引入了线程池的概念。

既然了解了为什么用线程池,那么就回到了怎么管理这些东西。在开发中肯定要设计怎么对齐进行管理。所有代码的开始都从设计结构开始。

仔细想一下,我们的一个简单想法就是用链表保存一系列线程,然后用链表保存一系列处理线程的对象。所有我们就有以下结构。

代码语言:c复制
// task queue 表示线程的链表
struct nTask{
    void (*task_func)(void *arg);  //表示这个线程处理函数
    void (*user_data);                //线程的处理数据
    
    struct nTask *prev;
    struct nTask *next;           //双向链表
};

//execute queue。 处理线程的对象
struct nWorker{
    pthread_t threadid;        //线程id
    int terminate;                //表示线程退出标志
    struct nManager *manager; //管理线程领导,员工的领导是谁
    
    struct nWorker *prev;
    struct nWorker *next;    //双向链表
};

上述分别创建了两个结构体,分别是装任务的和处理任务的。通俗的来说就是员工和员工要干的事情。

但是如果只有这两个结构,那么要干活的人和要干的事情怎么发生关系。这个员工怎么知道自己需要干什么活。这个时候就需要领导了!

代码语言:c复制
//manage component --> lock
typedef struct nManager{
    struct nTask *tasks;        //任务
    struct nWorker *workers;  //员工
    
    pthread_mutex_t mutex;  //manage lock
    pthread_cond_t cond;    //任务锁
}ThreadPool;

这个manager就相当于领导管理任务的,而且要加锁,对已经分配的任务的员工和任务做个记录,保证任务分配别分配多了,影响效率。

这里岔开一下,因为这里涉及双向链表管理,所有我们先定义两个链表操作,因为本身比较简单,就只用宏定义,开发中可以自己开发函数。

代码语言:c复制
//链表插入操作
#define LIST_INSERT(item, list) do{    
   item->prev = NULL;    
   item->next = list;  
   if(list != NULL) list->prev = item; 
   (list) = item; 
}while(0)

//链表节点移除操作
#define LIST_REMOVE(item, list) do{ 
   if(item->prev != NULL) item->prev->next = item->next;
   if(item->next != NULL) item->next->prev = item->prev;
   if(list == item) list = item->next;
   item->prev = item->next = NULL;
}while(0)

接下来就是一个新项目启动的时候,我们首先都要对数据进行初始化。保证员工从零开始。用开发的语言讲就是结构对象初始化。

代码语言:c复制
int nThreadPoolCreate(ThreadPool *pool, int nWorkers){
    if(pool == NULL) return -1;     //没有员工没有任务
    if(nWorkers < 1) return -1;     //没有员工
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;  //线程的等待初始化
    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
    pthread_mutex_init(&pool->mutex, NULL);   //锁的初始化
    int i = 0;
    for(int i = 0; i < nWorkers; i  ){
        struct nWorker *worker = (struct nWorker *)malloc(sizeof(struct nWorker ));  //开辟员工的空间
        if(worker == NULL){
            perror("malloc failed");
            return -2;
        }
        memset(worker, 0, sizeof(struct nWorker));
        worker->manager = pool;
        
        int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);  //创建员工的id
        if(ret){
            perror("pthread_create");
            free(worker);
            return -3;
            
        }
        LIST_INSERT(worker, pool->workers);  //插入员工的链表
    }
    return 0;
}

注意的时候,这里的任务是不用初始化的,任务是需要我们其他模块的传输进来的。

代码中这里有一个创建进程的函数。

代码语言:c复制
int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);  //创建员工的id

里边的一个参数 nThreadPoolCallback 是一个回调函数,什么是回调函数,简单来说就是一个函数名字作为一个参数传进另一个参数。

而创建进程的回调函数,这里的回调函数 nThreadPoolCallback 会承担对员工分配一个任务作用。

具体代码如下:

代码语言:c复制
static void* nThreadPoolCallback(void *arg)
{
    struct nWorker *worker = (struct nWorker *)arg;
    while(1){
        pthread_mutex_lock(&worker->manager->mutex);  //对领导加锁,领导同时只能分配一个任务
        while(worker->manager->tasks == NULL){  //领导没有任务
            if(worker->terminate) break;     //某一个员工退出, 退出的话赋值为  1
            pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex); //等待任务出现
        }
        if(worker->terminate){
            pthread_mutex_unlock(&worker->manager->mutex);
            break;
        }
        struct nTask *task = worker->manager->tasks;  //出现任务
        LIST_REMOVE(task, worker->manager->tasks); //讲任务移除任务列表
        pthread_mutex_unlock(&worker->manager->mutex);
        task->task_func(task->user_data);    //处理任务
    }
    free(worker);
    return 0;  //可以去掉,这里是 xcode 限制,不加会报错
}

这里的整体逻辑比较简单,就是员工先对领导上锁,判断有没有任务,没有任务就空转。有任务就分配任务,员工terminate = 1 就是离职了。

最后就是线程池怎么销毁,简单说就是部门全部裁掉。

代码语言:c复制
int nThreadPoolDestory(ThreadPool *pool, int nWorkers)
{
    struct nWorker *worker = NULL;
    for(worker = pool->workers; worker != NULL; worker = worker->next)
    {
        worker->terminate = 1;
    }
    
    pthread_mutex_lock(&pool->mutex);
    
    pthread_cond_broadcast(&pool->cond);
    
    pthread_mutex_unlock(&pool->mutex);
    pool->workers = NULL;
    pool->tasks = NULL;
    
    return 0;
    
}

首先就是 terminate 设置成 1 表示员工都离职,然后领导广播,最后设置成 null 任务交接,工位空出。

至此,线程池内部管理就结束了,但是还有一个环节就是部门怎么接的任务,我们需要别人给我任务证明我们部门的价值。所以就需要一个接任务的函数。

代码语言:c复制
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task)
{
    pthread_mutex_lock(&pool->mutex);
    
    LIST_INSERT(task, pool->tasks);
    pthread_cond_signal(&pool->cond);
    
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

简单的是,就是添加任务,通知没活儿干的员工。

至此一个简单的线程池函数就创建完毕了。

开发工程上的技术大多都是有其目的的,各种封装越来越好的技术确实难以理解,但是我们知道他是干什么的,再了解他是怎么干的,最后简单写一个会更好。用有限的资源博弈出无限可能结果是任何组织都希望得到的。

1 人点赞