- 在该代码中,我们将创建m_InitNum个线程,创建之后即调用AppendToIdleList放入Idle链表中,由于目前没有任务分发给这些线程,因此线程执行Start后将自己挂起。
- 事实上,线程池中容纳的线程数目并不是一成不变的,其会根据执行负载进行自动伸缩。为此在CThreadPool中设定四个变量:
- m_InitNum:处世创建时线程池中的线程的个数。
- m_MaxNum:当前线程池中所允许并发存在的线程的最大数目。
- m_AvailLow:当前线程池中所允许存在的空闲线程的最小数目,如果空闲数目低于该值,表明负载可能过重,此时有必要增加空闲线程池的数目。实现中我们总是将线程调整为m_InitNum个。
- m_AvailHigh:当前线程池中所允许的空闲的线程的最大数目,如果空闲数目高于该值,表明当前负载可能较轻,此时将删除多余的空闲线程,删除后调整数也为m_InitNum个。
- m_AvailNum:目前线程池中实际存在的线程的个数,其值介于m_AvailHigh和m_AvailLow之间。如果线程的个数始终维持在m_AvailLow和m_AvailHigh之间,则线程既不需要创建,也不需要删除,保持平衡状态。因此如何设定m_AvailLow和m_AvailHigh的值,使得线程池最大可能的保持平衡态,是线程池设计必须考虑的问题。
- 线程池在接受到新的任务之后,线程池首先要检查是否有足够的空闲池可用。检查分为三个步骤:
- (1)检查当前处于忙碌状态的线程是否达到了设定的最大值m_MaxNum,如果达到了,表明目前没有空闲线程可用,而且也不能创建新的线程,因此必须等待直到有线程执行完毕返回到空闲队列中。
- (2)如果当前的空闲线程数目小于我们设定的最小的空闲数目m_AvailLow,则我们必须创建新的线程,默认情况下,创建后的线程数目应该为m_InitNum,因此创建的线程数目应该为( 当前空闲线程数与m_InitNum);但是有一种特殊情况必须考虑,就是现有的线程总数加上创建后的线程数可能超过m_MaxNum,因此我们必须对线程的创建区别对待。
- if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
- CreateIdleThread(m_InitNum-m_IdleList.size());
- else
- CreateIdleThread(m_MaxNum-GetAllNum());
- 如果创建后总数不超过m_MaxNum,则创建后的线程为m_InitNum;如果超过了,则只创建( m_MaxNum-当前线程总数)个。
- (3)调用GetIdleThread方法查找空闲线程。如果当前没有空闲线程,则挂起;否则将任务指派给该线程,同时将其移入忙碌队列。
- 当线程执行完毕后,其会调用MoveToIdleList方法移入空闲链表中,其中还调用m_IdleCond.Signal()方法,唤醒GetIdleThread()中可能阻塞的线程。
- //CWorkerThread
- //CWorkerThread是CThread的派生类,是事实上的工作线程。在CThreadPool的构造函数中,我们创建了一定数量的CWorkerThread。一旦这些线程创建完毕,我们将调用Start()启动该线程。Start方法最终会调用Run方法。Run方法是个无限循环的过程。在没有接受到实际的任务的时候,m_Job为NULL,此时线程将调用Wait方法进行等待,从而处于挂起状态。一旦线程池将具体的任务分发给该线程,其将被唤醒,从而通知线程从挂起的地方继续执行。CWorkerThread的完整定义如下:
- class CWorkerThread:public CThread
- {
- private:
- CThreadPool* m_ThreadPool;
- CJob* m_Job;
- void* m_JobData;
- CThreadMutex m_VarMutex;
- bool m_IsEnd;
- protected:
- public:
- CCondition m_JobCond;
- CThreadMutex m_WorkMutex;
- CWorkerThread();
- virtual ~CWorkerThread();
- void Run();
- void SetJob(CJob* job,void* jobdata);
- CJob* GetJob(void){return m_Job;}
- void SetThreadPool(CThreadPool* thrpool);
- CThreadPool* GetThreadPool(void){return m_ThreadPool;}
- };
- CWorkerThread::CWorkerThread()
- {
- m_Job = NULL;
- m_JobData = NULL;
- m_ThreadPool = NULL;
- m_IsEnd = false;
- }
- CWorkerThread::~CWorkerThread()
- {
- if(NULL != m_Job)
- delete m_Job;
- if(m_ThreadPool != NULL)
- delete m_ThreadPool;
- }
- void CWorkerThread::Run()
- {
- SetThreadState(THREAD_RUNNING);
- for(;;)
- {
- while(m_Job == NULL)
- m_JobCond.Wait();
- m_Job->Run(m_JobData);
- m_Job->SetWorkThread(NULL);
- m_Job = NULL;
- m_ThreadPool->MoveToIdleList(this);
- if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum())
- {
- m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum());
- }
- m_WorkMutex.Unlock();
- }
- }
- void CWorkerThread::SetJob(CJob* job,void* jobdata)
- {
- m_VarMutex.Lock();
- m_Job = job;
- m_JobData = jobdata;
- job->SetWorkThread(this);
- m_VarMutex.Unlock();
- m_JobCond.Signal();
- }
- void CWorkerThread::SetThreadPool(CThreadPool* thrpool)
- {
- m_VarMutex.Lock();
- m_ThreadPool = thrpool;
- m_VarMutex.Unlock();
- }
- //当线程执行任务之前首先必须判断空闲线程的数目是否低于m_AvailLow,如果低于,则必须创建足够的空闲线程,使其数目达到m_InitNum个,然后将调用MoveToBusyList()移出空闲队列,移入忙碌队列。当任务执行完毕后,其又调用MoveToIdleList()移出忙碌队列,移入空闲队列,等待新的任务。
- //除了Run方法之外,CWorkerThread中另外一个重要的方法就是SetJob,该方法将实际的任务赋值给线程。当没有任何执行任务即m_Job为NULL的时候,线程将调用m_JobCond.Wait进行等待。一旦Job被赋值给线程,其将调用m_JobCond.Signal方法唤醒该线程。由于m_JobCond属于线程内部的变量,每个线程都维持一个m_JobCond,只有得到任务的线程才被唤醒,没有得到任务的将继续等待。无论一个线程何时被唤醒,其都将从等待的地方继续执行m_Job->Run(m_JobData),这是线程执行实际任务的地方。
- //在线程执行给定Job期间,我们必须防止另外一个Job又赋给该线程,因此在赋值之前,通过m_VarMutex进行锁定, Job执行期间,其于的Job将不能关联到该线程;任务执行完毕,我们调用m_VarMutex.Unlock()进行解锁,此时,线程又可以接受新的执行任务。
- //在线程执行任务结束后返回空闲队列前,我们还需要判断当前空闲队列中的线程是否高于m_AvailHigh个。如果超过m_AvailHigh,则必须从其中删除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())个线程,使线程数目保持在m_InitNum个。
- //CJob
- //CJob类相对简单,其封装了任务的基本的属性和方法,其中最重要的是Run方法,代码如下:
- class CJob
- {
- private:
- int m_JobNo; //The num was assigned to the job
- char* m_JobName; //The job name
- CThread *m_pWorkThread; //The thread associated with the job
- public:
- CJob( void );
- virtual ~CJob();
- int GetJobNo(void) const { return m_JobNo; }
- void SetJobNo(int jobno){ m_JobNo = jobno;}
- char* GetJobName(void) const { return m_JobName; }
- void SetJobName(char* jobname);
- CThread *GetWorkThread(void){ return m_pWorkThread; }
- void SetWorkThread ( CThread *pWorkThread ){
- m_pWorkThread = pWorkThread;
- }
- virtual void Run ( void *ptr ) = 0;
- };
- CJob::CJob(void)
- :m_pWorkThread(NULL)
- ,m_JobNo(0)
- ,m_JobName(NULL)
- {
- }
- CJob::~CJob(){
- if(NULL != m_JobName)
- free(m_JobName);
- }
- void CJob::SetJobName(char* jobname)
- {
- if(NULL !=m_JobName) {
- free(m_JobName);
- m_JobName = NULL;
- }
- if(NULL !=jobname) {
- m_JobName = (char*)malloc(strlen(jobname)+1);
- strcpy(m_JobName,jobname);
- }
- }
- //线程池使用示例
- //至此我们给出了一个简单的与具体任务无关的线程池框架。使用该框架非常的简单,我们所需要的做的就是派生CJob类,将需要完成的任务实现在Run方法中。然后将该Job交由CThreadManage去执行。下面我们给出一个简单的示例程序
- class CXJob:public CJob
- {
- public:
- CXJob(){i=0;}
- ~CXJob(){}
- void Run(void* jobdata) {
- printf("The Job comes from CXJOB/n");
- sleep(2);
- }
- };
- class CYJob:public CJob
- {
- public:
- CYJob(){i=0;}
- ~CYJob(){}
- void Run(void* jobdata) {
- printf("The Job comes from CYJob/n");
- }
- };
- main()
- {
- CThreadManage* manage = new CThreadManage(10);
- for(int i=0;i<40;i++)
- {
- CXJob* job = new CXJob();
- manage->Run(job,NULL);
- }
- sleep(2);
- CYJob* job = new CYJob();
- manage->Run(job,NULL);
- manage->TerminateAll();
- }
- //CXJob和CYJob都是从Job类继承而来,其都实现了Run接口。CXJob只是简单的打印一句"The Job comes from CXJob",CYJob也只打印"The Job comes from CYJob",然后均休眠2秒钟。在主程序中我们初始创建10个工作线程。然后分别执行40次CXJob和一次CYJob。
来源: