亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > C++ > 正文

深入解析C++編程中線程池的使用

2020-05-23 14:12:44
字體:
來源:轉載
供稿:網友

這篇文章主要介紹了深入解析C++編程中線程池的使用,包括線程池的封裝實現等內容,需要的朋友可以參考下

為什么需要線程池

目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。

傳 統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建,即 時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處于 不停的創建線程,銷毀線程的狀態。

我們將傳統方案中的線程執行過程分為三個過程:T1、T2、T3。

T1:線程創建時間

T2:線程執行時間,包括線程的同步等時間

T3:線程銷毀時間

那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。

除此之外,線程池能夠減少創建的線程個數。通常線程池所允許的并發線程是有上界的,如果同時需要并發的線程數超過上界,那么一部分線程將會等待。而傳統方案中,如果同時請求數目為2000,那么最壞情況下,系統可能需要產生2000個線程。盡管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。

因此線程池的出現正是著眼于減少線程池本身帶來的開銷。線程池采用預創建的技術,在應用程序啟動之后,將立即創建一定數量的線程(N1),放入空閑隊列中。這些線程都是處于阻塞(Suspended)狀態,不消耗CPU,但占用較小的內存空間。當任務到來后,緩沖池選擇一個空閑線程,把任務傳入此線程中運行。當N1個線程都在處理任務后,緩沖池自動創建一定數量的新線程,用于處理更多的任務。在任務執行完畢后線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閑時,大部分線程都一直處于暫停狀態,線程池自動銷毀一部分線程,回收系統資源。

基于這種預創建技術,線程池將線程創建和銷毀本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的線程本身開銷則越小,不過我們另外可能需要考慮進去線程之間同步所帶來的開銷

構建線程池框架

一般線程池都必須具備下面幾個組成部分:

線程池管理器:用于創建并管理線程池

工作線程: 線程池中實際執行的線程

任務接口: 盡管線程池大多數情況下是用來支持網絡服務器,但是我們將線程執行的任務抽象出來,形成任務接口,從而是的線程池與具體的任務無關。

任務隊列: 線程池的概念具體到實現則可能是隊列,鏈表之類的數據結構,其中保存執行線程。

我們實現的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。

CJob是所有的任務的基類,其提供一個接口Run,所有的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。

CThread是Linux中線程的包裝,其封裝了Linux線程最經常使用的屬性和方法,它也是一個抽象類,是所有線程類的基類,具有一個接口Run。

CWorkerThread是實際被調度和執行的線程類,其從CThread繼承而來,實現了CThread中的Run方法。

CThreadPool是線程池類,其負責保存線程,釋放線程以及調度線程。

CThreadManage是線程池與用戶的直接接口,其屏蔽了內部的具體實現。

CThreadMutex用于線程之間的互斥。

CCondition則是條件變量的封裝,用于線程之間的同步。

CThreadManage直接跟客戶端打交道,其接受需要創建的線程初始個數,并接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上調用的都是CThreadPool的相關操作。CThreadPool創建具體的線程,并把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。

理解系統組件

下面我們分開來了解系統中的各個組件。

CThreadManage

CThreadManage的功能非常簡單,其提供最簡單的方法,其類定義如下:

 

 
  1. class CThreadManage 
  2. private
  3. CThreadPool* m_Pool; 
  4. int m_NumOfThread; 
  5.  
  6. protected
  7.  
  8. public
  9. CThreadManage(); 
  10. CThreadManage(int num); 
  11. virtual ~CThreadManage(); 
  12.  
  13. void SetParallelNum(int num);  
  14. void Run(CJob* job,void* jobdata); 
  15. void TerminateAll(void); 
  16. }; 

其中m_Pool指向實際的線程池;m_NumOfThread是初始創建時候允許創建的并發的線程個數。另外Run和TerminateAll方法也非常簡單,只是簡單的調用CThreadPool的一些相關方法而已。其具體的實現如下:

 

 
  1. CThreadManage::CThreadManage() 
  2. m_NumOfThread = 10; 
  3. m_Pool = new CThreadPool(m_NumOfThread); 
  4.  
  5. CThreadManage::CThreadManage(int num) 
  6. m_NumOfThread = num; 
  7. m_Pool = new CThreadPool(m_NumOfThread); 
  8.  
  9. CThreadManage::~CThreadManage() 
  10. if(NULL != m_Pool) 
  11. delete m_Pool; 
  12.  
  13. void CThreadManage::SetParallelNum(int num) 
  14. m_NumOfThread = num; 
  15.  
  16. void CThreadManage::Run(CJob* job,void* jobdata) 
  17. m_Pool->Run(job,jobdata); 
  18.  
  19. void CThreadManage::TerminateAll(void
  20. m_Pool->TerminateAll(); 

CThread

CThread 類實現了對Linux中線程操作的封裝,它是所有線程的基類,也是一個抽象類,提供了一個抽象接口Run,所有的CThread都必須實現該Run方法。CThread的定義如下所示:

 

 
  1. class CThread 
  2. private
  3. int m_ErrCode; 
  4. Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize 
  5. unsigned long m_ThreadID;  
  6. bool m_Detach; //The thread is detached 
  7. bool m_CreateSuspended; //if suspend after creating 
  8. char* m_ThreadName; 
  9. ThreadState m_ThreadState; //the state of the thread 
  10.  
  11. protected
  12. void SetErrcode(int errcode){m_ErrCode = errcode;} 
  13. static void* ThreadFunction(void*); 
  14.  
  15. public
  16. CThread(); 
  17. CThread(bool createsuspended,bool detach); 
  18. virtual ~CThread(); 
  19.  
  20. virtual void Run(void) = 0; 
  21. void SetThreadState(ThreadState state){m_ThreadState = state;} 
  22. bool Terminate(void); //Terminate the threa 
  23. bool Start(void); //Start to execute the thread 
  24. void Exit(void); 
  25. bool Wakeup(void); 
  26. ThreadState GetThreadState(void){return m_ThreadState;} 
  27. int GetLastError(void){return m_ErrCode;} 
  28. void SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);} 
  29. char* GetThreadName(void){return m_ThreadName;} 
  30. int GetThreadID(void){return m_ThreadID;} 
  31. bool SetPriority(int priority); 
  32. int GetPriority(void); 
  33. int GetConcurrency(void); 
  34. void SetConcurrency(int num); 
  35. bool Detach(void); 
  36. bool Join(void); 
  37. bool Yield(void); 
  38. int Self(void); 
  39. }; 

線程的狀態可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創建后不想立即執行任務,那么我們可以將其“暫停”,如果需要運行,則喚醒。有一點必須注意的是,一旦線程開始執行任務,將不能被掛起,其將一直執行任務至完畢。

線程類的相關操作均十分簡單。線程的執行入口是從Start()函數開始,其將調用函數ThreadFunction,ThreadFunction再調用實際的Run函數,執行實際的任務。

CThreadPool

CThreadPool是線程的承載容器,一般可以將其實現為堆棧、單向隊列或者雙向隊列。在我們的系統中我們使用STL Vector對線程進行保存。CThreadPool的實現代碼如下:

 

 
  1. class CThreadPool 
  2. friend class CWorkerThread; 
  3.  
  4. private
  5. unsigned int m_MaxNum; //the max thread num that can create at the same time 
  6. unsigned int m_AvailLow; //The min num of idle thread that shoule kept 
  7. unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time 
  8. unsigned int m_AvailNum; //the normal thread num of idle num; 
  9. unsigned int m_InitNum; //Normal thread num; 
  10.  
  11. protected
  12. CWorkerThread* GetIdleThread(void);  
  13. void AppendToIdleList(CWorkerThread* jobthread); 
  14. void MoveToBusyList(CWorkerThread* idlethread); 
  15. void MoveToIdleList(CWorkerThread* busythread); 
  16. void DeleteIdleThread(int num); 
  17. void CreateIdleThread(int num); 
  18.  
  19. public
  20. CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock 
  21. CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock 
  22. CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock 
  23. CThreadMutex m_VarMutex; 
  24. CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list 
  25. CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list 
  26. CCondition m_IdleJobCond; //m_JobCond is used to sync job list 
  27. CCondition m_MaxNumCond; 
  28.  
  29. vector<CWorkerThread*> m_ThreadList; 
  30. vector<CWorkerThread*> m_BusyList; //Thread List 
  31. vector<CWorkerThread*> m_IdleList; //Idle List 
  32.  
  33. CThreadPool(); 
  34. CThreadPool(int initnum); 
  35. virtual ~CThreadPool();  
  36.  
  37. void SetMaxNum(int maxnum){m_MaxNum = maxnum;} 
  38. int GetMaxNum(void){return m_MaxNum;} 
  39. void SetAvailLowNum(int minnum){m_AvailLow = minnum;} 
  40. int GetAvailLowNum(void){return m_AvailLow;} 
  41. void SetAvailHighNum(int highnum){m_AvailHigh = highnum;} 
  42. int GetAvailHighNum(void){return m_AvailHigh;} 
  43. int GetActualAvailNum(void){return m_AvailNum;} 
  44. int GetAllNum(void){return m_ThreadList.size();} 
  45. int GetBusyNum(void){return m_BusyList.size();} 
  46. void SetInitNum(int initnum){m_InitNum = initnum;} 
  47. int GetInitNum(void){return m_InitNum;} 
  48. void TerminateAll(void); 
  49. void Run(CJob* job,void* jobdata); 
  50. }; 
  51.  
  52.  
  53.  
  54. CWorkerThread* CThreadPool::GetIdleThread(void
  55.  
  56.  
  57. while(m_IdleList.size() ==0 ) 
  58.  
  59. m_IdleCond.Wait(); 
  60.  
  61.  
  62.  
  63. m_IdleMutex.Lock(); 
  64.  
  65. if(m_IdleList.size() > 0 ) 
  66.  
  67.  
  68. CWorkerThread* thr = (CWorkerThread*)m_IdleList.front(); 
  69.  
  70. printf("Get Idle thread %d/n",thr->GetThreadID()); 
  71.  
  72. m_IdleMutex.Unlock(); 
  73.  
  74. return thr; 
  75.  
  76.  
  77. m_IdleMutex.Unlock(); 
  78.  
  79. return NULL;  
  80.  
  81.  
  82. //create num idle thread and put them to idlelist 
  83.  
  84. void CThreadPool::CreateIdleThread(int num) 
  85.  
  86.  
  87. for(int i=0;i<num;i++){ 
  88.  
  89. CWorkerThread* thr = new CWorkerThread(); 
  90.  
  91. thr->SetThreadPool(this); 
  92.  
  93. AppendToIdleList(thr); 
  94.  
  95. m_VarMutex.Lock(); 
  96.  
  97. m_AvailNum++; 
  98.  
  99. m_VarMutex.Unlock(); 
  100.  
  101. thr->Start(); //begin the thread,the thread wait for job 
  102.  
  103.  
  104.  
  105.  
  106.  
  107. void CThreadPool::Run(CJob* job,void* jobdata) 
  108.  
  109.  
  110. assert(job!=NULL); 
  111.  
  112.  
  113.  
  114. //if the busy thread num adds to m_MaxNum,so we should wait 
  115.  
  116. if(GetBusyNum() == m_MaxNum) 
  117.  
  118. m_MaxNumCond.Wait(); 
  119.  
  120.  
  121.  
  122. if(m_IdleList.size()<m_AvailLow) 
  123.  
  124.  
  125. if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
  126.  
  127. CreateIdleThread(m_InitNum-m_IdleList.size()); 
  128.  
  129. else 
  130.  
  131. CreateIdleThread(m_MaxNum-GetAllNum()); 
  132.  
  133.  
  134.  
  135.  
  136. CWorkerThread* idlethr = GetIdleThread(); 
  137.  
  138. if(idlethr !=NULL) 
  139.  
  140.  
  141. idlethr->m_WorkMutex.Lock(); 
  142.  
  143. MoveToBusyList(idlethr); 
  144.  
  145. idlethr->SetThreadPool(this); 
  146.  
  147. job->SetWorkThread(idlethr); 
  148.  
  149. printf("Job is set to thread %d /n",idlethr->GetThreadID()); 
  150.  
  151. idlethr->SetJob(job,jobdata); 
  152.  
  153.  

在CThreadPool中存在兩個鏈表,一個是空閑鏈表,一個是忙碌鏈表。Idle鏈表中存放所有的空閑進程,當線程執行任務時候,其狀態變為忙碌狀態,同時從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構造函數中,我們將執行下面的代碼:

 

 
  1. for(int i=0;i<m_InitNum;i++) 
  2.  
  3.  
  4. CWorkerThread* thr = new CWorkerThread(); 
  5.  
  6. AppendToIdleList(thr); 
  7.  
  8. thr->SetThreadPool(this); 
  9.  
  10. thr->Start(); //begin the thread,the thread wait for job 
  11.  

在該代碼中,我們將創建m_InitNum個線程,創建之后即調用AppendToIdleList放入Idle鏈表中,由于目前沒有任務分發給這些線程,因此線程執行Start后將自己掛起。

事實上,線程池中容納的線程數目并不是一成不變的,其會根據執行負載進行自動伸縮。為此在CThreadPool中設定四個變量:

m_InitNum:處世創建時線程池中的線程的個數。

m_MaxNum:當前線程池中所允許并發存在的線程的最大數目。

m_AvailLow:當前線程池中所允許存在的空閑線程的最小數目,如果空閑數目低于該值,表明負載可能過重,此時有必要增加空閑線程池的數目。實現中我們總是將線程調整為m_InitNum個。

m_AvailHigh:當前線程池中所允許的空閑的線程的最大數目,如果空閑數目高于該值,表明當前負載可能較輕,此時將刪除多余的空閑線程,刪除后調整數也為m_InitNum個。

m_AvailNum:目前線程池中實際存在的線程的個數,其值介于m_AvailHigh和m_AvailLow之間。如果線程的個數始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要創建,也不需要刪除,保持平衡狀態。因此如何設定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態,是線程池設計必須考慮的問題。

線程池在接受到新的任務之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個步驟:

(1)檢查當前處于忙碌狀態的線程是否達到了設定的最大值m_MaxNum,如果達到了,表明目前沒有空閑線程可用,而且也不能創建新的線程,因此必須等待直到有線程執行完畢返回到空閑隊列中。

(2)如果當前的空閑線程數目小于我們設定的最小的空閑數目m_AvailLow,則我們必須創建新的線程,默認情況下,創建后的線程數目應該為m_InitNum,因此創建的線程數目應該為( 當前空閑線程數與m_InitNum);但是有一種特殊情況必須考慮,就是現有的線程總數加上創建后的線程數可能超過m_MaxNum,因此我們必須對線程的創建區別對待。

 

 
  1. if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
  2.  
  3. CreateIdleThread(m_InitNum-m_IdleList.size()); 
  4.  
  5. else 
  6.  
  7. CreateIdleThread(m_MaxNum-GetAllNum()); 

如果創建后總數不超過m_MaxNum,則創建后的線程為m_InitNum;如果超過了,則只創建( m_MaxNum-當前線程總數 )個。

(3)調用GetIdleThread方法查找空閑線程。如果當前沒有空閑線程,則掛起;否則將任務指派給該線程,同時將其移入忙碌隊列。

當線程執行完畢后,其會調用MoveToIdleList方法移入空閑鏈表中,其中還調用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。

CJob

CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,代碼如下:

 

 
  1. class CJob 
  2.  
  3. private
  4.  
  5. int m_JobNo; //The num was assigned to the job 
  6.  
  7. char* m_JobName; //The job name 
  8.  
  9. CThread *m_pWorkThread; //The thread associated with the job 
  10.  
  11. public
  12.  
  13. CJob( void ); 
  14.  
  15. virtual ~CJob();  
  16.  
  17. int GetJobNo(voidconst { return m_JobNo; } 
  18.  
  19. void SetJobNo(int jobno){ m_JobNo = jobno;} 
  20.  
  21. char* GetJobName(voidconst { return m_JobName; } 
  22.  
  23. void SetJobName(char* jobname); 
  24.  
  25. CThread *GetWorkThread(void){ return m_pWorkThread; } 
  26.  
  27. void SetWorkThread ( CThread *pWorkThread ){ 
  28.  
  29. m_pWorkThread = pWorkThread; 
  30.  
  31.  
  32. virtual void Run ( void *ptr ) = 0; 
  33.  
  34. };  

線程池使用示例

至此我們給出了一個簡單的與具體任務無關的線程池框架。使用該框架非常的簡單,我們所需要的做的就是派生CJob類,將需要完成的任務實現在Run方法中。然后將該Job交由CThreadManage去執行。下面我們給出一個簡單的示例程序

 

 
  1. class CXJob:public CJob 
  2.  
  3. public
  4.  
  5. CXJob(){i=0;} 
  6.  
  7. ~CXJob(){} 
  8.  
  9. void Run(void* jobdata) { 
  10.  
  11. printf("The Job comes from CXJOB/n"); 
  12.  
  13. sleep(2); 
  14.  
  15.  
  16. }; 
  17.  
  18.  
  19.  
  20. class CYJob:public CJob 
  21.  
  22.  
  23. public
  24.  
  25. CYJob(){i=0;} 
  26.  
  27. ~CYJob(){} 
  28.  
  29. void Run(void* jobdata) { 
  30.  
  31. printf("The Job comes from CYJob/n"); 
  32.  
  33.  
  34. }; 
  35.  
  36.  
  37.  
  38. main() 
  39.  
  40.  
  41. CThreadManage* manage = new CThreadManage(10); 
  42.  
  43. for(int i=0;i<40;i++) 
  44.  
  45.  
  46. CXJob* job = new CXJob(); 
  47.  
  48. manage->Run(job,NULL); 
  49.  
  50.  
  51. sleep(2); 
  52.  
  53. CYJob* job = new CYJob(); 
  54.  
  55. manage->Run(job,NULL); 
  56.  
  57. manage->TerminateAll(); 
  58.  
  59. }  

CXJob和CYJob都是從Job類繼承而來,其都實現了Run接口。CXJob只是簡單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創建10個工作線程。然后分別執行40次CXJob和一次CYJob。

C++ 線程池的封裝實現

為了充分利用多核的優勢,我們利用多線程來進行任務處理,但線程也同樣不能濫用,會帶來一下幾個問題:

1)線程本身存在開銷,系統必須為每個線程分配如棧,TLS(線程局部存儲),寄存器等。

2)線程管理會給系統帶來開銷,context切換同樣會給系統帶來成本。

3)線程本身是可以重用的資源,不需要每次都進行初始化。

所以往往在使用中,我們無需把線程與task任務進行一對一對應,只需要預先初始化有限的線程個數來處理無限的task任務即可,線程池應運而生,原理也就是如此。

深入解析C++編程中線程池的使用

主要含有三個隊列

工作隊列

工作線程隊列

忙碌線程隊列

工作隊列是一個阻塞隊列,任務(仿函數)任務不算被push進來(notify阻塞獲取的工作線程),工作線程隊列(一直不變)則從該隊列中獲取任務執行(wait獲取,當任務隊列為空時阻塞等待通知),如果獲取到任務,則將線程會進入忙碌線程隊列中,執行任務的仿函數,當工作完成,重新移出工作線程隊列。

定義線程池專屬異常:

 

 
  1. struct TC_ThreadPool_Exception : public TC_Exception 
  2. TC_ThreadPool_Exception(const string &buffer) : TC_Exception(buffer){}; 
  3. TC_ThreadPool_Exception(const string &buffer, int err) : TC_Exception(buffer, err){}; 
  4. ~TC_ThreadPool_Exception () throw (){}; 
  5. }; 
  6.  
  7.  
  8. /** 
  9. * @brief 用通線程池類, 與tc_functor, tc_functorwrapper配合使用. 
  10.  
  11. * 使用方式說明: 
  12. * 1 采用tc_functorwrapper封裝一個調用 
  13. * 2 用tc_threadpool對調用進行執行 
  14. * 具體示例代碼請參見:test/test_tc_thread_pool.cpp 
  15. */ 
  16.  
  17. /**線程池本身繼承自鎖,可以幫助鎖定**/ 
  18. class TC_ThreadPool : public TC_ThreadLock 
  19. public
  20.  
  21. /** 
  22. * @brief 構造函數 
  23. * 
  24. */ 
  25. TC_ThreadPool (); 
  26.  
  27. /** 
  28. * @brief 析構, 會停止所有線程 
  29. */ 
  30. ~TC_ThreadPool (); 
  31.  
  32. /** 
  33. * @brief 初始化. 
  34.  
  35. * @param num 工作線程個數 
  36. */ 
  37. void init(size_t num); 
  38.  
  39. /** 
  40. * @brief 獲取線程個數. 
  41. * 
  42. * @return size_t 線程個數 
  43. */ 
  44. size_t getThreadNum() { Lock sync(* this); return _jobthread. size(); } 
  45.  
  46. /** 
  47. * @brief 獲取線程池的任務數( exec添加進去的). 
  48. * 
  49. * @return size_t 線程池的任務數 
  50. */ 
  51. size_t getJobNum() { return _jobqueue. size(); } 
  52.  
  53. /** 
  54. * @brief 停止所有線程 
  55. */ 
  56. void stop(); 
  57.  
  58. /** 
  59. * @brief 啟動所有線程 
  60. */ 
  61. void start(); 
  62.  
  63. /** 
  64. * @brief 啟動所有線程并, 執行初始化對象. 
  65.  
  66. * @param ParentFunctor 
  67. * @param tf 
  68. */ 
  69. template<class ParentFunctor> 
  70. void start(const TC_FunctorWrapper< ParentFunctor> &tf) 
  71. for(size_t i = 0; i < _jobthread .size(); i++) 
  72. _startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf)); 
  73.  
  74. start(); 
  75.  
  76. /** 
  77. * @brief 添加對象到線程池執行,該函數馬上返回, 
  78. * 線程池的線程執行對象 
  79. */ 
  80. template<class ParentFunctor> 
  81. void exec(const TC_FunctorWrapper< ParentFunctor> &tf) 
  82. _jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf)); 
  83.  
  84. /** 
  85. * @brief 等待所有工作全部結束(隊列無任務, 無空閑線程). 
  86. * 
  87. * @param millsecond 等待的時間( ms), -1:永遠等待 
  88. * @return true, 所有工作都處理完畢 
  89. * false,超時退出 
  90. */ 
  91. bool waitForAllDone(int millsecond = -1); 
  92.  
  93. public
  94.  
  95. /** 
  96. * @brief 線程數據基類,所有線程的私有數據繼承于該類 
  97. */ 
  98. class ThreadData 
  99. public
  100. /** 
  101. * @brief 構造 
  102. */ 
  103. ThreadData(){}; 
  104. /** 
  105. * @brief 析夠 
  106. */ 
  107. virtual ~ThreadData(){}; 
  108.  
  109. /** 
  110. * @brief 生成數據. 
  111.  
  112. * @ param T 
  113. * @return ThreadData* 
  114. */ 
  115. template<typename T> 
  116. static T* makeThreadData() 
  117. return new T; 
  118. }; 
  119.  
  120. /** 
  121. * @brief 設置線程數據. 
  122.  
  123. * @param p 線程數據 
  124. */ 
  125. static void setThreadData(ThreadData *p); 
  126.  
  127. /** 
  128. * @brief 獲取線程數據. 
  129. * 
  130. * @return ThreadData* 線程數據 
  131. */ 
  132. static ThreadData* getThreadData(); 
  133.  
  134. /** 
  135. * @brief 設置線程數據, key需要自己維護. 
  136.  
  137. * @param pkey 線程私有數據key 
  138. * @param p 線程指針 
  139. */ 
  140. static void setThreadData(pthread_key_t pkey, ThreadData *p); 
  141.  
  142. /** 
  143. * @brief 獲取線程數據, key需要自己維護. 
  144.  
  145. * @param pkey 線程私有數據key 
  146. * @return 指向線程的ThreadData*指針 
  147. */ 
  148. static ThreadData* getThreadData(pthread_key_t pkey); 
  149.  
  150. protected
  151.  
  152. /** 
  153. * @brief 釋放資源. 
  154.  
  155. * @param p 
  156. */ 
  157. static void destructor(void *p); 
  158.  
  159. /** 
  160. * @brief 初始化key 
  161. */ 
  162. class KeyInitialize 
  163. public
  164. /** 
  165. * @brief 初始化key 
  166. */ 
  167. KeyInitialize() 
  168. int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor); 
  169. if(ret != 0) 
  170. throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret); 
  171.  
  172. /** 
  173. * @brief 釋放key 
  174. */ 
  175. ~KeyInitialize() 
  176. pthread_key_delete(TC_ThreadPool::g_key); 
  177. }; 
  178.  
  179. /** 
  180. * @brief 初始化key的控制 
  181. */ 
  182. static KeyInitialize g_key_initialize; 
  183.  
  184. /** 
  185. * @brief 數據key 
  186. */ 
  187. static pthread_key_t g_key; 
  188.  
  189. protected
  190. /** 
  191. * @brief 線程池中的工作線程 
  192. */ 
  193. class ThreadWorker : public TC_Thread 
  194. public
  195. /** 
  196. * @brief 工作線程構造函數. 
  197.  
  198. * @ param tpool 
  199. */ 
  200. ThreadWorker(TC_ThreadPool *tpool); 
  201.  
  202. /** 
  203. * @brief 通知工作線程結束 
  204. */ 
  205. void terminate(); 
  206.  
  207. protected
  208. /** 
  209. * @brief 運行 
  210. */ 
  211. virtual void run(); 
  212.  
  213. protected
  214. /** 
  215. * 線程池指針 
  216. */ 
  217. TC_ThreadPool * _tpool; 
  218.  
  219. /** 
  220. * 是否結束線程 
  221. */ 
  222. bool _bTerminate; 
  223. }; 
  224.  
  225. protected
  226.  
  227. /** 
  228. * @brief 清除 
  229. */ 
  230. void clear(); 
  231.  
  232. /** 
  233. * @brief 獲取任務, 如果沒有任務, 則為NULL. 
  234. * 
  235. * @return TC_FunctorWrapperInterface* 
  236. */ 
  237. TC_FunctorWrapperInterface * get(ThreadWorker *ptw); 
  238.  
  239. /** 
  240. * @brief 獲取啟動任務. 
  241. * 
  242. * @return TC_FunctorWrapperInterface* 
  243. */ 
  244. TC_FunctorWrapperInterface * get(); 
  245.  
  246. /** 
  247. * @brief 空閑了一個線程. 
  248.  
  249. * @param ptw 
  250. */ 
  251. void idle(ThreadWorker *ptw); 
  252.  
  253. /** 
  254. * @brief 通知等待在任務隊列上的工作線程醒來 
  255. */ 
  256. void notifyT(); 
  257.  
  258. /** 
  259. * @brief 是否處理結束. 
  260. * 
  261. * @return bool 
  262. */ 
  263. bool finish(); 
  264.  
  265. /** 
  266. * @brief 線程退出時調用 
  267. */ 
  268. void exit(); 
  269.  
  270. friend class ThreadWorker; 
  271. protected
  272.  
  273. /** 
  274. * 任務隊列 
  275. */ 
  276. TC_ThreadQueue< TC_FunctorWrapperInterface*> _jobqueue; 
  277.  
  278. /** 
  279. * 啟動任務 
  280. */ 
  281. TC_ThreadQueue< TC_FunctorWrapperInterface*> _startqueue; 
  282.  
  283. /** 
  284. * 工作線程 
  285. */ 
  286. std::vector<ThreadWorker *> _jobthread; 
  287.  
  288. /** 
  289. * 繁忙線程 
  290. */ 
  291. std::set<ThreadWorker *> _busthread; 
  292.  
  293. /** 
  294. * 任務隊列的鎖 
  295. */ 
  296. TC_ThreadLock _tmutex; 
  297.  
  298. /** 
  299. * 是否所有任務都執行完畢 
  300. */ 
  301. bool _bAllDone; 
  302. }; 

