C语言实现支持动态拓展和销毁的线程池
本文实例介绍了C语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下
实现功能
- 1.初始化指定个数的线程
- 2.使用链表来管理任务队列
- 3.支持拓展动态线程
- 4.如果闲置线程过多,动态销毁部分线程
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<signal.h>
/*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/
typedefstructthread_worker_s{
void*(*process)(void*arg);//处理函数
void*arg;//参数
structthread_worker_s*next;
}thread_worker_t;
#defineboolint
#definetrue1
#definefalse0
/*线程池中各线程状态描述*/
#defineTHREAD_STATE_RUN0
#defineTHREAD_STATE_TASK_WAITING1
#defineTHREAD_STATE_TASK_PROCESSING2
#defineTHREAD_STATE_TASK_FINISHED3
#defineTHREAD_STATE_EXIT4
typedefstructthread_info_s{
pthread_tid;
intstate;
structthread_info_s*next;
}thread_info_t;
staticchar*thread_state_map[]={"创建","等待任务","处理中","处理完成","已退出"};
/*线程压缩的时候只有0,1,2,4状态的线程可以销毁*/
/*线程池管理器*/
#defineTHREAD_BUSY_PERCENT0.5/*线程:任务=1:2值越小,说明任务多,增加线程*/
#defineTHREAD_IDLE_PERCENT2/*线程:任务=2:1值大于1,线程多于任务,销毁部分线程*/
typedefstructthread_pool_s{
pthread_mutex_tqueue_lock;//队列互斥锁,即涉及到队列修改时需要加锁
pthread_cond_tqueue_ready;//队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了
thread_worker_t*head;//任务队列头指针
boolis_destroy;//线程池是否已经销毁
intnum;//线程的个数
intrnum;;//正在跑的线程
intknum;;//已杀死的线程
intqueue_size;//工作队列的大小
thread_info_t*threads;//线程组id,通过pthread_join(thread_ids[0],NULL)来执行线程
pthread_tdisplay;//打印线程
pthread_tdestroy;//定期销毁线程的线程id
pthread_textend;
floatpercent;//线程个数于任务的比例rnum/queue_size
intinit_num;
pthread_cond_textend_ready;//如果要增加线程
}thread_pool_t;
/*-------------------------函数声明----------------------*/
/**
*1.初始化互斥变量
*2.初始化等待变量
*3.创建指定个数的线程线程
*/
thread_pool_t*thread_pool_create(intnum);
void*thread_excute_route(void*arg);
/*调试函数*/
voiddebug(char*message,intflag){
if(flag)
printf("%s\n",message);
}
void*display_thread(void*arg);
/**
*添加任务包括以下几个操作
*1.将任务添加到队列末尾
*2.通知等待进程来处理这个任务pthread_cond_singal();
*/
intthread_pool_add_worker(thread_pool_t*pool,void*(*process)(void*arg),void*arg);//网线程池的队列中增加一个需要执行的函数,也就是任务
/**
*销毁线程池,包括以下几个部分
*1.通知所有等待的进程pthread_cond_broadcase
*2.等待所有的线程执行完
*3.销毁任务列表
*4.释放锁,释放条件
*4.销毁线程池对象
*/
void*thread_pool_is_need_recovery(void*arg);
void*thread_pool_is_need_extend(void*arg);
voidthread_pool_destory(thread_pool_t*pool);
thread_pool_t*thread_pool_create(intnum){
if(num<1){
returnNULL;
}
thread_pool_t*p;
p=(thread_pool_t*)malloc(sizeof(structthread_pool_s));
if(p==NULL)
returnNULL;
p->init_num=num;
/*初始化互斥变量与条件变量*/
pthread_mutex_init(&(p->queue_lock),NULL);
pthread_cond_init(&(p->queue_ready),NULL);
/*设置线程个数*/
p->num=num;
p->rnum=num;
p->knum=0;
p->head=NULL;
p->queue_size=0;
p->is_destroy=false;
inti=0;
thread_info_t*tmp=NULL;
for(i=0;i<num;i++){
/*创建线程*/
tmp=(structthread_info_s*)malloc(sizeof(structthread_info_s));
if(tmp==NULL){
free(p);
returnNULL;
}else{
tmp->next=p->threads;
p->threads=tmp;
}
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state=THREAD_STATE_RUN;
}
/*显示*/
pthread_create(&(p->display),NULL,display_thread,p);
/*检测是否需要动态线程*/
//pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
/*动态销毁*/
pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
returnp;
}
intthread_pool_add_worker(thread_pool_t*pool,void*(*process)(void*arg),void*arg){
thread_pool_t*p=pool;
thread_worker_t*worker=NULL,*member=NULL;
worker=(thread_worker_t*)malloc(sizeof(structthread_worker_s));
intincr=0;
if(worker==NULL){
return-1;
}
worker->process=process;
worker->arg=arg;
worker->next=NULL;
thread_pool_is_need_extend(pool);
pthread_mutex_lock(&(p->queue_lock));
member=p->head;
if(member!=NULL){
while(member->next!=NULL)
member=member->next;
member->next=worker;
}else{
p->head=worker;
}
p->queue_size++;
pthread_mutex_unlock(&(p->queue_lock));
pthread_cond_signal(&(p->queue_ready));
return1;
}
voidthread_pool_wait(thread_pool_t*pool){
thread_info_t*thread;
inti=0;
for(i=0;i<pool->num;i++){
thread=(thread_info_t*)(pool->threads+i);
thread->state=THREAD_STATE_EXIT;
pthread_join(thread->id,NULL);
}
}
voidthread_pool_destory(thread_pool_t*pool){
thread_pool_t*p=pool;
thread_worker_t*member=NULL;
if(p->is_destroy)
return;
p->is_destroy=true;
pthread_cond_broadcast(&(p->queue_ready));
thread_pool_wait(pool);
free(p->threads);
p->threads=NULL;
/*销毁任务列表*/
while(p->head){
member=p->head;
p->head=member->next;
free(member);
}
/*销毁线程列表*/
thread_info_t*tmp=NULL;
while(p->threads){
tmp=p->threads;
p->threads=tmp->next;
free(tmp);
}
pthread_mutex_destroy(&(p->queue_lock));
pthread_cond_destroy(&(p->queue_ready));
return;
}
/*通过线程id,找到对应的线程*/
thread_info_t*get_thread_by_id(thread_pool_t*pool,pthread_tid){
thread_info_t*thread=NULL;
thread_info_t*p=pool->threads;
while(p!=NULL){
if(p->id==id)
returnp;
p=p->next;
}
returnNULL;
}
/*每个线程入口函数*/
void*thread_excute_route(void*arg){
thread_worker_t*worker=NULL;
thread_info_t*thread=NULL;
thread_pool_t*p=(thread_pool_t*)arg;
//printf("thread%lldcreatesuccess\n",pthread_self());
while(1){
pthread_mutex_lock(&(p->queue_lock));
/*获取当前线程的id*/
pthread_tpthread_id=pthread_self();
/*设置当前状态*/
thread=get_thread_by_id(p,pthread_id);
/*线程池被销毁,并且没有任务了*/
if(p->is_destroy==true&&p->queue_size==0){
pthread_mutex_unlock(&(p->queue_lock));
thread->state=THREAD_STATE_EXIT;
p->knum++;
p->rnum--;
pthread_exit(NULL);
}
if(thread){
thread->state=THREAD_STATE_TASK_WAITING;/*线程正在等待任务*/
}
/*线程池没有被销毁,没有任务到来就一直等待*/
while(p->queue_size==0&&!p->is_destroy){
pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
}
p->queue_size--;
worker=p->head;
p->head=worker->next;
pthread_mutex_unlock(&(p->queue_lock));
if(thread)
thread->state=THREAD_STATE_TASK_PROCESSING;/*线程正在执行任务*/
(*(worker->process))(worker->arg);
if(thread)
thread->state=THREAD_STATE_TASK_FINISHED;/*任务执行完成*/
free(worker);
worker=NULL;
}
}
/*拓展线程*/
void*thread_pool_is_need_extend(void*arg){
thread_pool_t*p=(thread_pool_t*)arg;
thread_pool_t*pool=p;
/*判断是否需要增加线程,最终目的线程:任务=1:2*/
if(p->queue_size>100){
intincr=0;
if(((float)p->rnum/p->queue_size)<THREAD_BUSY_PERCENT){
incr=(p->queue_size*THREAD_BUSY_PERCENT)-p->rnum;/*计算需要增加线程个数*/
inti=0;
thread_info_t*tmp=NULL;
thread_pool_t*p=pool;
pthread_mutex_lock(&pool->queue_lock);
if(p->queue_size<100){
pthread_mutex_unlock(&pool->queue_lock);
return;
}
for(i=0;i<incr;i++){
/*创建线程*/
tmp=(structthread_info_s*)malloc(sizeof(structthread_info_s));
if(tmp==NULL){
continue;
}else{
tmp->next=p->threads;
p->threads=tmp;
}
p->num++;
p->rnum++;
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state=THREAD_STATE_RUN;
}
pthread_mutex_unlock(&pool->queue_lock);
}
}
//pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_tsum_ready;
/*恢复初始线程个数*/
void*thread_pool_is_need_recovery(void*arg){
thread_pool_t*pool=(thread_pool_t*)arg;
inti=0;
thread_info_t*tmp=NULL,*prev=NULL,*p1=NULL;
/*如果没有任务了,当前线程大于初始化的线程个数*/
while(1){
i=0;
if(pool->queue_size==0&&pool->rnum>pool->init_num){
sleep(5);
/*5s秒内还是这个状态的话就,销毁部分线程*/
if(pool->queue_size==0&&pool->rnum>pool->init_num){
pthread_mutex_lock(&pool->queue_lock);
tmp=pool->threads;
while((pool->rnum!=pool->init_num)&&tmp){
/*找到空闲的线程*/
if(tmp->state!=THREAD_STATE_TASK_PROCESSING){
i++;
if(prev)
prev->next=tmp->next;
else
pool->threads=tmp->next;
pool->rnum--;/*正在运行的线程减一*/
pool->knum++;/*销毁的线程加一*/
kill(tmp->id,SIGKILL);/*销毁线程*/
p1=tmp;
tmp=tmp->next;
free(p1);
continue;
}
prev=tmp;
tmp=tmp->next;
}
pthread_mutex_unlock(&pool->queue_lock);
printf("5s内没有新任务销毁部分线程,销毁了%d个线程\n",i);
}
}
sleep(5);
}
}
/*打印一些信息的*/
void*display_thread(void*arg){
thread_pool_t*p=(thread_pool_t*)arg;
thread_info_t*thread=NULL;
inti=0;
while(1){
printf("threads%d,running%d,killed%d\n",p->num,p->rnum,p->knum);/*线程总数,正在跑的,已销毁的*/
thread=p->threads;
while(thread){
printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
thread=thread->next;
}
sleep(5);
}
}
希望本文所述对大家学习C语言程序设计有所帮助。