亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > C++ > 正文

深入解析C++編程中線程池的使用

2020-01-26 14:49:12
字體:
來源:轉載
供稿:網友

為什么需要線程池
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。
傳 統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建,即 時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處于 不停的創建線程,銷毀線程的狀態。
我們將傳統方案中的線程執行過程分為三個過程:T1、T2、T3。

  1. T1:線程創建時間
  2. T2:線程執行時間,包括線程的同步等時間
  3.  T3:線程銷毀時間

那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。

 
除此之外,線程池能夠減少創建的線程個數。通常線程池所允許的并發線程是有上界的,如果同時需要并發的線程數超過上界,那么一部分線程將會等待。而傳統方案中,如果同時請求數目為2000,那么最壞情況下,系統可能需要產生2000個線程。盡管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。
 
因此線程池的出現正是著眼于減少線程池本身帶來的開銷。線程池采用預創建的技術,在應用程序啟動之后,將立即創建一定數量的線程(N1),放入空閑隊列中。這些線程都是處于阻塞(Suspended)狀態,不消耗CPU,但占用較小的內存空間。當任務到來后,緩沖池選擇一個空閑線程,把任務傳入此線程中運行。當N1個線程都在處理任務后,緩沖池自動創建一定數量的新線程,用于處理更多的任務。在任務執行完畢后線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閑時,大部分線程都一直處于暫停狀態,線程池自動銷毀一部分線程,回收系統資源。
 
基于這種預創建技術,線程池將線程創建和銷毀本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的線程本身開銷則越小,不過我們另外可能需要考慮進去線程之間同步所帶來的開銷

構建線程池框架
一般線程池都必須具備下面幾個組成部分:

  • 線程池管理器:用于創建并管理線程池
  • 工作線程: 線程池中實際執行的線程
  • 任務接口: 盡管線程池大多數情況下是用來支持網絡服務器,但是我們將線程執行的任務抽象出來,形成任務接口,從而是的線程池與具體的任務無關。
  • 任務隊列: 線程池的概念具體到實現則可能是隊列,鏈表之類的數據結構,其中保存執行線程。

我們實現的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
 

  • CJob是所有的任務的基類,其提供一個接口Run,所有的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。
  •  
  • CThread是Linux中線程的包裝,其封裝了Linux線程最經常使用的屬性和方法,它也是一個抽象類,是所有線程類的基類,具有一個接口Run。
  •  
  • CWorkerThread是實際被調度和執行的線程類,其從CThread繼承而來,實現了CThread中的Run方法。
  •  
  • CThreadPool是線程池類,其負責保存線程,釋放線程以及調度線程。
  •  
  • CThreadManage是線程池與用戶的直接接口,其屏蔽了內部的具體實現。
  •  
  • CThreadMutex用于線程之間的互斥。
  •  
  • CCondition則是條件變量的封裝,用于線程之間的同步。

CThreadManage直接跟客戶端打交道,其接受需要創建的線程初始個數,并接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上調用的都是CThreadPool的相關操作。CThreadPool創建具體的線程,并把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。
 
理解系統組件
下面我們分開來了解系統中的各個組件。
 
CThreadManage
CThreadManage的功能非常簡單,其提供最簡單的方法,其類定義如下:
 

class CThreadManage{private:  CThreadPool*  m_Pool;  int     m_NumOfThread; protected: public:  CThreadManage();  CThreadManage(int num);  virtual ~CThreadManage();   void   SetParallelNum(int num);    void  Run(CJob* job,void* jobdata);  void  TerminateAll(void);};

 
其中m_Pool指向實際的線程池;m_NumOfThread是初始創建時候允許創建的并發的線程個數。另外Run和TerminateAll方法也非常簡單,只是簡單的調用CThreadPool的一些相關方法而已。其具體的實現如下:
 

CThreadManage::CThreadManage(){  m_NumOfThread = 10;  m_Pool = new CThreadPool(m_NumOfThread);} CThreadManage::CThreadManage(int num){  m_NumOfThread = num;  m_Pool = new CThreadPool(m_NumOfThread);} CThreadManage::~CThreadManage(){  if(NULL != m_Pool)  delete m_Pool;} void CThreadManage::SetParallelNum(int num){  m_NumOfThread = num;} void CThreadManage::Run(CJob* job,void* jobdata){  m_Pool->Run(job,jobdata);} void CThreadManage::TerminateAll(void){  m_Pool->TerminateAll();}

 
CThread
CThread 類實現了對Linux中線程操作的封裝,它是所有線程的基類,也是一個抽象類,提供了一個抽象接口Run,所有的CThread都必須實現該Run方法。CThread的定義如下所示:
 