工作線程設計如下:

 

 
  1. TC_ThreadPool ::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool) 
  2. : _tpool (tpool) 
  3. , _bTerminate ( false
  4.  
  5. void TC_ThreadPool ::ThreadWorker::terminate() 
  6. _bTerminate = true
  7. _tpool->notifyT(); 
  8.  
  9. void TC_ThreadPool ::ThreadWorker::run() 
  10. //調用初始化部分 
  11. TC_FunctorWrapperInterface *pst = _tpool->get(); 
  12. if(pst) 
  13. try 
  14. (*pst)(); 
  15. catch ( ... ) 
  16. delete pst; 
  17. pst = NULL; 
  18.  
  19. //調用處理部分 
  20. while (! _bTerminate) 
  21. TC_FunctorWrapperInterface *pfw = _tpool->get( this); 
  22. if(pfw != NULL) 
  23. auto_ptr< TC_FunctorWrapperInterface> apfw(pfw); 
  24.  
  25. try 
  26. (*pfw)(); 
  27. catch ( ... ) 
  28.  
  29. _tpool->idle( this); 
  30.  
  31. //結束 
  32. _tpool->exit(); 
  33.  
  34. 每個工作線程在剛開始時都會執行一下初始化操作,并進入一個無限循環的部分//調用處理部分 
  35. while (! _bTerminate) 
  36. TC_FunctorWrapperInterface *pfw = _tpool->get( this); 
  37. if(pfw != NULL) 
  38. auto_ptr< TC_FunctorWrapperInterface> apfw(pfw); 
  39.  
  40. try 
  41. (*pfw)(); 
  42. catch ( ... ) 
  43.  
  44. _tpool->idle( this); 

該工作主要是無限的從線程池的工作隊列中獲取任務并執行,如果成功獲取任務,則會將線程移進忙碌隊列:

 

 
  1. TC_FunctorWrapperInterface *TC_ThreadPool:: get(ThreadWorker *ptw) 
  2.  
  3. TC_FunctorWrapperInterface *pFunctorWrapper = NULL; 
  4. if(! _jobqueue. pop_front(pFunctorWrapper, 1000)) 
  5. return NULL; 
  6.  
  7. Lock sync( _tmutex); 
  8. _busthread. insert(ptw); 
  9. return pFunctorWrapper; 

執行完,移回工作線程隊列:_tpool->idle( this);

 

 
  1. void TC_ThreadPool:: idle(ThreadWorker *ptw) 
  2. Lock sync( _tmutex); 
  3. _busthread. erase(ptw); 
  4.  
  5. //無繁忙線程, 通知等待在線程池結束的線程醒過來 
  6. if( _busthread. empty()) 
  7. _bAllDone = true
  8. _tmutex.notifyAll(); 

此處jobThread隊列初始化后不會改變(因為沒有實現自增長功能),所以非線程安全的vector隊列即可,busthread的忙碌線程隊列會被移進移出,但是操作會自帶Lock sync( _tmutex),該互斥量是線程池本身繼承的,所以是共有的,也無需另外使用線程安全的TC_ThreadQueue,使用vector即可。

TC_ThreadPool:: idle中的

 

 
  1. if( _busthread. empty()) 
  2. _bAllDone = true
  3. _tmutex.notifyAll(); 

主要用于當線程池工作起來后的waitForAllDone方法:

 

 
  1. bool TC_ThreadPool:: waitForAllDone( int millsecond) 
  2. Lock sync( _tmutex); 
  3.  
  4. start1: 
  5. //任務隊列和繁忙線程都是空的 
  6. if (finish()) 
  7. return true
  8.  
  9. //永遠等待 
  10. if(millsecond < 0) 
  11. _tmutex.timedWait(1000); 
  12. goto start1; 
  13.  
  14. int64_t iNow = TC_Common:: now2ms(); 
  15. int m = millsecond; 
  16. start2: 
  17.  
  18. bool b = _tmutex.timedWait(millsecond); 
  19. //完成處理了 
  20. if(finish()) 
  21. return true
  22.  
  23. if(!b) 
  24. return false
  25.  
  26. millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow)); 
  27. goto start2; 
  28.  
  29. return false
  30.  
  31. _tmutex.timedWait(millsecond)方法喚醒。反復判斷是否所有的工作是否完成: 
  32.  
  33. bool TC_ThreadPool:: finish() 
  34. return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone; 

