一、引言
多线程版服务器一个客户端就需要创建一个线程! 若客户端太多, 显然不太 合适. 什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销 毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容 忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行 时间可以忽略不计,则没有必要使用线程池了。 实现的时候类似于生产者和消费
线程池和任务池
线程池 | 任务池 | |
---|---|---|
定义 | 线程池是一组可重复使用的线程的集合 | 任务池是一组待执行的任务的集合 |
任务管理 | 线程池负责管理线程的生命周期,包括线程的创建、调度、执行和销毁等。 | 务池负责管理任务的生命周期,包括任务的创建、调度、执行和销毁等 |
并发控制 | 线程池可以根据需要动态调整线程的数量,可以根据系统负载、任务数量等进行调度。 | 任务池可以根据需要动态调整任务的执行顺序和并发度,可以根据任务的优先级、依赖关系等进行调度 |
资源利用 | 线程池可以重复利用线程,避免了频繁创建和销毁线程的开销,提高了系统的性能。 | 任务池可以将多个任务分配给少量的线程执行,可以更有效地利用系统资源 |
使用场景 | 线程池适用于需要管理大量并发任务的场景,例如并发计算、IO操作等 | 任务池适用于需要管理大量独立任务的场景,例如并发请求处理、消息队列等 |
主线程负责向任务池添加任务,而子线程负责从任务池中取出任务并执行。任务池的作用是存放待执行的任务,每个任务都是一个结构体,其中包含了一个回调函数。尽管子线程执行的代码可以是相同的,但是它们从任务池中获取的任务元素是不同的,因此执行的回调函数也会有所不同。通过这种方式,可以实现子线程执行不同的任务,提高程序的并发性和执行效率。
线程相关函数
1. pthread_create:创建线程的函数。 2. pthread_detach:用于将线程分离。分离线程在终止时会自动释放资源,无需其他线程与之进行连接。 3. pthread_attr_t:线程属性的数据类型,用于存储线程的属性信息。 4. pthread_attr_init:用于初始化pthread_attr_t对象,将其设置为默认属性。初始化后,可以使用其他pthread_attr_*函数修改属性。 5. pthread_attr_setdetachstate:用于设置线程的分离状态属性。分离状态可以是PTHREAD_CREATE_JOINABLE(可连接状态)或PTHREAD_CREATE_DETACHED(分离状态)。可连接状态的线程可以通过pthread_join与其他线程连接,而分离状态的线程在终止时会自动释放资源。 6. pthread_exit:用于终止调用线程。可以在线程执行的任何位置调用pthread_exit。当线程调用pthread_exit时,它的资源会自动释放,并将控制返回给父线程。
互斥锁相关函数
- pthread_mutex_init:初始化互斥锁。使用该函数可以初始化一个互斥锁对象,设置互斥锁的属性。
- pthread_mutex_destroy:销毁互斥锁。使用该函数可以销毁一个互斥锁对象,释放相关资源。
- pthread_mutex_lock:加锁。使用该函数可以将互斥锁加锁,如果互斥锁已经被其他线程锁定,则当前线程会被阻塞,直到互斥锁被解锁。
- pthread_mutex_trylock:尝试加锁。使用该函数可以尝试将互斥锁加锁,如果互斥锁已经被其他线程锁定,则该函数会立即返回一个错误码。
- pthread_mutex_unlock:解锁。使用该函数可以将互斥锁解锁,允许其他线程对互斥锁进行加锁操作。
- pthread_mutexattr_init:初始化互斥锁属性对象。使用该函数可以初始化一个互斥锁属性对象,设置互斥锁的属性。
- pthread_mutexattr_destroy:销毁互斥锁属性对象。使用该函数可以销毁一个互斥锁属性对象,释放相关资源。
- pthread_mutexattr_settype:设置互斥锁类型。使用该函数可以设置互斥锁的类型,包括普通锁、递归锁等。
- pthread_mutexattr_gettype:获取互斥锁类型。使用该函数可以获取互斥锁的类型。
若任务池已满,主线程应该阻塞等待子线程处理任务,此时主线程需要阻塞等待
若任务池空了,子线程应该阻塞等待,等待主线程往任务池里面添加任务
pthread_cond_wait函数用于等待条件变量的信号。当一个线程调用pthread_cond_wait时,它会释放当前持有的互斥锁,并进入等待状态,直到收到与该条件变量相关的信号(通常是由其他线程调用pthread_cond_signal发送的)。一旦收到信号,该线程会重新获得互斥锁,并继续执行。
pthread_cond_signal函数用于发送条件变量的信号。当一个线程调用pthread_cond_signal时,它会唤醒等待该条件变量的一个线程(如果有多个线程在等待,则唤醒其中一个)。被唤醒的线程会重新获得互斥锁,并继续执行。
二、线程池头文件介绍
代码语言:javascript复制#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
typedef struct _PoolTask
{
int tasknum;//模拟任务编号
void *arg;//回调函数参数
void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;
typedef struct _ThreadPool
{
int max_job_num;//最大任务个数
int job_num;//实际任务个数,小于等于max_job_num
PoolTask *tasks;//任务队列数组
int job_push;//入队位置,在这个地方添加任务
int job_pop;// 出队位置
int thr_num;//线程池内线程个数
pthread_t *threads;//线程池内线程数组
int shutdown;//是否关闭线程池
pthread_mutex_t pool_lock;//线程池的锁
pthread_cond_t empty_task;//任务队列为空的条件
pthread_cond_t not_empty_task;//任务队列不为空的条件
}ThreadPool; //线程池
void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum 代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数
#endif
三、简单版本线程池
1.创建线程池
代码语言:javascript复制//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{
printf("begin call %s-----n",__FUNCTION__);
thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));
thrPool->thr_num = thrnum;
thrPool->max_job_num = maxtasknum;
thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
thrPool->job_push = 0;//任务队列添加的位置
thrPool->job_pop = 0;//任务队列出队的位置
thrPool->job_num = 0;//初始化的任务个数为0
thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列
//初始化锁和条件变量
pthread_mutex_init(&thrPool->pool_lock,NULL);
pthread_cond_init(&thrPool->empty_task,NULL);
pthread_cond_init(&thrPool->not_empty_task,NULL);
int i = 0;
thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(i = 0;i < thrnum;i )
{
pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程
}
//printf("end call %s-----n",__FUNCTION__);
}
2.添加任务到线程池
代码语言:javascript复制void addtask(ThreadPool *pool)
{
pthread_mutex_lock(&pool->pool_lock); // 加锁,确保线程池的任务队列安全访问
while(pool->max_job_num <= pool->job_num) // 当任务队列已满时,等待任务队列有空闲位置
{
pthread_cond_wait(&pool->empty_task,&pool->pool_lock); // 等待任务队列有空闲位置
}
int taskpos = (pool->job_push )%pool->max_job_num; // 计算任务在任务队列中的位置
pool->tasks[taskpos].tasknum = beginnum ; // 设置任务的编号
pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos]; // 设置任务的参数
pool->tasks[taskpos].task_func = taskRun; // 设置任务的函数指针
pool->job_num ; // 增加任务数量
pthread_mutex_unlock(&pool->pool_lock); // 解锁
pthread_cond_signal(&pool->not_empty_task); // 通知线程池有新的任务可执行
}
3.子线程执行回调函数
代码语言:javascript复制void *thrRun(void *arg)
{
//printf("begin call %s-----n",__FUNCTION__);
ThreadPool *pool = (ThreadPool*)arg;
int taskpos = 0;//任务位置
PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));
while(1)
{
//获取任务,先要尝试加锁
pthread_mutex_lock(&thrPool->pool_lock);
//无任务并且线程池不是要摧毁
while(thrPool->job_num <= 0 && !thrPool->shutdown )
{
//如果没有任务,线程会阻塞
pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
}
if(thrPool->job_num)
{
//有任务需要处理
taskpos = (thrPool->job_pop )%thrPool->max_job_num;
//printf("task out %d...tasknum===%d tid=%lun",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
//为什么要拷贝?避免任务被修改,生产者会添加任务
memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));
task->arg = task;
thrPool->job_num--;
//task = &thrPool->tasks[taskpos];
pthread_cond_signal(&thrPool->empty_task);//通知生产者
}
if(thrPool->shutdown)
{
//代表要摧毁线程池,此时线程退出即可
//pthread_detach(pthread_self());//临死前分家
pthread_mutex_unlock(&thrPool->pool_lock);
free(task);
pthread_exit(NULL);
}
//释放锁
pthread_mutex_unlock(&thrPool->pool_lock);
task->task_func(task->arg);//执行回调函数
}
//printf("end call %s-----n",__FUNCTION__);
}
4.摧毁线程池
代码语言:javascript复制//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
pool->shutdown = 1;//开始自爆
pthread_cond_broadcast(&pool->not_empty_task);//诱杀
int i = 0;
for(i = 0; i < pool->thr_num ; i )
{
pthread_join(pool->threads[i],NULL);
}
pthread_cond_destroy(&pool->not_empty_task);
pthread_cond_destroy(&pool->empty_task);
pthread_mutex_destroy(&pool->pool_lock);
free(pool->tasks);
free(pool->threads);
free(pool);
}
5.简单版线程池流程分析
四、复杂版本线程池
1.结构体介绍
typedef
struct
{
void
*(*function)(void
*); /* 函数指针,回调函数 */
void
*arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 */
/* 描述线程池相关信息 */
struct
threadpool_t
{
pthread_mutex_t lock; /* 用于锁住本结构体 */
pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */
pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */
pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
pthread_t adjust_tid; /* 存管理线程tid */
threadpool_task_t *task_queue; /* 任务队列(数组首地址) */
int
min_thr_num; /* 线程池最小线程数 */
int
max_thr_num; /* 线程池最大线程数 */
int
live_thr_num; /* 当前存活线程个数 */
int
busy_thr_num; /* 忙状态线程个数 */
int
wait_exit_thr_num; /* 要销毁的线程个数 */
int
queue_front; /* task_queue队头下标 */
int
queue_rear; /* task_queue队尾下标 */
int
queue_size; /* task_queue队中实际任务数 */
int
queue_max_size; /* task_queue队列可容纳任务数上限 */
int
shutdown; /* 标志位,线程池使用状态,true或false */
};
2.主线程
代码语言:javascript复制//threadpool_create(3,100,100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
int i;
threadpool_t *pool = NULL;
do
{
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
{
printf("malloc threadpool fail");
break; /*跳出do while*/
}
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */
pool->wait_exit_thr_num = 0;
pool->queue_size = 0; /* 有0个产品 */
pool->queue_max_size = queue_max_size;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = false; /* 不关闭线程池 */
/* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
if (pool->threads == NULL)
{
printf("malloc threads fail");
break;
}
memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
/* 队列开辟空间 */
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
if (pool->task_queue == NULL)
{
printf("malloc task_queue failn");
break;
}
/* 初始化互斥琐、条件变量 */
if (pthread_mutex_init(&(pool->lock), NULL) != 0
|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
printf("init the lock or cond failn");
break;
}
//启动工作线程
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for (i = 0; i < min_thr_num; i )
{
pthread_create(&(pool->threads[i]), &attr, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
printf("start thread 0x%x...n", (unsigned int)pool->threads[i]);
}
//创建管理者线程
pthread_create(&(pool->adjust_tid), &attr, adjust_thread, (void *)pool);
return pool;
} while (0);
/* 前面代码调用失败时,释放poll存储空间 */
threadpool_free(pool);
return NULL;
}
/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{
pthread_mutex_lock(&(pool->lock));
/* ==为真,队列已经满, 调wait阻塞 */
while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
{
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
if (pool->shutdown)
{
pthread_cond_broadcast(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
/* 清空 工作线程 调用的回调函数 的参数arg */
if (pool->task_queue[pool->queue_rear].arg != NULL)
{
pool->task_queue[pool->queue_rear].arg = NULL;
}
/*添加任务到任务队列里*/
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */
pool->queue_size ;
/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
3.子线程
代码语言:javascript复制/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
while (true)
{
/* Lock must be taken to wait on conditional variable */
/*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
pthread_mutex_lock(&(pool->lock));
/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
while ((pool->queue_size == 0) && (!pool->shutdown))
{
printf("thread 0x%x is waitingn", (unsigned int)pthread_self());
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//暂停到这
/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
if (pool->wait_exit_thr_num > 0)
{
pool->wait_exit_thr_num--;
/*如果线程池里线程个数大于最小值时可以结束当前线程*/
if (pool->live_thr_num > pool->min_thr_num)
{
printf("thread 0x%x is exitingn", (unsigned int)pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
//pthread_detach(pthread_self());
pthread_exit(NULL);
}
}
}
/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
if (pool->shutdown)
{
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exitingn", (unsigned int)pthread_self());
//pthread_detach(pthread_self());
pthread_exit(NULL); /* 线程自行结束 */
}
/*从任务队列里获取任务, 是一个出队操作*/
task.function = pool->task_queue[pool->queue_front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front 1) % pool->queue_max_size; /* 出队,模拟环形队列 */
pool->queue_size--;
/*通知可以有新的任务添加进来*/
pthread_cond_broadcast(&(pool->queue_not_full));
/*任务取出后,立即将 线程池琐 释放*/
pthread_mutex_unlock(&(pool->lock));
/*执行任务*/
printf("thread 0x%x start workingn", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/
pool->busy_thr_num ; /*忙状态线程数 1*/
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.function))(task.arg); /*执行回调函数任务*/
//task.function(task.arg); /*执行回调函数任务*/
/*任务结束处理*/
printf("thread 0x%x end workingn", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
4.管理线程
代码语言:javascript复制/* 管理线程 */
void *adjust_thread(void *threadpool)
{
int i;
threadpool_t *pool = (threadpool_t *)threadpool;
while (!pool->shutdown)
{
sleep(DEFAULT_TIME); /*定时 对线程池管理*/
pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size; /* 关注 任务数 */
int live_thr_num = pool->live_thr_num; /* 存活 线程数 */
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */
pthread_mutex_unlock(&(pool->thread_counter));
/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num)
{
pthread_mutex_lock(&(pool->lock));
int add = 0;
/*一次增加 DEFAULT_THREAD 个线程*/
for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
&& pool->live_thr_num < pool->max_thr_num; i )
{
if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
{
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
add ;
pool->live_thr_num ;
}
}
pthread_mutex_unlock(&(pool->lock));
}
/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
{
/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */
pthread_mutex_unlock(&(pool->lock));
for (i = 0; i < DEFAULT_THREAD_VARY; i )
{
/* 通知处在空闲状态的线程, 他们会自行终止*/
pthread_cond_signal(&(pool->queue_not_empty));
}
}
}
return NULL;
}