大家都知道在C++中都會使用到線程池,但是對于新手們來說實現線程池并不是一件容易的事,那么接下來的內容中就讓小編為大家介紹c++如何實現線程池的方法吧,感興趣的朋友們一起來看看。
這是對pthread線程的一個簡單應用
1.????? 實現了線程池的概念,線程可以重復使用。
2.????? 對信號量,互斥鎖等進行封裝,業務處理函數中只需寫和業務相關的代碼。
3.????? 移植性好。如果想把這個線程池代碼應用到自己的實現中去,只要寫自己的業務處理函數和改寫工作隊列數據的處理方法就可以了。
Sample代碼主要包括一個主程序和兩個線程實現類
ThreadTest.cpp:主程序
CThreadManager:線程管理Class,線程池的實現類
CThread:線程Class.
主程序實現方法。
1.????? 實現main函數和一個需要線程處理的業務函數(例子代碼中業務函數是一個簡單的計算函數Count)。在main函數中創建CThreadManager的實例,產生線程池。這個時候,把業務函數作為函數指針傳到CThreadManager里面,最終會被線程調用。
2.????? 向工作隊列中放入業務函數要處理的數據。
3.????? 設置信號量,喚醒線程。
// 線程要執行的函數int Count(int nWork){ int nResult = nWork * nWork; printf("count result is %d/n",nResult); return 0;}int main() { // 創建線程管理類的實例,把要執行的線程函數和最大線程數傳進去 CThreadManager* pManager = new CThreadManager(Count, 3); // 把要進行計算的數放到工作隊列中 pManager->PushWorkQue(5); pManager->PushWorkQue(20); // 設置信號量,喚醒線程 pManager->PostSem(); pManager->PostSem(); // 等待子線程執行 sleep(1); return 0;}
CThreadManager實現的方法
1. 把信號量和互斥鎖等封裝成自己的函數
2. 在new方法里,循環調用CThread的new方法,啟動一定數量(可設定)的線程,產生線程池。
3. 這些線程啟動后,就會執行CThreadManager中的ManageFuction函數。這個函數是無限循環的,保證了線程在整個程序的生命周期中不銷毀。
4. 在循環處理里面,第一行代碼就是等待一個信號量,這個信號量是由主程序進行設置的,這個信號信號量如果沒有被設置(代表暫時沒有需要處理的工作),所有線程都在這里阻塞著。
4.????? 一旦信號量被設置,根據Linux線程調度機制,在阻塞的線程隊列中,其中一個線程被喚醒,可以執行后面的代碼。
5.????? 從工作隊列中取出要進行處理的數據(使用互斥鎖進行排他)
6.????? 通過函數指針調用main函數傳過來的業務函數,處理數據。
7.????? 業務函數執行完之后,線程進入下一個循環,等待新的信號量。
class CThreadManager { friend void* ManageFuction(void*);private: sem_t m_sem; // 信號量 pthread_mutex_t m_mutex; // 互斥鎖 queue<int> m_queWork; // 工作隊列 list<CThread*> m_lstThread; // 線程list int (*m_threadFuction)(int); //函數指針,指向main函數傳過來的線程執行函數public: CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt); virtual ~CThreadManager(); int WaitSem(); int PostSem(); int LockMutex(); int UnlockMutex(); void PushWorkQue(int nWork); int PopWorkQue(); int RunThreadFunction(int nWork);};// 線程執行函數,它只是個殼子,處理信號量和互斥鎖等,// 最后調用main函數傳過來的線程執行函數來實現業務處理void* ManageFuction(void* argv){ CThreadManager* pManager = (CThreadManager*)argv; // 進行無限循環(意味著線程是不銷毀的,重復利用) while(true) { // 線程開啟后,就在這里阻塞著,直到main函數設置了信號量 pManager->WaitSem(); printf("thread wakeup./n"); // 從工作隊列中取出要處理的數 pManager->LockMutex(); int nWork = pManager->PopWorkQue(); pManager->UnlockMutex(); printf("call Count function./n"); pManager->RunThreadFunction(nWork); } return 0;}// 構造方法CThreadManager::CThreadManager(int (*threadFuction)(int), int nMaxThreadCnt) { sem_init(&m_sem, 0, 0); pthread_mutex_init(&m_mutex, NULL); m_threadFuction = threadFuction; for(int i=0; i<nMaxThreadCnt; i++) { CThread* pThread = new CThread(ManageFuction, this); printf("thread started./n"); m_lstThread.push_back(pThread); }}
CThread實現的方法
CThreadManager比較簡單,封裝了創建線程和join線程的函數。
CThread::CThread(void* (*threadFuction)(void*),void* threadArgv) { // 初始化線程屬性 pthread_attr_t threadAttr; pthread_attr_init(&threadAttr); pthread_create(&m_thread, &threadAttr, threadFuction, threadArgv);}
c++線程池,繼承CDoit,實現其中的start和end
/* * 多線程管理類 * */ #ifndef CTHREADPOOLMANAGE_H#define CTHREADPOOLMANAGE_H#include <iostream>#include <pthread.h>#include <unistd.h> #include <list>#include <vector>#include <time.h>#include <asm/errno.h> #define USLEEP_TIME 100#define CHECK_TIME 1 using namespace std;class CDoit{public: virtual int start(void *){}; virtual int end(){};}; class CthreadPoolManage{private: int _minThreads; //最少保留幾個線程 int _maxThreads; //最多可以有幾個線程 int _waitSec; //空閑多少秒后將線程關閉 class threadInfo{ public: threadInfo(){ isbusy = false; doFlag = true; } // pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond=PTHREAD_COND_INITIALIZER; bool isbusy; //是否空閑 bool doFlag; // time_t beginTime; //線程不工作開始時間 pthread_t cThreadPid; //線程id pthread_attr_t cThreadAttr; //線程屬性 CDoit * doit; //任務類 void * value; //需要傳遞的值 }; //線程函數 static void* startThread(void*); //任務隊列鎖 pthread_mutex_t _duty_mutex; //任務隊列 list<threadInfo*> _dutyList; //線程隊列鎖 pthread_mutex_t _thread_mutex; //線程隊列 list<threadInfo*> _threadList; ///初始化,創建最小個數線程///void initThread(); ///任務分配線程///static void* taskAllocation(void*arg);pthread_t tasktPid;///線程銷毀、狀態檢查線程///static void* checkThread(void* arg);pthread_t checktPid;bool checkrun; //線程異常退出清理static void threadCleanUp(void* arg); //int addThread(list<threadInfo*> *plist,threadInfo* ptinfo); public:CthreadPoolManage();/*保留的最少線程,最多線程數,空閑多久銷毀,保留幾個線程的冗余 */CthreadPoolManage(int min,int max,int waitSec);~CthreadPoolManage(); int start();//任務注入器int putDuty(CDoit *,void *); int getNowThreadNum(); }; #endif // CTHREADPOOLMANAGE_H
CPP
/* * 線程池,線程管理類 * */ #include "cthreadpoolmanage.h" CthreadPoolManage::CthreadPoolManage(){ _minThreads = 5; //最少保留幾個線程 _maxThreads = 5; //最多可以有幾個線程 _waitSec = 10; //空閑多少秒后將線程關閉 pthread_mutex_init(&_duty_mutex, NULL); pthread_mutex_init(&_thread_mutex, NULL); checkrun = true;} CthreadPoolManage::CthreadPoolManage(int min, int max, int waitSec){ CthreadPoolManage(); _minThreads = min; //最少保留幾個線程 _maxThreads = max; //最多可以有幾個線程 _waitSec = waitSec; //空閑多少秒后將線程關閉} CthreadPoolManage::~CthreadPoolManage(){ }void CthreadPoolManage::threadCleanUp(void* arg){ threadInfo* tinfo = (threadInfo*)arg; tinfo->isbusy = false; pthread_mutex_unlock(&tinfo->mtx); pthread_attr_destroy (&tinfo->cThreadAttr); delete tinfo;} void* CthreadPoolManage::startThread(void* arg){ cout<<"線程開始工作"<<endl; threadInfo* tinfo = (threadInfo*)arg; pthread_cleanup_push(threadCleanUp,arg); while(tinfo->doFlag){ pthread_mutex_lock(&tinfo->mtx); if(tinfo->doit == NULL) { cout<<"開始等待任務"<<endl; pthread_cond_wait(&tinfo->cond,&tinfo->mtx); cout<<"有任務了"<<endl; } tinfo->isbusy = true; tinfo->doit->start(tinfo->value); tinfo->doit->end(); tinfo->doit=NULL; tinfo->isbusy = false; time( &tinfo->beginTime); pthread_mutex_unlock(&tinfo->mtx); } //0正常執行到這兒不執行清理函數,異常會執行 pthread_cleanup_pop(0); pthread_attr_destroy (&tinfo->cThreadAttr); delete tinfo; cout<<"線程結束"<<endl;} void CthreadPoolManage::initThread(){ int i = 0; for(i = 0;i<this->_minThreads;i++) { threadInfo *tinfo = new threadInfo; tinfo->doit = NULL; tinfo->value = NULL; tinfo->isbusy = false; tinfo->doFlag = true; // PTHREAD_CREATE_DETACHED (分離線程) 和 PTHREAD _CREATE_JOINABLE (非分離線程) pthread_attr_init(&tinfo->cThreadAttr); pthread_attr_setdetachstate(&tinfo->cThreadAttr,PTHREAD_CREATE_DETACHED ); cout<<"初始化了一個線程"<<endl; if(pthread_create(&tinfo->cThreadPid,&tinfo->cThreadAttr,startThread,(void *)tinfo) != 0) { cout<<"創建線程失敗"<<endl; break; } this->_threadList.push_back(tinfo); }} int CthreadPoolManage::addThread(std::list< CthreadPoolManage::threadInfo* >* plist, CthreadPoolManage::threadInfo* ptinfo){ threadInfo *tinfo = new threadInfo; tinfo->doit = ptinfo->doit; tinfo->value = ptinfo->value; tinfo->isbusy = true; if(pthread_create(&tinfo->cThreadPid,NULL,startThread,(void *)tinfo) != 0) { cout<<"創建線程失敗"<<endl; return -1; } plist->push_back(tinfo); return 0;} int CthreadPoolManage::putDuty(CDoit* doit, void* value){ threadInfo *tinfo = new threadInfo; time( &tinfo->beginTime); tinfo->doit= doit; tinfo->value = value; pthread_mutex_lock(&_duty_mutex); this->_dutyList.push_back(tinfo); pthread_mutex_unlock(&_duty_mutex); return 0;} void* CthreadPoolManage::taskAllocation(void*arg){ CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg; int size_1 = 0; int size_2 = 0; int i_1 = 0; int i_2 = 0; bool a_1 = true; bool a_2 = true; threadInfo* ptinfo; threadInfo* ptinfoTmp; while(true){ size_1 = 0; size_2 = 0; pthread_mutex_lock(&ptmanage->_duty_mutex); pthread_mutex_lock(&ptmanage->_thread_mutex); size_1 = ptmanage->_dutyList.size(); size_2 =ptmanage->_threadList.size(); for(list<threadInfo*>::iterator itorti1 = ptmanage->_dutyList.begin();itorti1 !=ptmanage->_dutyList.end();) { ptinfo = *itorti1; a_1 = true; for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();itorti2++){ ptinfoTmp = *itorti2; if(EBUSY == pthread_mutex_trylock(&ptinfoTmp->mtx)) { continue; } if(!ptinfoTmp->isbusy) { ptinfoTmp->doit = ptinfo->doit; ptinfoTmp->value = ptinfo->value; ptinfoTmp->isbusy = true; pthread_cond_signal(&ptinfoTmp->cond); pthread_mutex_unlock(&ptinfoTmp->mtx); a_1 = false; delete ptinfo; break; } pthread_mutex_unlock(&ptinfoTmp->mtx); } if(a_1){ if(ptmanage->_threadList.size()>ptmanage->_maxThreads||ptmanage->addThread(&ptmanage->_threadList,ptinfo)!=0) { itorti1++; continue; }else{ itorti1 = ptmanage->_dutyList.erase(itorti1); } delete ptinfo; }else{ itorti1 = ptmanage->_dutyList.erase(itorti1); } } pthread_mutex_unlock(&ptmanage->_duty_mutex); pthread_mutex_unlock(&ptmanage->_thread_mutex); usleep(USLEEP_TIME); } return 0;} void* CthreadPoolManage::checkThread(void* arg){ CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg; threadInfo* ptinfo; time_t nowtime; while(ptmanage->checkrun){ sleep(CHECK_TIME); pthread_mutex_lock(&ptmanage->_thread_mutex); if(ptmanage->_threadList.size()<=ptmanage->_minThreads) { pthread_mutex_unlock(&ptmanage->_thread_mutex); continue; } for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();){ ptinfo = *itorti2; if(EBUSY == pthread_mutex_trylock(&ptinfo->mtx)) { itorti2++; continue; } time(&nowtime); if(ptinfo->isbusy == false && nowtime-ptinfo->beginTime>ptmanage->_waitSec) { ptinfo->doFlag = false; itorti2 = ptmanage->_threadList.erase(itorti2); }else{ itorti2++; } pthread_mutex_unlock(&ptinfo->mtx); } pthread_mutex_unlock(&ptmanage->_thread_mutex); }} int CthreadPoolManage::start(){ //初始化 this->initThread(); //啟動任務分配線程 if(pthread_create(&tasktPid,NULL,taskAllocation,(void *)this) != 0) { cout<<"創建任務分配線程失敗"<<endl; return -1; } //創建現程狀態分配管理線程 if(pthread_create(&checktPid,NULL,checkThread,(void *)this) != 0) { cout<<"創建線程狀態分配管理線程失敗"<<endl; return -1; } return 0;} ///////////////////////////////int CthreadPoolManage::getNowThreadNum(){ int num = 0; pthread_mutex_lock(&this->_thread_mutex); num = this->_threadList.size(); pthread_mutex_unlock(&this->_thread_mutex); return num ;}
看完后你知道c++如何實現線程池了嗎?小編的經驗尚淺,這里就簡單總結了這么多,如果有其他更好的實現PHP多線程的方法可以一起討論!
新聞熱點
疑難解答
圖片精選