线程池的使用场景:
服务器连接通信
- 在服务器与客户端建立连接进行通信的时候,需要用到多线程进行,如果客户端有10万个,则按照普通做法,即一客户端一线程,需要开10万个线程,而在posix标准的线程,一个有8M,则16G内存只有2048个线程可开,故引出了线程池。
日志文件
- 磁盘操作远远比内存操作慢很多,在写线程的时候,会引起线程的挂起。故在落盘即执行写操作时与如何写,
这两个问题分开,如何写即是任务,执行写操作即是执行。
线程池的好处:
避免线程太多,使得内存耗尽。
避免线程创建与销毁的代价。
任务与执行分离。
线程池的基本原理:
一个任务队列与一个执行队列,中间一个管理组件。
管理组件中有这两个队列与一个mutex锁,其中还有一个条件变量,这个条件变量的作用是
来使执行队列进行。
线程池即可以理解为上图中的管理组件。
故线程池需要定义三个东西,定义如下:
1.任务队列
structnTask{/* data */void(*task_func)(structnTask*task);void*user_data;structnTask*prev;structnTask*next;};2. 执行队列
structnWorker{pthread_t thread_id;intterminate;structnManager*manager;structnWorker*prev;structnWorker*next;};3.管理模块
typedefstructnManager{structnTask*tasks;structnWorker*workers;pthread_mutex_t mutex;pthread_cond_t cond;}ThreadPool;API定义
所需要提供给用户的API主要有以下功能,创建一个线程池,销毁一个线程池,往线程池里添加任务。1. 创建线程池
这里面做两件事情,一个是激活执行队列中的“工人”,二是去任务队列里取任务。
intnThreadPoolCreate(ThreadPool*pool,intnumWorkers){//创建线程池,首先初始化各个参数if(pool==NULL)return-1;//工人必须大于等于1,执行队列里一定有一个工人。if(numWorkers<1)numWorkers=1;//初始化线程池,此处是静态初始化条件变量memset(pool,0,sizeof(ThreadPool));pthread_cond_t blank_cond=PTHREAD_COND_INITIALIZER;memcpy(&pool->cond,&blank_cond,sizeof(pthread_cond_t));pthread_mutex_init(&pool->mutex,NULL);inti=0;for(i=0;i<numWorkers;i++){structnWorker*worker=(structnWorker*)malloc(sizeof(structnWorker));if(worker==NULL){perror("malloc");return-2;}memset(worker,0,sizeof(structnWorker));worker->manager=pool;// INFO("nthreadpool worker %d start\n",i);intret=pthread_create(&worker->thread_id,NULL,nThreadPoolCallback,worker);if(ret){perror("pthread_create");free(worker);return-3;}LIST_INSERT(worker,pool->workers);}return0;}其中LIST_INSERT是通过宏定义的函数,即链表的插入操作:
#defineLIST_INSERT(item,list)do{\item->prev=NULL;\item->next=list;\if((list)!=NULL)(list)->prev=item;\list=item;\}while(0)2.线程回调函数
staticvoid*nThreadPoolCallback(void*arg){structnWorker*worker=(structnWorker*)arg;while(1){pthread_mutex_lock(&worker->manager->mutex);//等待任务的到来while(worker->manager->tasks==NULL){//循环退出条件if(worker->terminate)break;//线程阻塞pthread_cond_wait(&worker->manager->cond,&worker->manager->mutex);}if(worker->terminate){pthread_mutex_unlock(&worker->manager->mutex);break;}//有任务就取出任务structnTask*task=worker->manager->tasks;//从头取出一个节点LIST_REMOVE(task,worker->manager->tasks);pthread_mutex_unlock(&worker->manager->mutex);task->task_func(task);}free(worker);}这其中的LIST_REMOVE也是通过宏定义的函数,即链表的删除操作:
#defineLIST_REMOVE(item,list)do{\if(item->prev!=NULL)item->prev->next=item->next;\if(item->next!=NULL)item->next->prev=item->prev;\if(list==item)list=item->next;\item->prev=item->next=NULL;\}while(0)3.销毁线程池
其中要做的就是切断work结构体与task结构体的联系,并将线程回调函数终止。
intnThreadPoolDestory(ThreadPool*pool,intnWorker){structnWorker*worker=NULL;for(worker=pool->workers;worker!=NULL;worker=worker->next){worker->terminate=1;}//这里加锁的原因是防止在广播时有些还未进行wait的即将进行wait的条件都一起满足!!!!pthread_mutex_lock(&pool->mutex);//唤醒所有线程,叫醒正在睡觉的员工,叫他们下班。pthread_cond_broadcast(&pool->cond);pthread_mutex_unlock(&pool->mutex);pool->workers=NULL;pool->tasks=NULL;}4.向线程池中添加任务
intnThreadPoolPushTask(ThreadPool*pool,structnTask*task){pthread_mutex_lock(&pool->mutex);LIST_INSERT(task,pool->tasks);//唤醒一个线程pthread_cond_signal(&pool->cond);pthread_mutex_unlock(&pool->mutex);}