線程池的組件網上很多,之前我自己也嘗試寫個一個demo,但這些組件一般都比較簡單,沒有完整的實現后臺線程池組件應用的功能。因此,這里我們實現一個可以用在線上環境的線程池組件,該線程池組件具備線程池應用的特性,如下所示:
1. 伸縮性:即線程池中線程的個數應該是動態變化的。繁忙的時候可以申請更多的線程;空閑的時候則注銷一部分線程。
2. 線程狀態:線程池中對線程的管理引入睡眠、喚醒機制。當線程沒有任務在運行時,使線程處于睡眠狀態。
3. 線程管理:對線程池中線程的申請和注銷,不是通過創建一個單獨線程來管理,而是線程池自動管理。
最終,我們實現的線程池局部一下幾個特點:
1. 線程池中線程的個數介于min和max之間;
2. 線程池中線程具有睡眠和喚醒機制;
3. 當線程池中的線程睡眠時間超過1秒鐘,則結束一個線程;當線程池中持續1秒沒有空閑線程時,則創建一個新線程。
1 關鍵數據結構主要的數據結構包括兩個CThreadPool和CThreadWorker。
這里,我們先給出數據結構的定義,然后簡要描述他們的作用。
相關數據結構定義如下:
1 class CThreadWorker 2 { 3 CThreadWorker *next_; // 線程池中的線程是單鏈表結構保存 4 5 int wake_up_; // 喚醒標志 6 pthread_cond_t wake_; 7 pthread_mutex_t mutex_; 8 9 pthread_t tid_;10 void (*func)(void *arg); // 函數執行體11 void *arg; // 函數執行體參數12 time_t sleep_when_; // 線程睡眠時間13 };14 15 class CThreadPool16 {17 CThreadWorker *head_;18 19 unsigned int min_; // 最少線程數20 unsigned int cur_; // 當前線程數21 unsigned int max_; // 最大線程數22 23 time_t last_empty_; // 最后一次線程池中沒有線程的時間24 pthread_mutex_t mutex_;25 };
其中,CThreadPool用來管理線程池,記錄線程池當前線程個數、最小線程個數、最大線程個數等。
CThreadWorker是具體保存線程記錄的實體,它里面保存了線程的執行函數體,函數參數、睡眠時間等等。還有一個信號量,用來讓線程睡眠或者喚醒。
當我們創建一個線程池時,即默認創建了min個CThreadWorker,每個線程實體默認都是睡眠的,阻塞在pthread_cond_wait處,然后我們具體執行用戶函數時,從線程池中獲取線程,更改該線程的func,arg等參數,然后喚醒該線程。
中間,線程池會對線程做一些管理,保證線程池的伸縮性。
2 源碼2.1 頭文件1 #ifndef _THREAD_POOL_H_ 2 #define _THREAD_POOL_H_ 3 4 #include <pthread.h> 5 #include <time.h> 6 7 struct CThreadWorker 8 { 9 CThreadWorker *next_; // 線程池中的線程是單鏈表結構保存10 11 int wake_up_; // 喚醒標志12 pthread_cond_t wake_; 13 pthread_mutex_t mutex_;14 15 pthread_t tid_;16 void (*func)(void *arg); // 函數執行體17 void *arg; // 函數執行體參數18 time_t sleep_when_; // 線程睡眠時間19 };20 21 struct CThreadPool22 {23 CThreadWorker *head_;24 25 unsigned int min_; // 最少線程數26 unsigned int cur_; // 當前線程數27 unsigned int max_; // 最大線程數28 29 time_t last_empty_; // 最后一次線程池中沒有線程的時間30 pthread_mutex_t mutex_;31 };32 33 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max);34 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg);35 #endif2.2 實現文件
實現文件主要完成.h中的CreateThreadPool和StartWork。我們先給個流程圖大概描述一下。
2.2.1 CreateThreadPool流程描述如下圖所示為創建線程池的流程,其中最關鍵的步驟是DoPRocess,當執行到DoProcess時,我們默認創建了min個線程,所有線程都因為信號而睡眠。StartWork里面會獲取一個線程,然后修改該線程的func,arg參數,最后喚醒線程執行我們的任務。當一個線程執行完的時候,我們需要判斷線程池當前的狀態,是需要新創建一個線程,還是把該線程重新加入到線程池,還是注銷該線程。具體的這些邏輯圖示不好描述,我們在下面代碼里給出注釋。
如下圖是啟動一個任務的流程,2.2.1我們已經描述到,啟動任務時,我們會從線程池中拿出一個線程,修改該線程的func/arg屬性,然后喚醒該線程。當線程執行完以后,是需要重新加入線程池,還是注銷,則是在2.2.1的DoProcess中處理。
1 #include <stdlib.h> 2 #include "pthread_pool.h" 3 4 // 線程池持續1秒沒有空閑線程 5 #define WaitWorkerTimeout(pool) ((time(NULL) - pool->last_empty_) > 1) 6 // 線程池中沒有線程,所有的線程已經pop出去執行具體的任務去了 7 #define NoThreadInPool(pool) (pool->head_ == NULL) 8 #define CanCreateThread(pool) (pool->cur_ < pool->max_) 9 10 11 static int CreateOneThread(CThreadPool *pool); 12 static void *DoProcess(void *arg); 13 static void PushWork(CThreadPool *pool, CThreadWorker *worker); 14 static void PopWork(CThreadPool *pool, CThreadWorker *worker); 15 static void InitWorker(CThreadWorker *worker); 16 static int WorkerIdleTimeout(CThreadPool *pool); 17 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg); 18 static void WakeupWorkerThread(CThreadWorker *worker); 19 20 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg); 21 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max); 22 23 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max) 24 { 25 CThreadPool *poo; 26 27 pool = (CThreadPool *)malloc(sizeof(CThreadPool)); 28 29 if (pool == NULL) { 30 return NULL; 31 } 32 33 pool->head_ = NULL; 34 pool->min_ = min; 35 pool->cur_ = 0; 36 pool->max_ = max; 37 pool->last_empty_ = time(NULL); 38 pthread_mutex_init(&pool->mutex_, NULL); 39 40 int ret = 0; 41 while (min--) { 42 ret = CreateOneThread(pool); 43 if (ret != 0) { 44 exit(0); 45 } 46 } 47 return pool; 48 } 49 50 static int CreateOneThread(CThreadPool *pool) 51 { 52 pthread_t tid; 53 return pthread_create(&tid, NULL, DoProcess, pool); 54 } 55 56 static void *DoProcess(void *arg) 57 { 58 CThreadPool *pool = (CThreadPool *)arg; 59 60 CThreadWorker worker; 61 62 InitWorker(&worker); 63 64 pthread_mutex_lock(&pool->mutex_); 65 pool->cur_ += 1; 66 67 for (;;) { 68 PushWork(pool, &worker); 69 worker.sleep_when_ = time(NULL); 70 pthread_mutex_unlock(&pool->mutex_); 71 72 pthread_mutex_lock(&worker.mutex_); 73 while (worker.wake_up_ != 1) { 74 pthread_cond_wait(&worker.wake_, &worker.mutex_); 75 } 76 // worker線程已被喚醒,準備開始執行任務,修改wake_up_標志。 77 worker.wake_up_ = 0; 78 pthread_mutex_unlock(&worker.mutex_); 79 80 // 執行我們的任務,執行完畢之后,修改worker.func為NULL。 81 worker.func(arg); 82 worker.func = NULL; 83 84 // 任務執行完以后,線程池需要根據當前的線程池狀態來判斷是要把該線程重新加入線程池,還是要創建一個新的線程。 85 pthread_mutex_lock(&pool->mutex_); 86 if (WaitWorkerTimeout(pool) && NoThreadInPool(pool) && CanCreateThread(pool)) { 87 // 在我們執行這個任務的時候,其他任務等待空閑線程的時間超過了1秒,而且線程池中沒有線程,且線程池當前線程數沒有超過最大允許創建線程數 88 CreateOneThread(pool); 89 } 90 91 // 線程池中沒有線程了,重新把該線程加入線程池 92 if (NoThreadInPool(pool)) { 93 continue; 94 } 95 96 // 線程池中線程數低于最低閾值,重新把該線程加入線程池 97 if (pool->curr <= pool->min) { 98 continue; 99 }100 101 // 線程中睡眠的線程時間超過了1秒,說明線程池不是很繁忙,不需要把該線程重新加回線程池102 if (WorkerIdleTimeout(pool)) {103 break;104 }105 }106 107 pool->cur -= 1;108 pthread_mutex_unlock(&pool->mutex);109 110 pthread_cond_destroy(&worker.wake_);111 pthread_mutex_destroy(&worker.mutex_);112 113 pthread_exit(NULL);114 }115 116 static void InitWorker(CThreadWorker *worker)117 {118 worker->next_ = NULL;119 worker->wake_up_ = 0;120 pthread_cond_init(&worker->wake_, NULL);121 pthread_mutex_init(&worker->mutex_, NULL);122 worker->tid_ = pthread_self();123 worker->func = NULL;124 worker->arg = NULL;125 worker->sleep_when_ = 0;126 }127 128 static void PushWork(CThreadPool *pool, CThreadWorker *worker)129 {130 worker->next_ = pool->head_;131 pool->next_ = worker;132 }133 134 static int WorkerIdleTimeout(CThreadPool *pool)135 {136 CThreadWorker *worker;137 138 if (NoThreadInPool(pool)) {139 return 0;140 }141 worker = pool->head_;142 return (time(NULL) > worker->sleep_when_ + 1)? 1 : 0;143 }144 145 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg)146 {147 if (func == NULL) {148 return -1;149 }150 151 CThreadWorker *worker;152 pthread_mutex_lock(&pool->mutex_);153 worker = GetWorker(pool, func, arg);154 pthread_mutex_unlock(&pool->mutex_);155 156 if (worker == NULL) {157 return -2;158 }159 160 WakeupWorkerThread(worker);161 return 0;162 }163 164 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg)165 {166 CThreadWorker *worker;167 168 if (NoThreadInPool(pool)) {169 return NULL;170 }171 172 worker = pool->head_;173 PopWork(pool, worker);174 175 if (NoThreadInPool(pool)) {176 pool->last_empty_ = time(NULL);177 }178 179 worker->func = func;180 worker->arg = arg;181 182 return worker;183 }184 185 static void PopWork(CThreadPool *pool, CThreadWorker *worker)186 {187 pool->head_ = worker->next_;188 worker->next_ = NULL;189 }190 191 static void WakeupWorkerThread(CThreadWorker *worker)192 {193 pthread_mutex_lock(&worker->mutex_);194 worker->wake_up_ = 1;195 pthread_mutex_unlock(&worker->mutex_);196 197 pthread_cond_signal(&worker->wake_);198 }3 總結
該線程池模型實現了我們線上環境實際的一些應用場景,沒有考慮的問題有這么幾點:
1. 沒有考慮任務類的概念,寫一個任務基類,然后具體的任務類繼承這個基類,實現自己的功能,具體執行時候,只需要add_task,把任務加進線程池即可,這樣做的目的是想著可以給每個任務發信號,是否要終止任務的執行。當前的線程池做不到這點,也做不到判斷任務執行時間,是否超時。
2. 每個線程只執行一個任務,而不是搞一個任務隊列,去執行任務隊列里的任務。關于這點,我也不清楚,搞任務隊列的優勢是什么。
個人能想到的就是這些,不知道諸位平時工作中,具體應用線程池的時候需要考慮一些什么場景,我這里想到的就是1、可以靈活的終止任務;2、可以判斷任務是否超時;3、可以對需要執行的任務做到負載均衡(覺得這一點,在這里的線程池里不是問題,極端的情況是來了一堆任務,線程池中線程不夠了,那確實會有這個問題)。
關于這幾個問題,第3個我個人能解決,可以把上面的線程池稍微修改一下,每個線程結果ThreadWorker里面再加個任務隊列,當沒有任務時候線程阻塞,否則就取出任務來執行。
關于第一點,怎么修改這個框架,達到可以給任務發信號呢。
還有第二點,判斷任務是否超時呢?
歡迎大家一起討論!
新聞熱點
疑難解答