最近在寫MySQL冷備server的一個模塊,稍微接觸到一點線程池的東西,自己也就想嘗試寫一個簡單的線程池練練手。
這個線程池在創建時,即按照最大的線程數生成線程。
然后作業任務通過add_task接口往線程池中加入需要運行的任務,再調用線程池的run函數開始運行所有任務,每個線程從任務隊列中讀取任務,處理完一個任務后再讀取新的任務,直到最終任務隊列為空。
補充:再回頭看這個設計,其實算不了線程池,最多算是多線程執行任務。
把所有的任務描述信息先加入到CThreadPool的任務隊列里面,然后調用CThreadPool的run函數去創建指定數量的線程,每個線程互斥的從任務隊列里面取出任務去執行。
1 線程池設計簡單描述如下(假設任務類名為CTasklet):
1、CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);
2、創建任務,并把任務加入到線程池
CTasklet *pTask1 = new CTasklet();
CTasklet *pTask2 = new CTasklet();
...
thread_pool.add_task(pTask1);
thread_pool.add_task(pTask2);
...
3、調用線程池的run方法開始執行任務
thread_pool.run();
4、等待任務執行完成
thread_pool.join_thread();
2 源碼下面給出完整的線程池代碼
1 /* 2 * file: thread_pool.h 3 * desc: 簡單的線程池,一次性初始化任務隊列和線程池。 4 * 5 */ 6 7 #ifndef _THREAD_POOL_H_ 8 #define _THREAD_POOL_H_ 9 10 #include <pthread.h>11 #include <vector>12 13 using namespace std;14 15 template<typename workType>16 class CThreadPool17 {18 public:19 typedef void * (thread_func)(void *);20 21 CThreadPool(int thread_num, size_t stack_size = 10485760);22 ~CThreadPool();23 24 // 向任務隊列中添加任務25 int add_task(workType *pTask);26 27 // 創建新線程并執行28 int run();29 30 // 等待所有的線程執行結束31 int join_thread();32 33 PRivate:34 int init_thread_attr();35 int destroy_thread_attr();36 37 int set_thread_stacksize(size_t stack_size);38 int set_thread_joinable();39 40 protected:41 // 線程池執行函數,必須為static42 static void start_routine(void *para);43 44 private:45 pthread_attr_t attr_;46 static pthread_mutex_t mutex_lock_;47 static list<workType *> list_task_;48 49 int thread_num_; // 最大線程數50 vector<pthread_t> thread_id_vec_;51 };52 #endifView Code
1 #include "pthread_pool.h" 2 3 template<typename workType> 4 pthread_mutex_t CThreadPool<workType>::mutex_lock_; 5 6 template<typename workType> 7 list<workType*> CThreadPool<workType*>::list_task_; 8 9 template<typename workType> 10 CThreadPool<workType>::CThreadPool(int thread_num, size_t stack_size) 11 { 12 thread_num_ = thread_num; 13 pthread_mutex_init(&mutex_lock_, NULL); 14 15 init_thread_attr(); 16 set_thread_stacksize(stack_size); 17 set_thread_joinable(); 18 } 19 20 template<typename workType> 21 CThreadPool<workType>::~CthreadPool() 22 { 23 destroy_thread_attr(); 24 } 25 26 template <typename workType> 27 int init_thread_attr() 28 { 29 return pthread_attr_init(&m_attr); 30 } 31 32 template <typename workType> 33 int CThreadPool<workType>::destroy_thread_attr() 34 { 35 return pthread_attr_destroy(&attr_); 36 } 37 38 template <typename workType> 39 int CThreadPool<workType>::set_thread_stacksize(size_t stack_size) 40 { 41 return pthread_attr_setstacksize(&attr_, stack_size); 42 } 43 44 template <typename workType> 45 int CThreadPool<workType>::set_thread_joinable() 46 { 47 return pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_JOINABLE); 48 } 49 50 template <typename workType> 51 void CThreadPool<workType>::start_routine(void *para) 52 { 53 workType *pWorkType = NULL; 54 55 while (1) { 56 pthread_mutex_lock(&mutex_lock_); 57 58 if (list_task_.empty()) { 59 pthread_mutex_unlock(mutex_lock_); 60 return; 61 } 62 63 pWorkType = *(list_task_.begin()); 64 list_task_.pop_front(); 65 pthread_mutex_unlock(&mutex_lock_); 66 67 pWorkType->run(); 68 delete pWorkType; 69 pWorkType = NULL; 70 } 71 } 72 73 template <typename workType> 74 int CThreadPool<workType>::add_task(workType *pTask) 75 { 76 pthread_mutex_lock(&mutex_lock_); 77 list_task_.push_back(pTask); 78 pthread_mutex_unlock(&mutex_lock_); 79 return 0; 80 } 81 82 template <typename workType> 83 int CThreadPool<workType>::run() 84 { 85 int rc; 86 pthread_t tid; 87 for (int i = 0; i < thread_num_; ++i) { 88 rc = pthread_create(&tid, &attr_, (thread_func)start_routine, NULL); 89 thread_id_vec_.push_back(tid); 90 } 91 return rc; 92 } 93 94 template <typename workType> 95 int CThreadPool<workType>::join_thread() 96 { 97 int rc = 0; 98 vector<pthread_t>::iterator iter; 99 for (iter = thread_id_vec_.begin(); iter != thread_id_vec_.end(); ++iter) {100 rc = pthread_join((*iter), NULL);101 }102 thread_id_vec_.clear();103 return rc;104 }View Code
測試代碼如下
1 #include <unistd.h> 2 3 #include <iostream> 4 #include <list> 5 6 using namespace std; 7 8 class CTasklet 9 {10 public:11 CTasklet(int num) {12 num_ = num;13 cout << "CTasklet ctor create num: " << num_ << endl;14 }15 16 ~CTasklet() {17 cout << "CTasklet dtor delete num: " << num_ << endl;18 }19 20 int run() {21 cout << "CTasklet sleep begin: " << num_ << endl;22 sleep(num_);23 cout << "CTasklet sleep end: " << num_ << endl;24 }25 26 private:27 int num_;28 };29 30 #define MAX_THREAD_NUM 331 int main(int argc, char **argv)32 {33 // Step1. 創建線程池34 CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);35 36 // Step2. 創建任務,并加入到線程池中37 for (int i = 0; i < 6; ++i) {38 CTasklet *pTask = new CTasklet(i);39 thread_pool.add_task(pTask);40 }41 // Step3. 開始執行任務42 thread_pool.run();43 // Step4. 等待任務結束44 thread_pool.join_thread();45 46 return 0;47 48 }View Code3 總結
上面的線程池屬于最簡單的一類線程池,即相當于程序運行時候就開啟n個線程來執行任務。真正的線程池需要考慮的方面比較多,比如1、線程池中的線程數應該能動態變化;2、線程池能動態調度線程來運行任務,以達到均衡;3、線程池還應該能記錄任務的運行時間,防止超時等等。
不過,起碼我們已經開了個頭,實現了一個簡單的線程池,接下來,讓我們在這個基礎上一步步調整、完善。
PS:對于線程池的考慮,我能想到的有動態增減線程數、超時機制、負載均衡。不知道大家理解線程池還需要考慮什么場景。
新聞熱點
疑難解答