class CThread{private:  int     m_ErrCode;  Semaphore  m_ThreadSemaphore; //the inner semaphore, which is used to realize  unsigned   long m_ThreadID;    bool     m_Detach;    //The thread is detached  bool     m_CreateSuspended; //if suspend after creating  char*    m_ThreadName;  ThreadState m_ThreadState;   //the state of the thread protected:  void   SetErrcode(int errcode){m_ErrCode = errcode;}  static void* ThreadFunction(void*); public:  CThread();  CThread(bool createsuspended,bool detach);  virtual ~CThread();   virtual void Run(void) = 0;  void   SetThreadState(ThreadState state){m_ThreadState = state;}   bool   Terminate(void);  //Terminate the threa  bool   Start(void);    //Start to execute the thread  void   Exit(void);  bool   Wakeup(void);  ThreadState GetThreadState(void){return m_ThreadState;}  int   GetLastError(void){return m_ErrCode;}  void   SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);}  char*  GetThreadName(void){return m_ThreadName;}  int   GetThreadID(void){return m_ThreadID;}  bool   SetPriority(int priority);  int   GetPriority(void);  int   GetConcurrency(void);  void   SetConcurrency(int num);  bool   Detach(void);  bool   Join(void);  bool   Yield(void);  int   Self(void);};

 
線程的狀態可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創建后不想立即執行任務,那么我們可以將其“暫?!保绻枰\行,則喚醒。有一點必須注意的是,一旦線程開始執行任務,將不能被掛起,其將一直執行任務至完畢。
 
線程類的相關操作均十分簡單。線程的執行入口是從Start()函數開始,其將調用函數ThreadFunction,ThreadFunction再調用實際的Run函數,執行實際的任務。
 
CThreadPool
CThreadPool是線程的承載容器,一般可以將其實現為堆棧、單向隊列或者雙向隊列。在我們的系統中我們使用STL Vector對線程進行保存。CThreadPool的實現代碼如下:
 

class CThreadPool{friend class CWorkerThread; private:  unsigned int m_MaxNum;  //the max thread num that can create at the same time  unsigned int m_AvailLow; //The min num of idle thread that shoule kept  unsigned int m_AvailHigh;  //The max num of idle thread that kept at the same time  unsigned int m_AvailNum; //the normal thread num of idle num;  unsigned int m_InitNum; //Normal thread num; protected:  CWorkerThread* GetIdleThread(void);   void  AppendToIdleList(CWorkerThread* jobthread);  void  MoveToBusyList(CWorkerThread* idlethread);  void  MoveToIdleList(CWorkerThread* busythread);  void  DeleteIdleThread(int num);  void  CreateIdleThread(int num); public:  CThreadMutex m_BusyMutex;  //when visit busy list,use m_BusyMutex to lock and unlock  CThreadMutex m_IdleMutex;  //when visit idle list,use m_IdleMutex to lock and unlock  CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock  CThreadMutex m_VarMutex;  CCondition    m_BusyCond; //m_BusyCond is used to sync busy thread list  CCondition    m_IdleCond; //m_IdleCond is used to sync idle thread list  CCondition    m_IdleJobCond; //m_JobCond is used to sync job list  CCondition    m_MaxNumCond;  vector<CWorkerThread*>  m_ThreadList;  vector<CWorkerThread*>  m_BusyList;   //Thread List  vector<CWorkerThread*>  m_IdleList; //Idle List  CThreadPool();  CThreadPool(int initnum);  virtual ~CThreadPool();   void  SetMaxNum(int maxnum){m_MaxNum = maxnum;}  int   GetMaxNum(void){return m_MaxNum;}  void  SetAvailLowNum(int minnum){m_AvailLow = minnum;}  int   GetAvailLowNum(void){return m_AvailLow;}  void  SetAvailHighNum(int highnum){m_AvailHigh = highnum;}  int   GetAvailHighNum(void){return m_AvailHigh;}  int   GetActualAvailNum(void){return m_AvailNum;}  int   GetAllNum(void){return m_ThreadList.size();}  int   GetBusyNum(void){return m_BusyList.size();}  void  SetInitNum(int initnum){m_InitNum = initnum;}  int   GetInitNum(void){return m_InitNum;}  void  TerminateAll(void);  void  Run(CJob* job,void* jobdata);};   CWorkerThread* CThreadPool::GetIdleThread(void) {   while(m_IdleList.size() ==0 )   m_IdleCond.Wait();      m_IdleMutex.Lock();   if(m_IdleList.size() > 0 )   {   CWorkerThread* thr = (CWorkerThread*)m_IdleList.front();   printf("Get Idle thread %d/n",thr->GetThreadID());   m_IdleMutex.Unlock();   return thr;   }   m_IdleMutex.Unlock();   return NULL; }  //create num idle thread and put them to idlelist void CThreadPool::CreateIdleThread(int num) {   for(int i=0;i<num;i++){   CWorkerThread* thr = new CWorkerThread();   thr->SetThreadPool(this);   AppendToIdleList(thr);   m_VarMutex.Lock();   m_AvailNum++;   m_VarMutex.Unlock();   thr->Start();    //begin the thread,the thread wait for job   } }  void CThreadPool::Run(CJob* job,void* jobdata) {   assert(job!=NULL);      //if the busy thread num adds to m_MaxNum,so we should wait   if(GetBusyNum() == m_MaxNum)     m_MaxNumCond.Wait();     if(m_IdleList.size()<m_AvailLow)   {   if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )     CreateIdleThread(m_InitNum-m_IdleList.size());   else     CreateIdleThread(m_MaxNum-GetAllNum());   }     CWorkerThread* idlethr = GetIdleThread();   if(idlethr !=NULL)   {   idlethr->m_WorkMutex.Lock();   MoveToBusyList(idlethr);   idlethr->SetThreadPool(this);   job->SetWorkThread(idlethr);   printf("Job is set to thread %d /n",idlethr->GetThreadID());   idlethr->SetJob(job,jobdata);   } }

 
在CThreadPool中存在兩個鏈表,一個是空閑鏈表,一個是忙碌鏈表。Idle鏈表中存放所有的空閑進程,當線程執行任務時候,其狀態變為忙碌狀態,同時從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構造函數中,我們將執行下面的代碼:
 

for(int i=0;i<m_InitNum;i++)   {   CWorkerThread* thr = new CWorkerThread();   AppendToIdleList(thr);   thr->SetThreadPool(this);   thr->Start();    //begin the thread,the thread wait for job   }

 
在該代碼中,我們將創建m_InitNum個線程,創建之后即調用AppendToIdleList放入Idle鏈表中,由于目前沒有任務分發給這些線程,因此線程執行Start后將自己掛起。
 
事實上,線程池中容納的線程數目并不是一成不變的,其會根據執行負載進行自動伸縮。為此在CThreadPool中設定四個變量:
 

m_InitNum:處世創建時線程池中的線程的個數。

m_MaxNum:當前線程池中所允許并發存在的線程的最大數目。

m_AvailLow:當前線程池中所允許存在的空閑線程的最小數目,如果空閑數目低于該值,表明負載可能過重,此時有必要增加空閑線程池的數目。實現中我們總是將線程調整為m_InitNum個。

m_AvailHigh:當前線程池中所允許的空閑的線程的最大數目,如果空閑數目高于該值,表明當前負載可能較輕,此時將刪除多余的空閑線程,刪除后調整數也為m_InitNum個。

m_AvailNum:目前線程池中實際存在的線程的個數,其值介于m_AvailHigh和m_AvailLow之間。如果線程的個數始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要創建,也不需要刪除,保持平衡狀態。因此如何設定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態,是線程池設計必須考慮的問題。
 
線程池在接受到新的任務之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個步驟:
 
(1)檢查當前處于忙碌狀態的線程是否達到了設定的最大值m_MaxNum,如果達到了,表明目前沒有空閑線程可用,而且也不能創建新的線程,因此必須等待直到有線程執行完畢返回到空閑隊列中。
 
(2)如果當前的空閑線程數目小于我們設定的最小的空閑數目m_AvailLow,則我們必須創建新的線程,默認情況下,創建后的線程數目應該為m_InitNum,因此創建的線程數目應該為( 當前空閑線程數與m_InitNum);但是有一種特殊情況必須考慮,就是現有的線程總數加上創建后的線程數可能超過m_MaxNum,因此我們必須對線程的創建區別對待。
 

  if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )     CreateIdleThread(m_InitNum-m_IdleList.size());   else     CreateIdleThread(m_MaxNum-GetAllNum());

 
如果創建后總數不超過m_MaxNum,則創建后的線程為m_InitNum;如果超過了,則只創建( m_MaxNum-當前線程總數 )個。
 
(3)調用GetIdleThread方法查找空閑線程。如果當前沒有空閑線程,則掛起;否則將任務指派給該線程,同時將其移入忙碌隊列。
 
當線程執行完畢后,其會調用MoveToIdleList方法移入空閑鏈表中,其中還調用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
 
CJob
CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,代碼如下:

class CJob{ private:   int   m_JobNo;    //The num was assigned to the job   char*  m_JobName;   //The job name   CThread *m_pWorkThread;   //The thread associated with the job public:   CJob( void );   virtual ~CJob();     int   GetJobNo(void) const { return m_JobNo; }   void   SetJobNo(int jobno){ m_JobNo = jobno;}   char*  GetJobName(void) const { return m_JobName; }   void   SetJobName(char* jobname);   CThread *GetWorkThread(void){ return m_pWorkThread; }   void   SetWorkThread ( CThread *pWorkThread ){     m_pWorkThread = pWorkThread;   }   virtual void Run ( void *ptr ) = 0; }; 

 
線程池使用示例
至此我們給出了一個簡單的與具體任務無關的線程池框架。使用該框架非常的簡單,我們所需要的做的就是派生CJob類,將需要完成的任務實現在Run方法中。然后將該Job交由CThreadManage去執行。下面我們給出一個簡單的示例程序
 

class CXJob:public CJob { public:   CXJob(){i=0;}   ~CXJob(){}   void Run(void* jobdata)  {     printf("The Job comes from CXJOB/n");     sleep(2);   } };   class CYJob:public CJob { public:   CYJob(){i=0;}   ~CYJob(){}   void Run(void* jobdata)  {     printf("The Job comes from CYJob/n");   } };   main() {   CThreadManage* manage = new CThreadManage(10);   for(int i=0;i<40;i++)   {     CXJob*  job = new CXJob();     manage->Run(job,NULL);   }   sleep(2);   CYJob* job = new CYJob();   manage->Run(job,NULL);   manage->TerminateAll(); } 

CXJob和CYJob都是從Job類繼承而來,其都實現了Run接口。CXJob只是簡單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創建10個工作線程。然后分別執行40次CXJob和一次CYJob。
 

C++ 線程池的封裝實現
為了充分利用多核的優勢,我們利用多線程來進行任務處理,但線程也同樣不能濫用,會帶來一下幾個問題:
1)線程本身存在開銷,系統必須為每個線程分配如棧,TLS(線程局部存儲),寄存器等。
2)線程管理會給系統帶來開銷,context切換同樣會給系統帶來成本。
3)線程本身是可以重用的資源,不需要每次都進行初始化。

