线程池的使用场景和代码实现!

2021-09-09 15:19:50 浏览数 (2)

前言:

大家周末好,今天给大家带来一篇技术文章,是关于线程池的实现和使用场景;我相信大家在公司里面的代码里面经常看到这个线程池的用法,或者甚至大家可能会听到内存池、对象池、连接池等这些专业术语,反正就很多带池的专业术语,不过你会发现他们都有一个共同的特点就是“屁股”末尾都带一个“池”字,池字,简单理解就是用来存东西的,举个简单例子来说,你比如游泳池里面可以用来存储水!

好了简单说了一下,后面的哪些什么内存池、连接池,后期复习都再给大家分享吧,今天我们的主题是线程池。

一、线程池的实现:

1、为啥要用到线程池?

多线程编程,大家这个应该很熟悉了,上次有一位朋友问了一个问题,一个线程大概占用多大内存大小,一般按照POSIX标准来算的话,一个线程大概在8M左右,但是我们一般内存资源有限,在进行高并发的时候,比如说,多个客户端同时向服务器端发送请求:

这个时候,你想一下给这么多客户端都分配开一个大概8M的内存大小,这现实嘛,显然行不通的嘛,我们来计算一下:

  • 一个线程:8M
  • 1024M可以开128个线程
  • 16G内存大小可以开16x128,计算下来大概在2048个线程

所以百万级个客户端都分配开一个线程的话,那内存资源肯定是不够的,所以这涉及到我们的线程池了,这也是为什么在这种场景下要使用线程池了!

为了帮助大家更好的理解线程池这个概念,我们还是举一个生活当中的实际场景吧;去银行存钱或者办理相关业务,这个大家都不陌生吧,你到了银行里面,一般来说的话,都要排队在窗口等待前面的人把业务办理完,才能够轮到你来办理你想要办理的业务,而窗口里面就是帮你办理各种业务的银行工作人员,同时一般窗口办理业务上面有一个提示电子信息,如果轮到了你,就会通知你,你就知道了轮到自己办理业务了。

这里换个专业的角度来说(也不专业哈,只是一个打比方),你来办理的这个业务就是一个任务(也就是一个线程,可以说成任务队列,因为要排队嘛,不可能一下子执行那么多任务,任务队列里面的任务必须一个一个执行),而银行工作人员相当于从任务队列里面拿一个任务来执行,你可以把银行工作人员看成是执行任务队列;而电子显示通知信息,你可以把它看成防止多个业务同时在一个窗口让一个银行工作人员来办理,两个窗口也就是两个银行工作人员同时办理一个业务,也就是说这个电子显示信息是一个管理组件,管理任务是否可以去办理,管理着银行工作人员是否开始办理业务任务,不让他们乱套了,合理有效的执行任务。

那么你从上面可以看到,使用线程池的优点了:

  • 避免线程太多,使得内存耗尽
  • 开始的时候,你可以把创建好的线程放入到线程池当中去,当我们要用的时候,就可以从线程池里面拿一个线程来用,用完这个线程的时候,再把这个线程放回到线程池里面;避免创建线程与销毁的代价

2、线程池实现模板步骤:

其实这个线程池的实现大概流程步骤都差不多,如果大家平时仔细看公司代码或者说自己去实现一个线程池的话,大概实现模板如下:

  • 任务队列(前来办理业务的人)
  • 执行队列(就是银行工作人员执行任务队列里面的任务)
  • 管理组件(管理任务有序的执行)

3、线程池实现结构体定义:

  • 任务队列:
代码语言:javascript复制
struct nTask
{
//用函数指针来存放不同的任务
  void (*task_func)(struct nTask *task);
  
  //这个参数用来做任务执行的参数
  void *user_data;

//链表节点的定义,这里采用链表的方式实现
struct nTask *prev;
struct nTask *next;

};
  • 执行队列:
代码语言:javascript复制
struct nWorker
{
  pthread_t threadid;//线程id
  
  int terminate;//表示是否终止任务
 //表示银行工作人员要执行任务还要向执行组件通告一下
  struct nManager *manager;
  
  //还是通过链表的方式来实现执行队列
  struct nWorker *prev;
  struct nWorker *next;

};

注意:这里如果没有办理业务的人来,银行工作人员只能在哪里等待任务的到来,然后再执行任务。

  • 管理组件:
代码语言:javascript复制
typedef struct nManager
{

  struct nTask *task;
  struct nWorker *workers;
  
  pthread_mutex_t mutex;//互斥锁
  pthread_cond_t cond;//条件变量
}ThreadPool;
  • 链表的插入和删除模板:
代码语言:javascript复制
//插入
#define LIST_INSERT(item,list) do{
  item->prev=NULL;                
  item->next=list;                
if((list)!=NULL) list->prev=item;
list=item;
}while(0)

//删除
#define LIST_REMOVE(item,list) do{ 
if(item->prev != NULL) item->prev->next = item->next; 
if(item->next !=NULL) item->next->prev=item->prev;  

if(list == item)list = item->netx;  
item->prev=item->next=NULL;
}while(0)

}



4、线程池接口定义如下:

  • 1、线程池初始化接口:
代码语言:javascript复制
int nThreadPoolCreate(ThreadPool *pool,int numWorkers)
{
//参数pool表示线程池,numWorkers表示线程池里面有多少个任务
}
  • 2、线程池销毁接口:
