亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 操作系統 > 正文

簡單實用可線上應用的線程池組件

2024-06-28 13:24:04
字體:
來源:轉載
供稿:網友
簡單實用可線上應用的線程池組件0 前言

線程池的組件網上很多,之前我自己也嘗試寫個一個demo,但這些組件一般都比較簡單,沒有完整的實現后臺線程池組件應用的功能。因此,這里我們實現一個可以用在線上環境的線程池組件,該線程池組件具備線程池應用的特性,如下所示:

1. 伸縮性:即線程池中線程的個數應該是動態變化的。繁忙的時候可以申請更多的線程;空閑的時候則注銷一部分線程。

2. 線程狀態:線程池中對線程的管理引入睡眠、喚醒機制。當線程沒有任務在運行時,使線程處于睡眠狀態。

3. 線程管理:對線程池中線程的申請和注銷,不是通過創建一個單獨線程來管理,而是線程池自動管理。

最終,我們實現的線程池局部一下幾個特點:

1. 線程池中線程的個數介于min和max之間;

2. 線程池中線程具有睡眠和喚醒機制;

3. 當線程池中的線程睡眠時間超過1秒鐘,則結束一個線程;當線程池中持續1秒沒有空閑線程時,則創建一個新線程。

1 關鍵數據結構

主要的數據結構包括兩個CThreadPool和CThreadWorker。

這里,我們先給出數據結構的定義,然后簡要描述他們的作用。