所以往往在使用中,我們無需把線程與task任務進行一對一對應,只需要預先初始化有限的線程個數來處理無限的task任務即可,線程池應運而生,原理也就是如此。

20151123145708426.png (508×534)

主要含有三個隊列

  1. 工作隊列
  2. 工作線程隊列
  3. 忙碌線程隊列

工作隊列是一個阻塞隊列,任務(仿函數)任務不算被push進來(notify阻塞獲取的工作線程),工作線程隊列(一直不變)則從該隊列中獲取任務執行(wait獲取,當任務隊列為空時阻塞等待通知),如果獲取到任務,則將線程會進入忙碌線程隊列中,執行任務的仿函數,當工作完成,重新移出工作線程隊列。


定義線程池專屬異常:

struct TC_ThreadPool_Exception : public TC_Exception{  TC_ThreadPool_Exception(const string &buffer) : TC_Exception(buffer){};  TC_ThreadPool_Exception(const string &buffer, int err) : TC_Exception(buffer, err){};  ~TC_ThreadPool_Exception () throw (){};};/** * @brief 用通線程池類, 與tc_functor, tc_functorwrapper配合使用. *  * 使用方式說明: * 1 采用tc_functorwrapper封裝一個調用 * 2 用tc_threadpool對調用進行執行 * 具體示例代碼請參見:test/test_tc_thread_pool.cpp *//**線程池本身繼承自鎖,可以幫助鎖定**/class TC_ThreadPool : public TC_ThreadLock{public:  /**   * @brief 構造函數   *   */  TC_ThreadPool ();  /**   * @brief 析構, 會停止所有線程   */  ~TC_ThreadPool ();  /**   * @brief 初始化.   *    * @param num 工作線程個數   */  void init(size_t num);  /**   * @brief 獲取線程個數.   *   * @return size_t 線程個數   */  size_t getThreadNum()  { Lock sync(* this); return _jobthread. size(); }  /**   * @brief 獲取線程池的任務數( exec添加進去的).   *   * @return size_t 線程池的任務數   */  size_t getJobNum()   { return _jobqueue. size(); }  /**   * @brief 停止所有線程   */  void stop();  /**   * @brief 啟動所有線程   */  void start();  /**   * @brief 啟動所有線程并, 執行初始化對象.   *    * @param ParentFunctor   * @param tf   */  template<class ParentFunctor>  void start(const TC_FunctorWrapper< ParentFunctor> &tf)  {    for(size_t i = 0; i < _jobthread .size(); i++)    {      _startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf));    }    start();  }  /**   * @brief 添加對象到線程池執行,該函數馬上返回,   *   線程池的線程執行對象   */  template<class ParentFunctor>   void exec(const TC_FunctorWrapper< ParentFunctor> &tf)  {    _jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf));  }  /**   * @brief 等待所有工作全部結束(隊列無任務, 無空閑線程).   *   * @param millsecond 等待的時間( ms), -1:永遠等待   * @return      true, 所有工作都處理完畢   *            false,超時退出   */  bool waitForAllDone(int millsecond = -1);public:  /**   * @brief 線程數據基類,所有線程的私有數據繼承于該類   */  class ThreadData  {  public:    /**     * @brief 構造     */    ThreadData(){};    /**     * @brief 析夠     */    virtual ~ThreadData(){};    /**      * @brief 生成數據.      *       * @ param T     * @return ThreadData*     */    template<typename T>    static T* makeThreadData()    {      return new T;    }  };  /**   * @brief 設置線程數據.   *    * @param p 線程數據   */  static void setThreadData(ThreadData *p);  /**   * @brief 獲取線程數據.   *   * @return ThreadData* 線程數據   */  static ThreadData* getThreadData();  /**   * @brief 設置線程數據, key需要自己維護.   *    * @param pkey 線程私有數據key   * @param p  線程指針   */  static void setThreadData(pthread_key_t pkey, ThreadData *p);  /**   * @brief 獲取線程數據, key需要自己維護.   *    * @param pkey 線程私有數據key   * @return   指向線程的ThreadData*指針   */  static ThreadData* getThreadData(pthread_key_t pkey);protected:  /**   * @brief 釋放資源.   *    * @param p   */  static void destructor(void *p);  /**   * @brief 初始化key   */  class KeyInitialize  {  public:    /**     * @brief 初始化key     */    KeyInitialize()    {      int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor);      if(ret != 0)      {        throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret);      }    }    /**     * @brief 釋放key     */    ~KeyInitialize()    {      pthread_key_delete(TC_ThreadPool::g_key);    }  };  /**   * @brief 初始化key的控制   */  static KeyInitialize g_key_initialize;  /**   * @brief 數據key   */  static pthread_key_t g_key;protected:  /**   * @brief 線程池中的工作線程   */  class ThreadWorker : public TC_Thread  {  public:    /**      * @brief 工作線程構造函數.      *      * @ param tpool     */    ThreadWorker(TC_ThreadPool *tpool);    /**     * @brief 通知工作線程結束     */    void terminate();  protected:    /**     * @brief 運行     */    virtual void run();  protected:    /**     * 線程池指針     */    TC_ThreadPool  * _tpool;    /**     * 是否結束線程     */    bool      _bTerminate;  };protected:  /**   * @brief 清除   */  void clear();  /**   * @brief 獲取任務, 如果沒有任務, 則為NULL.   *   * @return TC_FunctorWrapperInterface*   */  TC_FunctorWrapperInterface * get(ThreadWorker *ptw);  /**   * @brief 獲取啟動任務.   *   * @return TC_FunctorWrapperInterface*   */  TC_FunctorWrapperInterface * get();  /**   * @brief 空閑了一個線程.   *    * @param ptw   */  void idle(ThreadWorker *ptw);  /**   * @brief 通知等待在任務隊列上的工作線程醒來   */  void notifyT();  /**   * @brief 是否處理結束.   *   * @return bool   */  bool finish();  /**   * @brief 線程退出時調用   */  void exit();  friend class ThreadWorker;protected:  /**   * 任務隊列   */  TC_ThreadQueue< TC_FunctorWrapperInterface*> _jobqueue;  /**   * 啟動任務   */  TC_ThreadQueue< TC_FunctorWrapperInterface*> _startqueue;  /**   * 工作線程   */  std::vector<ThreadWorker *>         _jobthread;  /**   * 繁忙線程   */  std::set<ThreadWorker *>           _busthread;  /**   * 任務隊列的鎖   */  TC_ThreadLock                _tmutex;   /**   * 是否所有任務都執行完畢   */   bool                    _bAllDone;};