整體cpp實現如下:

 

 
  1. TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize; 
  2. pthread_key_t TC_ThreadPool::g_key ; 
  3.  
  4. void TC_ThreadPool::destructor( void *p) 
  5. ThreadData *ttd = ( ThreadData*)p; 
  6. if(ttd) 
  7. delete ttd; 
  8.  
  9. void TC_ThreadPool::exit() 
  10. TC_ThreadPool:: ThreadData *p = getThreadData(); 
  11. if(p) 
  12. delete p; 
  13. int ret = pthread_setspecific( g_key, NULL ); 
  14. if(ret != 0) 
  15. throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); 
  16.  
  17. _jobqueue. clear(); 
  18.  
  19. void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p) 
  20. TC_ThreadPool:: ThreadData *pOld = getThreadData(); 
  21. if(pOld != NULL && pOld != p) 
  22. delete pOld; 
  23.  
  24. int ret = pthread_setspecific( g_key, ( void *)p); 
  25. if(ret != 0) 
  26. throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); 
  27.  
  28. TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData () 
  29. return ( ThreadData *) pthread_getspecific( g_key); 
  30.  
  31. void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p) 
  32. TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey); 
  33. if(pOld != NULL && pOld != p) 
  34. delete pOld; 
  35.  
  36. int ret = pthread_setspecific(pkey, ( void *)p); 
  37. if(ret != 0) 
  38. throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); 
  39.  
  40. TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData( pthread_key_t pkey) 
  41. return ( ThreadData *) pthread_getspecific(pkey); 
  42.  
  43. TC_ThreadPool::TC_ThreadPool() 
  44. : _bAllDone ( true
  45.  
  46. TC_ThreadPool::~TC_ThreadPool() 
  47. stop(); 
  48. clear(); 
  49.  
  50. void TC_ThreadPool::clear() 
  51. std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); 
  52. while(it != _jobthread. end()) 
  53. delete (*it); 
  54. ++it; 
  55.  
  56. _jobthread. clear(); 
  57. _busthread. clear(); 
  58.  
  59. void TC_ThreadPool::init( size_t num) 
  60. stop(); 
  61.  
  62. Lock sync(* this); 
  63.  
  64. clear(); 
  65.  
  66. for( size_t i = 0; i < num; i++) 
  67. _jobthread. push_back( new ThreadWorker( this)); 
  68.  
  69. void TC_ThreadPool::stop() 
  70. Lock sync(* this); 
  71.  
  72. std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); 
  73. while(it != _jobthread. end()) 
  74. if ((*it)-> isAlive()) 
  75. (*it)-> terminate(); 
  76. (*it)-> getThreadControl().join (); 
  77. ++it; 
  78. _bAllDone = true
  79.  
  80. void TC_ThreadPool::start() 
  81. Lock sync(* this); 
  82.  
  83. std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); 
  84. while(it != _jobthread. end()) 
  85. (*it)-> start(); 
  86. ++it; 
  87. _bAllDone = false
  88.  
  89. bool TC_ThreadPool:: finish() 
  90. return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone; 
  91.  
  92. bool TC_ThreadPool::waitForAllDone( int millsecond) 
  93. Lock sync( _tmutex); 
  94.  
  95. start1: 
  96. //任務隊列和繁忙線程都是空的 
  97. if (finish ()) 
  98. return true
  99.  
  100. //永遠等待 
  101. if(millsecond < 0) 
  102. _tmutex.timedWait(1000); 
  103. goto start1; 
  104.  
  105. int64_t iNow = TC_Common:: now2ms(); 
  106. int m = millsecond; 
  107. start2: 
  108.  
  109. bool b = _tmutex.timedWait(millsecond); 
  110. //完成處理了 
  111. if(finish ()) 
  112. return true
  113.  
  114. if(!b) 
  115. return false
  116.  
  117. millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow)); 
  118. goto start2; 
  119.  
  120. return false
  121.  
  122. TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw) 
  123.  
  124. TC_FunctorWrapperInterface *pFunctorWrapper = NULL; 
  125. if(! _jobqueue. pop_front(pFunctorWrapper, 1000)) 
  126. return NULL; 
  127.  
  128. Lock sync( _tmutex); 
  129. _busthread. insert(ptw); 
  130. return pFunctorWrapper; 
  131.  
  132. TC_FunctorWrapperInterface *TC_ThreadPool::get() 
  133. TC_FunctorWrapperInterface *pFunctorWrapper = NULL; 
  134. if(! _startqueue. pop_front(pFunctorWrapper)) 
  135. return NULL; 
  136.  
  137. return pFunctorWrapper; 
  138.  
  139. void TC_ThreadPool::idle( ThreadWorker *ptw) 
  140. Lock sync( _tmutex); 
  141. _busthread. erase(ptw); 
  142.  
  143. //無繁忙線程, 通知等待在線程池結束的線程醒過來 
  144. if( _busthread. empty()) 
  145. _bAllDone = true
  146. _tmutex.notifyAll(); 
  147.  
  148. void TC_ThreadPool::notifyT() 
  149. _jobqueue. notifyT(); 