相關數據結構定義如下:

 1 class CThreadWorker 2 { 3     CThreadWorker *next_;    // 線程池中的線程是單鏈表結構保存 4      5     int wake_up_;            // 喚醒標志 6     pthread_cond_t wake_;     7     pthread_mutex_t mutex_; 8      9     pthread_t tid_;10     void (*func)(void *arg); // 函數執行體11     void *arg;               // 函數執行體參數12     time_t sleep_when_;      // 線程睡眠時間13 };14 15 class CThreadPool16 {17     CThreadWorker *head_;18     19     unsigned int min_;     // 最少線程數20     unsigned int cur_;     // 當前線程數21     unsigned int max_;     // 最大線程數22     23     time_t last_empty_;    // 最后一次線程池中沒有線程的時間24     pthread_mutex_t mutex_;25 };

其中,CThreadPool用來管理線程池,記錄線程池當前線程個數、最小線程個數、最大線程個數等。

CThreadWorker是具體保存線程記錄的實體,它里面保存了線程的執行函數體,函數參數、睡眠時間等等。還有一個信號量,用來讓線程睡眠或者喚醒。

當我們創建一個線程池時,即默認創建了min個CThreadWorker,每個線程實體默認都是睡眠的,阻塞在pthread_cond_wait處,然后我們具體執行用戶函數時,從線程池中獲取線程,更改該線程的func,arg等參數,然后喚醒該線程。

中間,線程池會對線程做一些管理,保證線程池的伸縮性。

2 源碼2.1 頭文件
 1 #ifndef _THREAD_POOL_H_ 2 #define _THREAD_POOL_H_ 3  4 #include <pthread.h> 5 #include <time.h> 6  7 struct CThreadWorker 8 { 9     CThreadWorker *next_;    // 線程池中的線程是單鏈表結構保存10     11     int wake_up_;            // 喚醒標志12     pthread_cond_t wake_;    13     pthread_mutex_t mutex_;14     15     pthread_t tid_;16     void (*func)(void *arg); // 函數執行體17     void *arg;               // 函數執行體參數18     time_t sleep_when_;      // 線程睡眠時間19 };20 21 struct CThreadPool22 {23     CThreadWorker *head_;24     25     unsigned int min_;     // 最少線程數26     unsigned int cur_;     // 當前線程數27     unsigned int max_;     // 最大線程數28     29     time_t last_empty_;    // 最后一次線程池中沒有線程的時間30     pthread_mutex_t mutex_;31 };32 33 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max);34 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg);35 #endif
2.2 實現文件

實現文件主要完成.h中的CreateThreadPool和StartWork。我們先給個流程圖大概描述一下。

2.2.1 CreateThreadPool流程描述

如下圖所示為創建線程池的流程,其中最關鍵的步驟是DoPRocess,當執行到DoProcess時,我們默認創建了min個線程,所有線程都因為信號而睡眠。StartWork里面會獲取一個線程,然后修改該線程的func,arg參數,最后喚醒線程執行我們的任務。當一個線程執行完的時候,我們需要判斷線程池當前的狀態,是需要新創建一個線程,還是把該線程重新加入到線程池,還是注銷該線程。具體的這些邏輯圖示不好描述,我們在下面代碼里給出注釋。

2.2.2 StartWork流程描述

如下圖是啟動一個任務的流程,2.2.1我們已經描述到,啟動任務時,我們會從線程池中拿出一個線程,修改該線程的func/arg屬性,然后喚醒該線程。當線程執行完以后,是需要重新加入線程池,還是注銷,則是在2.2.1的DoProcess中處理。

2.3 實現文件
  1 #include <stdlib.h>  2 #include "pthread_pool.h"  3   4 // 線程池持續1秒沒有空閑線程  5 #define WaitWorkerTimeout(pool)     ((time(NULL) - pool->last_empty_) > 1)  6 // 線程池中沒有線程,所有的線程已經pop出去執行具體的任務去了  7 #define NoThreadInPool(pool)        (pool->head_ == NULL)  8 #define CanCreateThread(pool)       (pool->cur_ < pool->max_)  9  10  11 static int CreateOneThread(CThreadPool *pool); 12 static void *DoProcess(void *arg); 13 static void PushWork(CThreadPool *pool, CThreadWorker *worker); 14 static void PopWork(CThreadPool *pool, CThreadWorker *worker); 15 static void InitWorker(CThreadWorker *worker); 16 static int WorkerIdleTimeout(CThreadPool *pool); 17 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg); 18 static void WakeupWorkerThread(CThreadWorker *worker); 19  20 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg); 21 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max); 22  23 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max) 24 { 25     CThreadPool *poo; 26      27     pool = (CThreadPool *)malloc(sizeof(CThreadPool)); 28      29     if (pool == NULL) { 30         return NULL; 31     } 32      33     pool->head_ = NULL; 34     pool->min_  = min; 35     pool->cur_  = 0; 36     pool->max_  = max; 37     pool->last_empty_ = time(NULL); 38     pthread_mutex_init(&pool->mutex_, NULL); 39      40     int ret = 0; 41     while (min--) { 42         ret = CreateOneThread(pool); 43         if (ret != 0) { 44             exit(0); 45         } 46     } 47     return pool; 48 } 49  50 static int CreateOneThread(CThreadPool *pool) 51 { 52     pthread_t tid; 53     return pthread_create(&tid, NULL, DoProcess, pool); 54 } 55  56 static void *DoProcess(void *arg) 57 { 58     CThreadPool *pool = (CThreadPool *)arg; 59      60     CThreadWorker worker; 61      62     InitWorker(&worker); 63      64     pthread_mutex_lock(&pool->mutex_); 65     pool->cur_ += 1; 66      67     for (;;) { 68         PushWork(pool, &worker); 69         worker.sleep_when_ = time(NULL); 70         pthread_mutex_unlock(&pool->mutex_); 71          72         pthread_mutex_lock(&worker.mutex_); 73         while (worker.wake_up_ != 1) { 74             pthread_cond_wait(&worker.wake_, &worker.mutex_); 75         } 76         // worker線程已被喚醒,準備開始執行任務,修改wake_up_標志。 77         worker.wake_up_ = 0; 78         pthread_mutex_unlock(&worker.mutex_); 79          80         // 執行我們的任務,執行完畢之后,修改worker.func為NULL。 81         worker.func(arg); 82         worker.func = NULL; 83          84         // 任務執行完以后,線程池需要根據當前的線程池狀態來判斷是要把該線程重新加入線程池,還是要創建一個新的線程。 85         pthread_mutex_lock(&pool->mutex_); 86         if (WaitWorkerTimeout(pool) && NoThreadInPool(pool) && CanCreateThread(pool)) { 87             // 在我們執行這個任務的時候,其他任務等待空閑線程的時間超過了1秒,而且線程池中沒有線程,且線程池當前線程數沒有超過最大允許創建線程數 88             CreateOneThread(pool); 89         } 90          91         // 線程池中沒有線程了,重新把該線程加入線程池 92         if (NoThreadInPool(pool)) { 93             continue; 94         } 95          96         // 線程池中線程數低于最低閾值,重新把該線程加入線程池 97         if (pool->curr <= pool->min) { 98             continue; 99         }100         101         // 線程中睡眠的線程時間超過了1秒,說明線程池不是很繁忙,不需要把該線程重新加回線程池102         if (WorkerIdleTimeout(pool)) {103             break;104         }105     }106     107     pool->cur -= 1;108     pthread_mutex_unlock(&pool->mutex);109     110     pthread_cond_destroy(&worker.wake_);111     pthread_mutex_destroy(&worker.mutex_);112 113     pthread_exit(NULL);114 }115 116 static void InitWorker(CThreadWorker *worker)117 {118     worker->next_ = NULL;119     worker->wake_up_ = 0;120     pthread_cond_init(&worker->wake_, NULL);121     pthread_mutex_init(&worker->mutex_, NULL);122     worker->tid_ = pthread_self();123     worker->func = NULL;124     worker->arg = NULL;125     worker->sleep_when_ = 0;126 }127 128 static void PushWork(CThreadPool *pool, CThreadWorker *worker)129 {130     worker->next_ = pool->head_;131     pool->next_ = worker;132 }133 134 static int WorkerIdleTimeout(CThreadPool *pool)135 {136     CThreadWorker *worker;137     138     if (NoThreadInPool(pool)) {139         return 0;140     }141     worker = pool->head_;142     return (time(NULL) > worker->sleep_when_ + 1)? 1 : 0;143 }144 145 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg)146 {147     if (func == NULL) {148         return -1;149     }150     151     CThreadWorker *worker;152     pthread_mutex_lock(&pool->mutex_);153     worker = GetWorker(pool, func, arg);154     pthread_mutex_unlock(&pool->mutex_);155     156     if (worker == NULL) {157         return -2;158     }159     160     WakeupWorkerThread(worker);161     return 0;162 }163 164 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg)165 {166     CThreadWorker *worker;167     168     if (NoThreadInPool(pool)) {169         return NULL;170     }171     172     worker = pool->head_;173     PopWork(pool, worker);174     175     if (NoThreadInPool(pool)) {176         pool->last_empty_ = time(NULL);177     }178 179     worker->func = func;180     worker->arg = arg;181 182     return worker;183 }184 185 static void PopWork(CThreadPool *pool, CThreadWorker *worker)186 {187     pool->head_ = worker->next_;188     worker->next_ = NULL;189 }190 191 static void WakeupWorkerThread(CThreadWorker *worker)192 {193     pthread_mutex_lock(&worker->mutex_);194     worker->wake_up_ = 1;195     pthread_mutex_unlock(&worker->mutex_);196 197     pthread_cond_signal(&worker->wake_);198 }
3 總結

該線程池模型實現了我們線上環境實際的一些應用場景,沒有考慮的問題有這么幾點:

1. 沒有考慮任務類的概念,寫一個任務基類,然后具體的任務類繼承這個基類,實現自己的功能,具體執行時候,只需要add_task,把任務加進線程池即可,這樣做的目的是想著可以給每個任務發信號,是否要終止任務的執行。當前的線程池做不到這點,也做不到判斷任務執行時間,是否超時。

2. 每個線程只執行一個任務,而不是搞一個任務隊列,去執行任務隊列里的任務。關于這點,我也不清楚,搞任務隊列的優勢是什么。

個人能想到的就是這些,不知道諸位平時工作中,具體應用線程池的時候需要考慮一些什么場景,我這里想到的就是1、可以靈活的終止任務;2、可以判斷任務是否超時;3、可以對需要執行的任務做到負載均衡(覺得這一點,在這里的線程池里不是問題,極端的情況是來了一堆任務,線程池中線程不夠了,那確實會有這個問題)。

關于這幾個問題,第3個我個人能解決,可以把上面的線程池稍微修改一下,每個線程結果ThreadWorker里面再加個任務隊列,當沒有任務時候線程阻塞,否則就取出任務來執行。

關于第一點,怎么修改這個框架,達到可以給任務發信號呢。

還有第二點,判斷任務是否超時呢?

歡迎大家一起討論!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
成人国产精品久久久久久亚洲| 亚洲自拍欧美色图| 国产精品久久久久秋霞鲁丝| 91亚洲精品久久久久久久久久久久| 日韩精品日韩在线观看| 国产免费久久av| 欧美成人午夜激情在线| 国产精品www| 精品国产拍在线观看| 欧美大片在线看免费观看| 久久久久久国产精品久久| 国产精品成人在线| 久久久久成人精品| 欧美日韩xxxxx| 国产v综合v亚洲欧美久久| 成人激情综合网| 久久综合五月天| 91免费视频网站| 国产一区欧美二区三区| 久久久精品亚洲| 中日韩美女免费视频网站在线观看| 国产精品男人爽免费视频1| 中文字幕免费国产精品| 一区二区三区无码高清视频| 91国在线精品国内播放| 欧美国产极速在线| 欧美日韩中文字幕在线| 欧美亚洲在线播放| 久久精品国产一区二区三区| 国产精品一区二区久久精品| 国产精品久久婷婷六月丁香| 欧美精品一区在线播放| 国产精品美乳在线观看| 一区二区在线视频播放| 国产精品入口日韩视频大尺度| 亚洲美女av在线播放| 国产盗摄xxxx视频xxx69| 日韩电影免费在线观看中文字幕| 久久精品一本久久99精品| 91精品国产91久久久久| 国产伊人精品在线| 国产国语刺激对白av不卡| 精品视频在线播放色网色视频| 久久久久久久久久国产精品| 在线播放国产一区中文字幕剧情欧美| 91a在线视频| 日韩在线观看网址| 91国产高清在线| 日韩中文在线中文网三级| 日韩欧美aⅴ综合网站发布| 伊人久久久久久久久久久久久| 国产成人精品国内自产拍免费看| 欧美国产日韩在线| 久久久精品中文字幕| 亚洲国产精品悠悠久久琪琪| 欧美激情精品在线| 欧美黑人狂野猛交老妇| 国产va免费精品高清在线| 欧美激情精品久久久久久久变态| 精品国产乱码久久久久久虫虫漫画| 91精品国产91久久久久久久久| 欧美性xxxx极品hd欧美风情| 国产99久久精品一区二区永久免费| 色悠久久久久综合先锋影音下载| 色综合亚洲精品激情狠狠| 欧美国产第一页| 日韩av在线免费观看| 日产日韩在线亚洲欧美| 久久天堂av综合合色| 中文字幕欧美亚洲| 亚洲国产日韩欧美在线99| 中文字幕欧美精品在线| 欧美老少做受xxxx高潮| 国产精品国模在线| 国产视频福利一区| 日韩精品在线看| 亚洲欧美日韩中文在线| 欧美精品午夜视频| 国产亚洲激情在线| 国产亚洲一区二区精品| 欧美大全免费观看电视剧大泉洋| 精品久久久久久亚洲国产300| 久久久999国产| 亚洲人成亚洲人成在线观看| 久久久伊人欧美| 亚洲黄色在线观看| 欧美乱人伦中文字幕在线| 欧洲亚洲妇女av| 国产精品91视频| 国产精品日韩欧美| 国产亚洲精品久久久久久| 欧美日韩成人网| 91成人天堂久久成人| 亚洲欧洲av一区二区| 国产精品免费看久久久香蕉| 亚洲а∨天堂久久精品喷水| 亚洲高清在线观看| 国产精品成人一区| 亚洲高清在线观看| 亚洲第一区在线观看| 俺去亚洲欧洲欧美日韩| 亚洲伊人久久综合| 性欧美办公室18xxxxhd| 国产精品高潮呻吟久久av黑人| 夜夜嗨av一区二区三区免费区| 日韩成人xxxx| 欧美黄色片免费观看| 另类色图亚洲色图| 久久精品91久久久久久再现| 国产精品视频播放| 成人免费视频97| 精品国产91乱高清在线观看| 欧美成人精品在线视频| 亚洲视频axxx| 亚洲国产91精品在线观看| 欧美精品久久一区二区| 亚洲嫩模很污视频| 亚洲成人黄色在线观看| 国产精品电影久久久久电影网| 中文字幕久热精品在线视频| 最近中文字幕2019免费| 欧美精品videosex性欧美| 亚洲电影第1页| 欧美午夜片欧美片在线观看| 自拍偷拍亚洲一区| 国产国语刺激对白av不卡| 亚洲一区二区久久久久久| 亚洲日韩欧美视频一区| 69精品小视频| 欧美理论在线观看| 国内精品美女av在线播放| 成人夜晚看av| 中文综合在线观看| 中文字幕久久久av一区| 欧美丝袜一区二区三区| 亚洲精品之草原avav久久| 日本精品视频在线观看| 69**夜色精品国产69乱| 成人免费高清完整版在线观看| 国产成人免费av| 国产精品精品久久久久久| 色樱桃影院亚洲精品影院| 欧美日韩国产一区二区三区| 这里只有精品在线观看| 国产精品99免视看9| 欧美在线视频网站| 欧美成人免费va影院高清| 亚洲精品国产精品久久清纯直播| 国内免费精品永久在线视频| 国产精品香蕉av| 欧美另类高清videos| 国产日韩欧美中文| 精品动漫一区二区三区| 一本色道久久88精品综合| 久久精品一偷一偷国产| 亚洲大胆美女视频| 日韩有码片在线观看| 久久久精品国产网站| 国产91精品黑色丝袜高跟鞋| 国产精品久久国产精品99gif| 亚洲精品99久久久久中文字幕| 国产精品九九久久久久久久| 亚洲另类xxxx| 日韩免费观看在线观看|