代码语言:javascript复制
int nThreadPoolDestory(ThreadPool *pool,int nWorker)
{

}
  • 3、往线程池里面添加任务接口:
代码语言:javascript复制
int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task)
{


}
  • 4、线程回调函数:
代码语言:javascript复制
void *nThreadPoolCallback(void *arg)
{


}

二、线程池工程代码:

代码语言:javascript复制
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

//链表插入
#define LIST_INSTER(item,list)do{ 
item->prev=NULL;       
item->next=next;       
if(list!=NULL) list->prev=item; 
list=item;
}while(0)

//删除
#define LIST_REMOVE(item,list)do {  
if(item->prev!=NULL)item->prev->next=item->next; 
if(item->next!=NULL)itme->next->prev=item->prev;

if(list==item)list=item->next;
item->prev=item->next=NULL;
}while(0)

//任务队列

struct nTask
{
  void(*task_funt)(struct nTask *task);
  void *uset_data;
  
  struct nTask *prev;
  struct nTask *next;
};

//执行队列
struct nWorker
{
  pthread_t threadid;
  int terminate;
  
  struct nManager *manager;
  
  struct nWorker *prev;
  struct nWorker *next;
};

//管理组件
typedef struct nManager
{
  struct nTask *tasks;
  struct nWoker *workers;
  
  pthread_mutex_t mutex;
  pthread_cond_t cond;

}ThreadPool;
//线程回调函数
void *nThreadPoolCallback(void *arg)
{
  struct nWorker *worker=(struct nWorker*)arg;
  
  while(1)
  {
    //判断是否有任务
    pthread_mutex_lock(&worker->manager-mutex);
    while(worker->manager->tasks==NULL)
    {
      if(worker-terminate)
        break;
      pthread_cond_wait(&worker->manager->cond,&worker->manager->mutex);//如果没有任务,一直等待任务的到来
    }
    if(worker->terminate)
    {
      pthread_mutex_unlock(&worker->manager->mutex);
      break;
    
    }
  struct nTask *task = worker->manager->tasks;
  LIST_REMOVE(task,worker->manager->tasks);
  pthread_mutex_unlock(&worker->manager->mutex);
  task->task_func(task);
  
  }

free(worker);
}

//创建线程池
int nThreadPoolCreate(ThreadPool *pool, int numWorkers)
{
  if(pool == NULL) return -1;
  if(numWorkers < 1)numWorkers =1;
  memset(&pool,0,sizeof(ThreadPool));

  //开始初始化
  pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;

  memcpy(&pool->cond,&blank_cond,sizeof(pthread_cond_t));

  pthread_mutex_t blank_mutex =PTHREAD_MUTEX_INITIALIZER;
  memcpy(&pool->mutex,&blank_mutex,sizeof(pthread_mutex_t));

  int i =0;//开线程的个数,也就是执行任务的个数

  for(i=0;i < numWorkers;i  )
  {
    struct nWorker *worker =(struct nWorker*)malloc(sizeof(struct nWorker));
    if(worker == NUll)
    {
        perror("malloc");
        return -2;
    }
    memset(worker,0,sizeof(struct nWorker));
    worker->manager=pool;

  //创建线程
  int ret=pthread_create(&worker->pthreadid,NULL,nThreadPoolCallback,worker);
  
    if(ret)
    {
      perror("pthread_create");
      free(worker);
      return -3;
    }
    LIST_INSERT(worker,pool->workers);
  }
}

//线程池销毁

int nThreadPoolDestory(ThreadPool *pool,int nWorker)
{
  struct nWorker *worker = NULL;
  for(worker=pool->workers;worker!=NULL;worker=worker->next)
  {
    worker->terminate;
  }
pthread_mutex_lock(&pool->mutex);
pthread_cond_broadcast(&pool->cond);//做一个广播通知
pthread_mutex_unlock(&pool->mutex);

pool->workers = NULL;
pool->tasks = NULL;
}

//往线程池里面添加任务

int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task)
{
  pthread_mutex_lock(&pool->mutex);
  LIST_INSERTER(task,pool->tasks);
  pthread_cond_sigal(&pool->cond);// 发送一个信号,有人来办理业务了
  pthread_mutex_unlock(&pool-mutex);
}


#if 1

#define THREADPOOL_INIT_COUNT 20
#define TASK_INIT_SIZE   1000


void task_entry(struct nTask *task) { //type 

 //struct nTask *task = (struct nTask*)task;
 int idx = *(int *)task->user_data;

 printf("idx: %dn", idx);

 free(task->user_data);
 free(task);
}


int main(void) {

 ThreadPool pool = {0};
 
 nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
 // pool --> memset();
 
 int i = 0;
 for (i = 0;i < TASK_INIT_SIZE;i   ) {
  struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
  if (task == NULL) {
   perror("malloc");
   exit(1);
  }
  memset(task, 0, sizeof(struct nTask));

  task->task_func = task_entry;
  task->user_data = malloc(sizeof(int));
  *(int*)task->user_data  = i;

  
  nThreadPoolPushTask(&pool, task);
 }

 getchar();
 
}

代码量稍微有点多,大家可以多多看看几遍!

三、总结:

今天的分享暂时到这里了,今天线程池的分享花的时间比较多,才整理出来,下个礼拜开始音视频的分享,后面的其他复习知识,会慢慢整理分享出来。

0 人点赞