線程池使用后記

線程池適合場合

事 實上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應用所產生的影響,這是有前提的,前提就是線程本身開銷與線程執行任 務相比不可忽略。如果線程本身的開銷相對于線程任務執行開銷而言是可以忽略不計的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務器以及Telnet服務器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創建,即時銷毀”的策略。

總之線程池通常適合下面的幾個場合:

(1) 單位時間內處理任務頻繁而且任務處理時間短

(2) 對實時性要求較高。如果接受到任務后在創建線程,可能滿足不了實時要求,因此必須采用線程池進行預創建。

(3) 必須經常面對高突發性事件,比如Web服務器,如果有足球轉播,則服務器將產生巨大的沖擊。此時如果采取傳統方法,則必須不停的大量產生線程,銷毀線程。此時采用動態線程池可以避免這種情況的發生。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
成人深夜直播免费观看| 欧美日韩激情网| 欧美日韩人人澡狠狠躁视频| 2018中文字幕一区二区三区| 成人午夜一级二级三级| 亚洲自拍偷拍视频| 久久精品国产69国产精品亚洲| 日韩精品免费在线播放| 亚洲视频在线免费看| 国产精品免费福利| 久久韩国免费视频| 成人免费视频a| 日韩在线免费视频| 国产亚洲视频在线观看| 久久久久久久av| 欧美午夜视频一区二区| 欧洲中文字幕国产精品| 欧美电影免费观看| 亚洲精品日韩久久久| 欧美亚洲第一区| 欧美激情一级二级| 激情懂色av一区av二区av| 国产精品久久一区| 俺也去精品视频在线观看| 欧美高清视频在线播放| 91精品视频在线免费观看| 久久久久久久久久国产精品| 伊人久久男人天堂| 亚洲视频axxx| 中文字幕无线精品亚洲乱码一区| 日日狠狠久久偷偷四色综合免费| 日韩av在线电影网| 亚洲人精品午夜在线观看| 国产中文字幕91| 国模gogo一区二区大胆私拍| 在线观看国产成人av片| 中文字幕亚洲字幕| 亚洲无线码在线一区观看| 久久久久亚洲精品成人网小说| 岛国av一区二区三区| 久久香蕉国产线看观看网| 午夜免费日韩视频| 欧美日韩国产va另类| 久久久久久久久中文字幕| 精品久久久久久中文字幕一区奶水| 国产精品一区二区性色av| 亚洲www视频| 色偷偷偷亚洲综合网另类| 亚洲第一精品电影| 久久久久99精品久久久久| 久久影视电视剧凤归四时歌| 久久久av免费| 中文字幕在线精品| 97在线看免费观看视频在线观看| 亚洲天堂av女优| 欧美精品激情blacked18| 欧美日韩激情美女| 91九色国产在线| 久久久精品国产网站| www.亚洲一二| 日本道色综合久久影院| 久久99青青精品免费观看| 久久久久久高潮国产精品视| 久久视频免费观看| 国产精品久久久久一区二区| 国产69精品久久久| 国外成人免费在线播放| 国产精品一区二区久久| 亚洲欧美日韩国产中文| 欧美亚洲第一区| 国内精品久久久久久影视8| 欧美限制级电影在线观看| 亚洲自拍另类欧美丝袜| 久久天天躁日日躁| 欧美最猛性xxxxx亚洲精品| 欧美中文字幕精品| 亚洲综合在线做性| 国产精品激情自拍| 日韩中文字幕在线视频| 欧美在线视频免费播放| 九九热精品在线| 国产精品wwwwww| 黄色91在线观看| 91a在线视频| 欧美日韩精品在线| 国产欧美一区二区三区四区| 欧美国产极速在线| 亚洲国产欧美日韩精品| 午夜免费在线观看精品视频| 久久久精品日本| 美女撒尿一区二区三区| 亚洲精品99999| 日韩视频永久免费观看| 日韩欧美国产高清91| 97在线看免费观看视频在线观看| 一区二区三区美女xx视频| 精品夜色国产国偷在线| 北条麻妃一区二区三区中文字幕| 国内精品久久久久影院优| 国产日韩欧美中文在线播放| 亚洲qvod图片区电影| 日韩综合视频在线观看| 精品亚洲国产成av人片传媒| 国产精品久久久久久亚洲影视| 亚洲精品影视在线观看| 日韩视频欧美视频| 中文字幕日韩有码| 成人福利视频在线观看| 色狠狠av一区二区三区香蕉蜜桃| 久久久成人的性感天堂| 国产一区二区激情| 日韩综合视频在线观看| 欧美超级乱淫片喷水| 亚洲成人a级网| 久久精品国产2020观看福利| 欧美亚洲免费电影| 亚洲国产99精品国自产| 欧美日韩免费看| 精品国产一区二区三区在线观看| 在线精品视频视频中文字幕| www.xxxx精品| 欧美丝袜第一区| 亚洲无av在线中文字幕| 成人午夜在线观看| 欧美日韩国产二区| 国产精品亚洲综合天堂夜夜| 久久久久久12| 国产精品狼人色视频一区| www.日韩.com| 2019精品视频| 国内精品久久久久影院 日本资源| 亚洲成av人乱码色午夜| 欧美性生交xxxxx久久久| 日韩精品视频中文在线观看| 国产+成+人+亚洲欧洲| 在线视频欧美日韩精品| 正在播放国产一区| 亚洲免费av网址| 中文字幕在线观看亚洲| 久久久精品国产网站| 色综合久久久久久中文网| 伊人伊成久久人综合网站| 国内精品一区二区三区| 国产精品扒开腿做爽爽爽视频| 91久久国产综合久久91精品网站| 欧美激情精品久久久久久久变态| 91久久综合亚洲鲁鲁五月天| 日韩欧美国产黄色| 欧美影院成年免费版| 欧美一区二区色| 97精品一区二区三区| 国产日韩欧美日韩| 日韩中文字幕免费视频| 欧美日韩在线视频观看| 欧美日韩国产成人在线观看| 成人黄色午夜影院| 久久精视频免费在线久久完整在线看| 欧美人在线视频| 成人免费高清完整版在线观看| 亚洲免费视频一区二区| 97婷婷涩涩精品一区| 久久久最新网址| 亚洲最大福利网站| 成人精品aaaa网站|