C++线程池的简单实现方法
本文以实例形式较为详细的讲述了C++线程池的简单实现方法。分享给大家供大家参考之用。具体方法如下:
一、几个基本的线程函数:
1.线程操纵函数:
intpthread_create(pthread_t*tidp,constpthread_attr_t*attr,(void*)(*start_rtn)(void*),void*arg);//创建 voidpthread_exit(void*retval);//终止自身 intpthread_cancel(pthread_ttid);//终止其他.发送终止信号后目标线程不一定终止,要调用join函数等待 intpthread_join(pthread_ttid,void**retval);//阻塞并等待其他线程
2.属性:
intpthread_attr_init(pthread_attr_t*attr);//初始化属性 intpthread_attr_setdetachstate(pthread_attr_t*attr,intdetachstate);//设置分离状态 intpthread_attr_destroy(pthread_attr_t*attr);//销毁属性
3.同步函数
互斥锁
intpthread_mutex_init(pthread_mutex_t*restrictmutex,constpthread_mutexattr_t*restrictattr);//初始化锁 intpthread_mutex_destroy(pthread_mutex_t*mutex);//销毁锁 intpthread_mutex_lock(pthread_mutex_t*mutex);//加锁 intpthread_mutex_trylock(pthread_mutex_t*mutex);//尝试加锁,上面lock的非阻塞版本 intpthread_mutex_unlock(pthread_mutex_t*mutex);//解锁
4.条件变量
intpthread_cond_init(pthread_cond_t*cv,constpthread_condattr_t*cattr);//初始化 intpthread_cond_destroy(pthread_cond_t*cond);//销毁 intpthread_cond_wait(pthread_cond_t*cond,pthread_mutex_t*mutex);//等待条件 intpthread_cond_signal(pthread_cond_t*cond);//通知,唤醒第一个调用pthread_cond_wait()而进入睡眠的线程
5.工具函数
intpthread_equal(pthread_tt1,pthread_tt2);//比较线程ID intpthread_detach(pthread_ttid);//分离线程 pthread_tpthread_self(void);//自身ID
上述代码中,线程的cancel和join,以及最后的工具函数,这些函数的参数都为结构体变量,其他的函数参数都是结构体变量指针;品味一下,参数为指针的,因为都需要改变结构体的内容,而参数为普通变量的,则只需要读内容即可。
二、线程池代码:
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>//linux环境中多线程的头文件,非C语言标准库,编译时最后要加-lpthread调用动态链接库
//工作链表的结构
typedefstructworker{
void*(*process)(void*arg);//工作函数
void*arg;//函数的参数
structworker*next;
}CThread_worker;
//线程池的结构
typedefstruct{
pthread_mutex_tqueue_lock;//互斥锁
pthread_cond_tqueue_ready;//条件变量/信号量
CThread_worker*queue_head;//指向工作链表的头结点,临界区
intcur_queue_size;//记录链表中工作的数量,临界区
intmax_thread_num;//最大线程数
pthread_t*threadid;//线程ID
intshutdown;//开关
}CThread_pool;
staticCThread_pool*pool=NULL;//一个线程池变量
intpool_add_worker(void*(*process)(void*arg),void*arg);//负责向工作链表中添加工作
void*thread_routine(void*arg);//线程例程
//线程池初始化
voidpool_init(intmax_thread_num)
{
inti=0;
pool=(CThread_pool*)malloc(sizeof(CThread_pool));//创建线程池
pthread_mutex_init(&(pool->queue_lock),NULL);//互斥锁初始化,参数为锁的地址
pthread_cond_init(&(pool->queue_ready),NULL);//条件变量初始化,参数为变量地址
pool->queue_head=NULL;
pool->cur_queue_size=0;
pool->max_thread_num=max_thread_num;
pool->threadid=(pthread_t*)malloc(max_thread_num*sizeof(pthread_t));
for(i=0;i<max_thread_num;i++){
pthread_create(&(pool->threadid[i]),NULL,thread_routine,NULL);//创建线程,参数为线程ID变量地址、属性、例程、参数
}
pool->shutdown=0;
}
//例程,调用具体的工作函数
void*thread_routine(void*arg)
{
printf("startingthread0x%x\n",(int)pthread_self());
while(1){
pthread_mutex_lock(&(pool->queue_lock));//从工作链表中取工作,要先加互斥锁,参数为锁地址
while(pool->cur_queue_size==0&&!pool->shutdown){//链表为空
printf("thread0x%xiswaiting\n",(int)pthread_self());
pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock));//等待资源,信号量用于通知。会释放第二个参数的锁,以供添加;函数返回时重新加锁。
}
if(pool->shutdown){
pthread_mutex_unlock(&(pool->queue_lock));//结束开关开启,释放锁并退出线程
printf("thread0x%xwillexit\n",(int)pthread_self());
pthread_exit(NULL);//参数为void*
}
printf("thread0x%xisstartingtowork\n",(int)pthread_self());
--pool->cur_queue_size;
CThread_worker*worker=pool->queue_head;
pool->queue_head=worker->next;
pthread_mutex_unlock(&(pool->queue_lock));//获取一个工作后释放锁
(*(worker->process))(worker->arg);//做工作
free(worker);
worker=NULL;
}
pthread_exit(NULL);
}
//销毁线程池
intpool_destroy()
{
if(pool->shutdown)//检测结束开关是否开启,若开启,则所有线程会自动退出
return-1;
pool->shutdown=1;
pthread_cond_broadcast(&(pool->queue_ready));//广播,唤醒所有线程,准备退出
inti;
for(i=0;i<pool->max_thread_num;++i)
pthread_join(pool->threadid[i],NULL);//主线程等待所有线程退出,只有join第一个参数不是指针,第二个参数类型是void**,接收exit的返回值,需要强制转换
free(pool->threadid);
CThread_worker*head=NULL;
while(pool->queue_head!=NULL){//释放未执行的工作链表剩余结点
head=pool->queue_head;
pool->queue_head=pool->queue_head->next;
free(head);
}
pthread_mutex_destroy(&(pool->queue_lock));//销毁锁和条件变量
pthread_cond_destroy(&(pool->queue_ready));
free(pool);
pool=NULL;
return0;
}
void*myprocess(void*arg)
{
printf("threadidis0x%x,workingontask%d\n",(int)pthread_self(),*(int*)arg);
sleep(1);
returnNULL;
}
//添加工作
intpool_add_worker(void*(*process)(void*arg),void*arg)
{
CThread_worker*newworker=(CThread_worker*)malloc(sizeof(CThread_worker));
newworker->process=process;//具体的工作函数
newworker->arg=arg;
newworker->next=NULL;
pthread_mutex_lock(&(pool->queue_lock));//加锁
CThread_worker*member=pool->queue_head;//插入链表尾部
if(member!=NULL){
while(member->next!=NULL)
member=member->next;
member->next=newworker;
}
else{
pool->queue_head=newworker;
}
++pool->cur_queue_size;
pthread_mutex_unlock(&(pool->queue_lock));//解锁
pthread_cond_signal(&(pool->queue_ready));//通知一个等待的线程
return0;
}
intmain(intargc,char**argv)
{
pool_init(3);//主线程创建线程池,3个线程
int*workingnum=(int*)malloc(sizeof(int)*10);
inti;
for(i=0;i<10;++i){
workingnum[i]=i;
pool_add_worker(myprocess,&workingnum[i]);//主线程负责添加工作,10个工作
}
sleep(5);
pool_destroy();//销毁线程池
free(workingnum);
return0;
}
希望本文所述对大家的C++程序设计有所帮助。