c++实现简单的线程池
c++线程池,继承CDoit,实现其中的start和end
头文件
/*
*多线程管理类
*
*/
#ifndefCTHREADPOOLMANAGE_H
#defineCTHREADPOOLMANAGE_H
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<list>
#include<vector>
#include<time.h>
#include<asm/errno.h>
#defineUSLEEP_TIME100
#defineCHECK_TIME1
usingnamespacestd;
classCDoit
{
public:
virtualintstart(void*){};
virtualintend(){};
};
classCthreadPoolManage
{
private:
int_minThreads;//最少保留几个线程
int_maxThreads;//最多可以有几个线程
int_waitSec;//空闲多少秒后将线程关闭
classthreadInfo{
public:
threadInfo(){
isbusy=false;
doFlag=true;
}
//
pthread_mutex_tmtx=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_tcond=PTHREAD_COND_INITIALIZER;
boolisbusy;//是否空闲
booldoFlag;
//
time_tbeginTime;//线程不工作开始时间
pthread_tcThreadPid;//线程id
pthread_attr_tcThreadAttr;//线程属性
CDoit*doit;//任务类
void*value;//需要传递的值
};
//线程函数
staticvoid*startThread(void*);
//任务队列锁
pthread_mutex_t_duty_mutex;
//任务队列
list<threadInfo*>_dutyList;
//线程队列锁
pthread_mutex_t_thread_mutex;
//线程队列
list<threadInfo*>_threadList;
///初始化,创建最小个数线程///
voidinitThread();
///任务分配线程///
staticvoid*taskAllocation(void*arg);
pthread_ttasktPid;
///线程销毁、状态检查线程///
staticvoid*checkThread(void*arg);
pthread_tchecktPid;
boolcheckrun;
//线程异常退出清理
staticvoidthreadCleanUp(void*arg);
//
intaddThread(list<threadInfo*>*plist,threadInfo*ptinfo);
public:
CthreadPoolManage();
/*
保留的最少线程,最多线程数,空闲多久销毁,保留几个线程的冗余
*/
CthreadPoolManage(intmin,intmax,intwaitSec);
~CthreadPoolManage();
intstart();
//任务注入器
intputDuty(CDoit*,void*);
intgetNowThreadNum();
};
#endif//CTHREADPOOLMANAGE_H
CPP文件
/*
*线程池,线程管理类
*
*/
#include"cthreadpoolmanage.h"
CthreadPoolManage::CthreadPoolManage()
{
_minThreads=5;//最少保留几个线程
_maxThreads=5;//最多可以有几个线程
_waitSec=10;//空闲多少秒后将线程关闭
pthread_mutex_init(&_duty_mutex,NULL);
pthread_mutex_init(&_thread_mutex,NULL);
checkrun=true;
}
CthreadPoolManage::CthreadPoolManage(intmin,intmax,intwaitSec)
{
CthreadPoolManage();
_minThreads=min;//最少保留几个线程
_maxThreads=max;//最多可以有几个线程
_waitSec=waitSec;//空闲多少秒后将线程关闭
}
CthreadPoolManage::~CthreadPoolManage()
{
}
voidCthreadPoolManage::threadCleanUp(void*arg)
{
threadInfo*tinfo=(threadInfo*)arg;
tinfo->isbusy=false;
pthread_mutex_unlock(&tinfo->mtx);
pthread_attr_destroy(&tinfo->cThreadAttr);
deletetinfo;
}
void*CthreadPoolManage::startThread(void*arg)
{
cout<<"线程开始工作"<<endl;
threadInfo*tinfo=(threadInfo*)arg;
pthread_cleanup_push(threadCleanUp,arg);
while(tinfo->doFlag){
pthread_mutex_lock(&tinfo->mtx);
if(tinfo->doit==NULL)
{
cout<<"开始等待任务"<<endl;
pthread_cond_wait(&tinfo->cond,&tinfo->mtx);
cout<<"有任务了"<<endl;
}
tinfo->isbusy=true;
tinfo->doit->start(tinfo->value);
tinfo->doit->end();
tinfo->doit=NULL;
tinfo->isbusy=false;
time(&tinfo->beginTime);
pthread_mutex_unlock(&tinfo->mtx);
}
//0正常执行到这儿不执行清理函数,异常会执行
pthread_cleanup_pop(0);
pthread_attr_destroy(&tinfo->cThreadAttr);
deletetinfo;
cout<<"线程结束"<<endl;
}
voidCthreadPoolManage::initThread()
{
inti=0;
for(i=0;i<this->_minThreads;i++)
{
threadInfo*tinfo=newthreadInfo;
tinfo->doit=NULL;
tinfo->value=NULL;
tinfo->isbusy=false;
tinfo->doFlag=true;
//PTHREAD_CREATE_DETACHED(分离线程)和PTHREAD_CREATE_JOINABLE(非分离线程)
pthread_attr_init(&tinfo->cThreadAttr);
pthread_attr_setdetachstate(&tinfo->cThreadAttr,PTHREAD_CREATE_DETACHED);
cout<<"初始化了一个线程"<<endl;
if(pthread_create(&tinfo->cThreadPid,&tinfo->cThreadAttr,startThread,(void*)tinfo)!=0)
{
cout<<"创建线程失败"<<endl;
break;
}
this->_threadList.push_back(tinfo);
}
}
intCthreadPoolManage::addThread(std::list<CthreadPoolManage::threadInfo*>*plist,CthreadPoolManage::threadInfo*ptinfo)
{
threadInfo*tinfo=newthreadInfo;
tinfo->doit=ptinfo->doit;
tinfo->value=ptinfo->value;
tinfo->isbusy=true;
if(pthread_create(&tinfo->cThreadPid,NULL,startThread,(void*)tinfo)!=0)
{
cout<<"创建线程失败"<<endl;
return-1;
}
plist->push_back(tinfo);
return0;
}
intCthreadPoolManage::putDuty(CDoit*doit,void*value)
{
threadInfo*tinfo=newthreadInfo;
time(&tinfo->beginTime);
tinfo->doit=doit;
tinfo->value=value;
pthread_mutex_lock(&_duty_mutex);
this->_dutyList.push_back(tinfo);
pthread_mutex_unlock(&_duty_mutex);
return0;
}
void*CthreadPoolManage::taskAllocation(void*arg)
{
CthreadPoolManage*ptmanage=(CthreadPoolManage*)arg;
intsize_1=0;
intsize_2=0;
inti_1=0;
inti_2=0;
boola_1=true;
boola_2=true;
threadInfo*ptinfo;
threadInfo*ptinfoTmp;
while(true){
size_1=0;
size_2=0;
pthread_mutex_lock(&ptmanage->_duty_mutex);
pthread_mutex_lock(&ptmanage->_thread_mutex);
size_1=ptmanage->_dutyList.size();
size_2=ptmanage->_threadList.size();
for(list<threadInfo*>::iteratoritorti1=ptmanage->_dutyList.begin();itorti1!=ptmanage->_dutyList.end();)
{
ptinfo=*itorti1;
a_1=true;
for(list<threadInfo*>::iteratoritorti2=ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();itorti2++){
ptinfoTmp=*itorti2;
if(EBUSY==pthread_mutex_trylock(&ptinfoTmp->mtx))
{
continue;
}
if(!ptinfoTmp->isbusy)
{
ptinfoTmp->doit=ptinfo->doit;
ptinfoTmp->value=ptinfo->value;
ptinfoTmp->isbusy=true;
pthread_cond_signal(&ptinfoTmp->cond);
pthread_mutex_unlock(&ptinfoTmp->mtx);
a_1=false;
deleteptinfo;
break;
}
pthread_mutex_unlock(&ptinfoTmp->mtx);
}
if(a_1){
if(ptmanage->_threadList.size()>ptmanage->_maxThreads||ptmanage->addThread(&ptmanage->_threadList,ptinfo)!=0)
{
itorti1++;
continue;
}else{
itorti1=ptmanage->_dutyList.erase(itorti1);
}
deleteptinfo;
}else{
itorti1=ptmanage->_dutyList.erase(itorti1);
}
}
pthread_mutex_unlock(&ptmanage->_duty_mutex);
pthread_mutex_unlock(&ptmanage->_thread_mutex);
usleep(USLEEP_TIME);
}
return0;
}
void*CthreadPoolManage::checkThread(void*arg)
{
CthreadPoolManage*ptmanage=(CthreadPoolManage*)arg;
threadInfo*ptinfo;
time_tnowtime;
while(ptmanage->checkrun){
sleep(CHECK_TIME);
pthread_mutex_lock(&ptmanage->_thread_mutex);
if(ptmanage->_threadList.size()<=ptmanage->_minThreads)
{
continue;
}
for(list<threadInfo*>::iteratoritorti2=ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();){
ptinfo=*itorti2;
if(EBUSY==pthread_mutex_trylock(&ptinfo->mtx))
{
itorti2++;
continue;
}
time(&nowtime);
if(ptinfo->isbusy==false&&nowtime-ptinfo->beginTime>ptmanage->_waitSec)
{
ptinfo->doFlag=false;
itorti2=ptmanage->_threadList.erase(itorti2);
}else{
itorti2++;
}
pthread_mutex_unlock(&ptinfo->mtx);
}
pthread_mutex_unlock(&ptmanage->_thread_mutex);
}
}
intCthreadPoolManage::start()
{
//初始化
this->initThread();
//启动任务分配线程
if(pthread_create(&tasktPid,NULL,taskAllocation,(void*)this)!=0)
{
cout<<"创建任务分配线程失败"<<endl;
return-1;
}
//创建现程状态分配管理线程
if(pthread_create(&checktPid,NULL,checkThread,(void*)this)!=0)
{
cout<<"创建线程状态分配管理线程失败"<<endl;
return-1;
}
return0;
}
///////////////////////////////
intCthreadPoolManage::getNowThreadNum()
{
intnum=0;
pthread_mutex_lock(&this->_thread_mutex);
num=this->_threadList.size();
pthread_mutex_unlock(&this->_thread_mutex);
returnnum;
}
以上所述就是本文的全部内容了,希望大家能够喜欢。
请您花一点时间将文章分享给您的朋友或者留下评论。我们将会由衷感谢您的支持!