工作線程設計如下:

TC_ThreadPool ::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool): _tpool (tpool), _bTerminate ( false){}void TC_ThreadPool ::ThreadWorker::terminate(){  _bTerminate = true;  _tpool->notifyT();}void TC_ThreadPool ::ThreadWorker::run(){  //調用初始化部分  TC_FunctorWrapperInterface *pst = _tpool->get();  if(pst)  {    try    {      (*pst)();    }    catch ( ... )    {    }    delete pst;    pst = NULL;  }  //調用處理部分  while (! _bTerminate)  {    TC_FunctorWrapperInterface *pfw = _tpool->get( this);    if(pfw != NULL)    {      auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);      try      {        (*pfw)();      }      catch ( ... )      {      }      _tpool->idle( this);    }  }  //結束  _tpool->exit();}每個工作線程在剛開始時都會執行一下初始化操作,并進入一個無限循環的部分//調用處理部分  while (! _bTerminate)  {    TC_FunctorWrapperInterface *pfw = _tpool->get( this);    if(pfw != NULL)    {      auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);      try      {        (*pfw)();      }      catch ( ... )      {      }      _tpool->idle( this);    }  }

該工作主要是無限的從線程池的工作隊列中獲取任務并執行,如果成功獲取任務,則會將線程移進忙碌隊列:

TC_FunctorWrapperInterface *TC_ThreadPool:: get(ThreadWorker *ptw){  TC_FunctorWrapperInterface *pFunctorWrapper = NULL;  if(! _jobqueue. pop_front(pFunctorWrapper, 1000))  {    return NULL;  }   {      Lock sync( _tmutex);     _busthread. insert(ptw);  }  return pFunctorWrapper;}

執行完,移回工作線程隊列:_tpool->idle( this);

void TC_ThreadPool:: idle(ThreadWorker *ptw){  Lock sync( _tmutex);  _busthread. erase(ptw);  //無繁忙線程, 通知等待在線程池結束的線程醒過來  if( _busthread. empty())  {    _bAllDone = true;    _tmutex.notifyAll();  }}


此處jobThread隊列初始化后不會改變(因為沒有實現自增長功能),所以非線程安全的vector隊列即可,busthread的忙碌線程隊列會被移進移出,但是操作會自帶Lock sync( _tmutex),該互斥量是線程池本身繼承的,所以是共有的,也無需另外使用線程安全的TC_ThreadQueue,使用vector即可。

TC_ThreadPool:: idle中的

  if( _busthread. empty())  {    _bAllDone = true;    _tmutex.notifyAll();  }

主要用于當線程池工作起來后的waitForAllDone方法:

bool TC_ThreadPool:: waitForAllDone( int millsecond){  Lock sync( _tmutex);start1:  //任務隊列和繁忙線程都是空的  if (finish())  {    return true;  }  //永遠等待  if(millsecond < 0)  {    _tmutex.timedWait(1000);    goto start1;  }  int64_t iNow = TC_Common:: now2ms();  int m    = millsecond;start2:  bool b = _tmutex.timedWait(millsecond);  //完成處理了  if(finish())  {    return true;  }  if(!b)  {    return false;  }  millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));  goto start2;  return false;}_tmutex.timedWait(millsecond)方法喚醒。反復判斷是否所有的工作是否完成:bool TC_ThreadPool:: finish(){  return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;}


整體cpp實現如下:

TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize;pthread_key_t TC_ThreadPool::g_key ;void TC_ThreadPool::destructor( void *p){  ThreadData *ttd = ( ThreadData*)p;  if(ttd)  {    delete ttd;  }}void TC_ThreadPool::exit(){  TC_ThreadPool:: ThreadData *p = getThreadData();  if(p)  {    delete p;    int ret = pthread_setspecific( g_key, NULL );    if(ret != 0)    {      throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);    }  }  _jobqueue. clear();}void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p){  TC_ThreadPool:: ThreadData *pOld = getThreadData();  if(pOld != NULL && pOld != p)  {    delete pOld;  }  int ret = pthread_setspecific( g_key, ( void *)p);  if(ret != 0)  {    throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);  }}TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData (){  return ( ThreadData *) pthread_getspecific( g_key);}void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p){  TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey);  if(pOld != NULL && pOld != p)  {    delete pOld;  }  int ret = pthread_setspecific(pkey, ( void *)p);  if(ret != 0)  {    throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);  }}TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData( pthread_key_t pkey){  return ( ThreadData *) pthread_getspecific(pkey);}TC_ThreadPool::TC_ThreadPool(): _bAllDone ( true){}TC_ThreadPool::~TC_ThreadPool(){  stop();  clear();}void TC_ThreadPool::clear(){  std::vector< ThreadWorker *>::iterator it = _jobthread. begin();  while(it != _jobthread. end())  {    delete (*it);    ++it;  }  _jobthread. clear();  _busthread. clear();}void TC_ThreadPool::init( size_t num){  stop();  Lock sync(* this);  clear();  for( size_t i = 0; i < num; i++)  {    _jobthread. push_back( new ThreadWorker( this));  }}void TC_ThreadPool::stop(){  Lock sync(* this);  std::vector< ThreadWorker *>::iterator it = _jobthread. begin();  while(it != _jobthread. end())  {    if ((*it)-> isAlive())    {      (*it)-> terminate();      (*it)-> getThreadControl().join ();    }    ++it;  }  _bAllDone = true;}void TC_ThreadPool::start(){  Lock sync(* this);  std::vector< ThreadWorker *>::iterator it = _jobthread. begin();  while(it != _jobthread. end())  {    (*it)-> start();    ++it;  }  _bAllDone = false;}bool TC_ThreadPool:: finish(){  return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;}bool TC_ThreadPool::waitForAllDone( int millsecond){  Lock sync( _tmutex);start1:  //任務隊列和繁忙線程都是空的  if (finish ())  {    return true;  }  //永遠等待  if(millsecond < 0)  {    _tmutex.timedWait(1000);    goto start1;  }  int64_t iNow = TC_Common:: now2ms();  int m    = millsecond;start2:  bool b = _tmutex.timedWait(millsecond);  //完成處理了  if(finish ())  {    return true;  }  if(!b)  {    return false;  }  millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));  goto start2;  return false;}TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw){  TC_FunctorWrapperInterface *pFunctorWrapper = NULL;  if(! _jobqueue. pop_front(pFunctorWrapper, 1000))  {    return NULL;  }   {      Lock sync( _tmutex);     _busthread. insert(ptw);  }  return pFunctorWrapper;}TC_FunctorWrapperInterface *TC_ThreadPool::get(){  TC_FunctorWrapperInterface *pFunctorWrapper = NULL;  if(! _startqueue. pop_front(pFunctorWrapper))  {    return NULL;  }  return pFunctorWrapper;}void TC_ThreadPool::idle( ThreadWorker *ptw){  Lock sync( _tmutex);  _busthread. erase(ptw);  //無繁忙線程, 通知等待在線程池結束的線程醒過來  if( _busthread. empty())  {      _bAllDone = true;    _tmutex.notifyAll();  }}void TC_ThreadPool::notifyT(){  _jobqueue. notifyT();}

線程池使用后記
線程池適合場合
事 實上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應用所產生的影響,這是有前提的,前提就是線程本身開銷與線程執行任 務相比不可忽略。如果線程本身的開銷相對于線程任務執行開銷而言是可以忽略不計的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務器以及Telnet服務器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創建,即時銷毀”的策略。
 總之線程池通常適合下面的幾個場合:
 
(1)  單位時間內處理任務頻繁而且任務處理時間短
 
(2)  對實時性要求較高。如果接受到任務后在創建線程,可能滿足不了實時要求,因此必須采用線程池進行預創建。
 
(3)  必須經常面對高突發性事件,比如Web服務器,如果有足球轉播,則服務器將產生巨大的沖擊。此時如果采取傳統方法,則必須不停的大量產生線程,銷毀線程。此時采用動態線程池可以避免這種情況的發生。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
日韩av第一页| 亚洲自拍欧美色图| 久久97久久97精品免视看| 国产精品揄拍一区二区| 亚洲女人被黑人巨大进入al| 国产不卡精品视男人的天堂| 亚洲男人的天堂网站| 亚洲色图综合网| 久久成年人视频| 在线观看欧美成人| 欧美高清激情视频| 国产精品成人v| 国产精品久久久久免费a∨大胸| 欧美自拍大量在线观看| 亚洲高清久久网| 国模视频一区二区三区| 精品福利一区二区| 91精品视频在线免费观看| 91久久国产精品91久久性色| 成人黄色免费片| 欧美日韩福利在线观看| 亚洲视屏在线播放| 国模视频一区二区| 亚洲国产精品一区二区三区| 亚洲精品国产拍免费91在线| 日本一本a高清免费不卡| 精品久久久久久亚洲国产300| 欧美大片在线影院| 三级精品视频久久久久| 亚洲国产黄色片| 欧美人在线观看| 亚洲精品国精品久久99热一| 久久久久久久久国产精品| 午夜精品蜜臀一区二区三区免费| 中文字幕日韩欧美在线| 久久久久久国产精品| 亚洲人成电影网站色xx| 国产亚洲欧美日韩精品| 久久久久久尹人网香蕉| 78m国产成人精品视频| 亚洲伊人一本大道中文字幕| 国产精品pans私拍| 久久中文字幕在线| 91香蕉电影院| 亚洲国产精品国自产拍av秋霞| 亚洲一二三在线| 美女啪啪无遮挡免费久久网站| 亚洲精品久久7777777| 成人免费高清完整版在线观看| 久久久久久一区二区三区| 欧美性猛交xxxx免费看久久久| 丝袜亚洲欧美日韩综合| 亚洲性69xxxbbb| 亚洲国模精品一区| 久久成人亚洲精品| 久久躁狠狠躁夜夜爽| 国产精品九九九| 秋霞成人午夜鲁丝一区二区三区| 精品美女久久久久久免费| 久久久久国产视频| 亚洲人精选亚洲人成在线| 久久免费视频这里只有精品| 91在线国产电影| 日韩av一区二区在线观看| 理论片在线不卡免费观看| 亚洲精品中文字幕有码专区| 欧美精品激情blacked18| 国产福利精品视频| 亚洲乱亚洲乱妇无码| 26uuu另类亚洲欧美日本一| 91国产视频在线| 国产香蕉精品视频一区二区三区| 福利一区福利二区微拍刺激| 欧美成人午夜激情视频| 亚洲成年网站在线观看| 国产精品成人v| 97视频在线观看免费| 精品在线小视频| 欧美日韩一区二区免费在线观看| 欧美高清无遮挡| 久久久久久91香蕉国产| www.99久久热国产日韩欧美.com| 97免费视频在线播放| 欧美日韩加勒比精品一区| 亚洲自拍偷拍视频| 高清视频欧美一级| 81精品国产乱码久久久久久| 精品国产自在精品国产浪潮| 日韩毛片在线看| 日本久久久久久久久| 日韩精品视频免费| 狠狠爱在线视频一区| 欧美一级淫片播放口| 成人在线视频网站| 日本国产精品视频| 91色中文字幕| 九九热这里只有在线精品视| 国产日韩欧美电影在线观看| 欧美激情视频给我| 2018中文字幕一区二区三区| 亚洲精品97久久| 日韩精品视频在线播放| 亚洲激情视频网站| 国产亚洲a∨片在线观看| 日韩va亚洲va欧洲va国产| 日韩av综合中文字幕| 欧美另类老肥妇| 欧美成人免费一级人片100| 欧美一二三视频| 欧美乱大交xxxxx| 国产精品入口免费视频一| 伊人激情综合网| 亚洲aⅴ男人的天堂在线观看| 欧美日韩在线影院| 久久99国产精品自在自在app| 精品久久久久久国产91| 欧美精品18videosex性欧美| 成人做爰www免费看视频网站| 欧美做受高潮电影o| 成人av在线亚洲| 久久全国免费视频| 国产精品久久久久久一区二区| 欧美成人sm免费视频| 欧美日韩国产中文精品字幕自在自线| 超在线视频97| 日韩成人中文电影| 揄拍成人国产精品视频| 久久精品国产亚洲7777| 欧美激情中文字幕在线| 人人做人人澡人人爽欧美| 久久婷婷国产麻豆91天堂| 亚洲成色777777女色窝| 91超碰中文字幕久久精品| 国产精品久久国产精品99gif| 日韩一级裸体免费视频| 日本精品久久久久久久| 成人精品视频99在线观看免费| 日韩精品日韩在线观看| 97视频在线观看网址| 日韩美女写真福利在线观看| 欧美肥老妇视频| 日韩欧美在线国产| 国产一区二区三区在线观看网站| 国产精品99久久99久久久二8| 亚洲激情视频网站| 欧美激情手机在线视频| 欧美精品videofree1080p| 亚洲精品一区二区三区婷婷月| 欧美精品18videos性欧| 91色p视频在线| 成人妇女淫片aaaa视频| 狠狠操狠狠色综合网| 久久精品国产一区二区三区| 91亚洲人电影| 国产一区二中文字幕在线看| 亚洲精品456在线播放狼人| 97香蕉超级碰碰久久免费的优势| 午夜精品蜜臀一区二区三区免费| 91av中文字幕| 久久天天躁狠狠躁夜夜躁| 成人亲热视频网站| 一本色道久久综合亚洲精品小说| 青青草原成人在线视频| 欧美成人免费全部观看天天性色|