亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > C++ > 正文

c++版線程池和任務池示例

2020-01-26 15:35:20
字體:
來源:轉載
供稿:網友

commondef.h

復制代碼 代碼如下:

//單位秒,監測空閑列表時間間隔,在空閑隊列中超過TASK_DESTROY_INTERVAL時間的任務將被自動銷毀
const int CHECK_IDLE_TASK_INTERVAL = 300;
//單位秒,任務自動銷毀時間間隔
const int TASK_DESTROY_INTERVAL = 60;

//監控線程池是否為空時間間隔,微秒
const int IDLE_CHECK_POLL_EMPTY = 500;

//線程池線程空閑自動退出時間間隔 ,5分鐘
const int  THREAD_WAIT_TIME_OUT = 300;

taskpool.cpp

復制代碼 代碼如下:

#include "taskpool.h"

#include <string.h>

#include <stdio.h>
#include <pthread.h>

    TaskPool::TaskPool(const int & poolMaxSize)
    : m_poolSize(poolMaxSize)
      , m_taskListSize(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_lock, NULL);
    pthread_mutex_init(&m_idleMutex, NULL);
    pthread_cond_init(&m_idleCond, NULL);

    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 讓線程獨立運行
    pthread_create(&m_idleId, &attr, CheckIdleTask, this); //創建監測空閑任務進程
    pthread_attr_destroy(&attr);
}

TaskPool::~TaskPool()
{
    if(!m_bStop)
    {
        StopPool();
    }
    if(!m_taskList.empty())
    {
        std::list<Task*>::iterator it = m_taskList.begin();
        for(; it != m_taskList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_taskList.clear();
        m_taskListSize = 0;
    }
    if(!m_idleList.empty())
    {
        std::list<Task*>::iterator it = m_idleList.begin();
        for(; it != m_idleList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_idleList.clear();
    }


    pthread_mutex_destroy(&m_lock);
    pthread_mutex_destroy(&m_idleMutex);
    pthread_cond_destroy(&m_idleCond);
}

void * TaskPool::CheckIdleTask(void * arg)
{
    TaskPool * pool = (TaskPool*)arg;
    while(1)
    {
        pool->LockIdle();
        pool->RemoveIdleTask();
        if(pool->GetStop())
        {
            pool->UnlockIdle();
            break;
        }
        pool->CheckIdleWait();
        pool->UnlockIdle();
    }
}

void TaskPool::StopPool()
{
    m_bStop = true;
    LockIdle();
    pthread_cond_signal(&m_idleCond); //防止監控線程正在等待,而引起無法退出的問題
    UnlockIdle();
    pthread_join(m_idleId, NULL);
}

bool TaskPool::GetStop()
{
    return m_bStop;
}

void TaskPool::CheckIdleWait()
{
    struct timespec timeout;
    memset(&timeout, 0, sizeof(timeout));
    timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
    timeout.tv_nsec = 0;
    pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}

int TaskPool::RemoveIdleTask()
{
    int iRet = 0;
    std::list<Task*>::iterator it, next;
    std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();
    time_t curTime = time(0);
    for(; rit != m_idleList.rend(); )
    {
        it = --rit.base();
        if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
        {
            iRet++;
            delete *it;
            *it = NULL;
            next = m_idleList.erase(it);
            rit = std::list<Task*>::reverse_iterator(next);
        }
        else
        {
            break;   
        }
    }
}

int TaskPool::AddTask(task_fun fun, void *arg)
{
    int iRet = 0;
    if(0 != fun)
    {
        pthread_mutex_lock(&m_lock);
        if(m_taskListSize >= m_poolSize)
        {
            pthread_mutex_unlock(&m_lock);
            iRet = -1; //task pool is full;
        }
        else
        {
            pthread_mutex_unlock(&m_lock);
            Task * task = GetIdleTask();
            if(NULL == task)
            {
                task = new Task;
            }
            if(NULL == task)
            {
                iRet = -2; // new failed
            }
            else
            {
                task->fun = fun;
                task->data = arg;
                pthread_mutex_lock(&m_lock);
                m_taskList.push_back(task);
                ++m_taskListSize;
                pthread_mutex_unlock(&m_lock);
            }
        }
    }
    return iRet;
}

Task* TaskPool::GetTask()
{
    Task *task = NULL;
    pthread_mutex_lock(&m_lock);
    if(!m_taskList.empty())
    {
        task =  m_taskList.front();
        m_taskList.pop_front();
        --m_taskListSize;
    }
    pthread_mutex_unlock(&m_lock);
    return task;
}

void TaskPool::LockIdle()
{
    pthread_mutex_lock(&m_idleMutex);
}

void TaskPool::UnlockIdle()
{
    pthread_mutex_unlock(&m_idleMutex);
}

Task * TaskPool::GetIdleTask()
{
    LockIdle();
    Task * task = NULL;
    if(!m_idleList.empty())
    {
        task = m_idleList.front();
        m_idleList.pop_front();
    }
    UnlockIdle();
    return task;
}

void TaskPool::SaveIdleTask(Task*task)
{
    if(NULL != task)
    {
        task->fun = 0;
        task->data = NULL;
        task->last_time = time(0);
        LockIdle();
        m_idleList.push_front(task);
        UnlockIdle();
    }
}

taskpool.h

復制代碼 代碼如下:

#ifndef TASKPOOL_H
#define TASKPOOL_H
/* purpose @ 任務池,主要是緩沖外部高并發任務數,有manager負責調度任務
 *          任務池可自動銷毀長時間空閑的Task對象
 *          可通過CHECK_IDLE_TASK_INTERVAL設置檢查idle空閑進程輪訓等待時間
 *          TASK_DESTROY_INTERVAL 設置Task空閑時間,超過這個時間值將會被CheckIdleTask線程銷毀
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include <list>
#include <pthread.h>
#include "commondef.h"

//所有的用戶操作為一個task,
typedef void (*task_fun)(void *);
struct Task
{
    task_fun fun; //任務處理函數
    void* data; //任務處理數據
    time_t last_time; //加入空閑隊列的時間,用于自動銷毀
};

//任務池,所有任務會投遞到任務池中,管理線程負責將任務投遞給線程池
class TaskPool
{
public:
 /* pur @ 初始化任務池,啟動任務池空閑隊列自動銷毀線程
     * para @ maxSize 最大任務數,大于0
    */
    TaskPool(const int & poolMaxSize);
    ~TaskPool();

    /* pur @ 添加任務到任務隊列的尾部
     * para @ task, 具體任務
     * return @ 0 添加成功,負數 添加失敗
    */   
    int AddTask(task_fun fun, void* arg);

    /* pur @ 從任務列表的頭獲取一個任務
     * return @  如果列表中有任務則返回一個Task指針,否則返回一個NULL
    */   
    Task* GetTask();

    /* pur @ 保存空閑任務到空閑隊列中
     * para @ task 已被調用執行的任務
     * return @
    */
    void SaveIdleTask(Task*task);

    void StopPool();
public:
    void LockIdle();
    void UnlockIdle();
    void CheckIdleWait();
    int RemoveIdleTask();
    bool GetStop();
private:
    static void * CheckIdleTask(void *);
    /* pur @ 獲取空閑的task
     * para @
     * para @
     * return @ NULL說明沒有空閑的,否則從m_idleList中獲取一個
    */
    Task* GetIdleTask();
    int GetTaskSize();
private:
    int m_poolSize; //任務池大小
    int m_taskListSize; // 統計taskList的大小,因為當List的大小會隨著數量的增多而耗時增加
    bool m_bStop; //是否停止
    std::list<Task*> m_taskList;//所有待處理任務列表
    std::list<Task*> m_idleList;//所有空閑任務列表
    pthread_mutex_t m_lock; //對任務列表進行加鎖,保證每次只能取一個任務
    pthread_mutex_t m_idleMutex; //空閑任務隊列鎖
    pthread_cond_t m_idleCond; //空閑隊列等待條件
    pthread_t m_idleId;;
};
#endif

threadpool.cpp

復制代碼 代碼如下:

/* purpose @ 線程池類,負責線程的創建與銷毀,實現線程超時自動退出功能(半駐留)
 * date    @ 2014.01.03
 * author  @ haibin.wang
 */

#include "threadpool.h"
#include <errno.h>
#include <string.h>

/*
#include <iostream>
#include <stdio.h>
*/

Thread::Thread(bool detach, ThreadPool * pool)
    : m_pool(pool)
{
    pthread_attr_init(&m_attr);
    if(detach)
    {
        pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 讓線程獨立運行
    }
    else
    {
         pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );
    }

    pthread_mutex_init(&m_mutex, NULL); //初始化互斥量
    pthread_cond_init(&m_cond, NULL); //初始化條件變量
    task.fun = 0;
    task.data = NULL;
}

Thread::~Thread()
{
    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
    pthread_attr_destroy(&m_attr);
}

    ThreadPool::ThreadPool()
    : m_poolMax(0)
    , m_idleNum(0)
    , m_totalNum(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_mutex, NULL);
    pthread_mutex_init(&m_runMutex,NULL);
    pthread_mutex_init(&m_terminalMutex, NULL);
    pthread_cond_init(&m_terminalCond, NULL);
    pthread_cond_init(&m_emptyCond, NULL);
}

ThreadPool::~ThreadPool()
{
    /*if(!m_threads.empty())
    {
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it != m_threads.end(); ++it)
        {
            if(*it != NULL)
            {
                pthread_cond_destroy( &((*it)->m_cond) );
                pthread_mutex_destroy( &((*it)->m_mutex) );
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
    }*/
    pthread_mutex_destroy(&m_runMutex);
    pthread_mutex_destroy(&m_terminalMutex);
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_terminalCond);
    pthread_cond_destroy(&m_emptyCond);
}

int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
    if(poolMax < poolPre
            || poolPre < 0
            || poolMax <= 0)
    {
        return -1;
    }
    m_poolMax = poolMax;

    int iRet = 0;
    for(int i=0; i<poolPre; ++i)
    {
        Thread * thread = CreateThread();
        if(NULL == thread)
        {
            iRet = -2;
        }
    }

    if(iRet < 0)
    { 
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it!= m_threads.end(); ++it)
        {
            if(NULL != (*it) )
            {
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
        m_totalNum = 0;
    }
    return iRet;
}

void ThreadPool::GetThreadRun(task_fun fun, void* arg)
{
    //從線程池中獲取一個線程
    pthread_mutex_lock( &m_mutex);
    if(m_threads.empty())
    {
        pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空閑線程
    }

    Thread * thread = m_threads.front();
    m_threads.pop_front();
    pthread_mutex_unlock( &m_mutex);

    pthread_mutex_lock( &thread->m_mutex );
    thread->task.fun = fun;
    thread->task.data = arg;       
    pthread_cond_signal(&thread->m_cond); //觸發線程WapperFun循環執行
    pthread_mutex_unlock( &thread->m_mutex );
}

int ThreadPool::Run(task_fun fun, void * arg)
{
    pthread_mutex_lock(&m_runMutex); //保證每次只能由一個線程執行
    int iRet = 0;
    if(m_totalNum <m_poolMax) //
    {
        if(m_threads.empty() && (NULL == CreateThread()) )
        {
            iRet = -1;//can not create new thread!
        }
        else
        {
            GetThreadRun(fun, arg);
        }
    }
    else
    {
        GetThreadRun(fun, arg);
    }
    pthread_mutex_unlock(&m_runMutex);
    return iRet;
}

void ThreadPool::StopPool(bool bStop)
{
    m_bStop = bStop;
    if(bStop)
    {
        //啟動監控所有空閑線程是否退出的線程
        Thread thread(false, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //啟動監控所有線程退出線程
        //阻塞等待所有空閑線程退出
        pthread_join(thread.m_threadId, NULL);
    }
    /*if(bStop)
    {
        pthread_mutex_lock(&m_terminalMutex);
        //啟動監控所有空閑線程是否退出的線程
        Thread thread(true, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //啟動監控所有線程退出線程
        //阻塞等待所有空閑線程退出
        pthread_cond_wait(&m_terminalCond, & m_terminalMutex);
        pthread_mutex_unlock(&m_terminalMutex);
    }*/
}

bool ThreadPool::GetStop()
{
    return m_bStop;
}

Thread * ThreadPool::CreateThread()
{
    Thread * thread = NULL;
    thread = new Thread(true, this);
    if(NULL != thread)
    {
        int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通過WapperFun將線程加入到空閑隊列中
        if(0 != iret)
        {
            delete thread;
            thread = NULL;
        }
    }
    return thread;
}

void * ThreadPool::WapperFun(void*arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    pool->IncreaseTotalNum();
    struct timespec abstime;
    memset(&abstime, 0, sizeof(abstime));
    while(1)
    {
        if(0 != thread->task.fun)
        {
            thread->task.fun(thread->task.data);
        }

        if( true == pool->GetStop() ) 
        {
            break; //確定當前任務執行完畢后再判定是否退出線程
        }
        pthread_mutex_lock( &thread->m_mutex );
        pool->SaveIdleThread(thread); //將線程加入到空閑隊列中
        abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
        abstime.tv_nsec = 0;
        if(ETIMEDOUT  == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待線程被喚醒 或超時自動退出
        {
            pthread_mutex_unlock( &thread->m_mutex );
            break;
        }
        pthread_mutex_unlock( &thread->m_mutex );
    }

    pool->LockMutex();
    pool->DecreaseTotalNum();
    if(thread != NULL)
    {
        pool->RemoveThread(thread);
        delete thread;
        thread = NULL;
    }
    pool->UnlockMutex();
    return 0;
}

void ThreadPool::SaveIdleThread(Thread * thread )
{
    if(thread)
    {
        thread->task.fun = 0;
        thread->task.data = NULL;
        LockMutex();
        if(m_threads.empty())
        {
            pthread_cond_broadcast(&m_emptyCond); //發送不空的信號,告訴run函數線程隊列已經不空了
        }
        m_threads.push_front(thread);
        UnlockMutex();
    }
}

int ThreadPool::TotalThreads()
{
    return m_totalNum;
}


void ThreadPool::SendSignal()
{
    LockMutex();
    std::list<Thread*>::iterator it = m_threads.begin();
    for(; it!= m_threads.end(); ++it)
    {
        pthread_mutex_lock( &(*it)->m_mutex );
        pthread_cond_signal(&((*it)->m_cond));
        pthread_mutex_unlock( &(*it)->m_mutex );
    }
    UnlockMutex();
}

void * ThreadPool::TerminalCheck(void* arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    while((false == pool->GetStop()) || pool->TotalThreads() >0 )
    {
        pool->SendSignal();

        usleep(IDLE_CHECK_POLL_EMPTY);
    }
    //pool->TerminalCondSignal();
    return 0;
}

void ThreadPool::TerminalCondSignal()
{
    pthread_cond_signal(&m_terminalCond);
}

void ThreadPool::RemoveThread(Thread* thread)
{
    m_threads.remove(thread);
}

void ThreadPool::LockMutex()
{
    pthread_mutex_lock( &m_mutex);
}

void ThreadPool::UnlockMutex()
{
    pthread_mutex_unlock( &m_mutex );
}

void ThreadPool::IncreaseTotalNum()
{
    LockMutex();
    m_totalNum++;
    UnlockMutex();
}
void ThreadPool::DecreaseTotalNum()
{
    m_totalNum--;
}

threadpool.h

復制代碼 代碼如下:

#ifndef THREADPOOL_H
#define THREADPOOL_H
/* purpose @ 線程池類,負責線程的創建與銷毀,實現線程超時自動退出功能(半駐留)a
 *          當線程池退出時創建TerminalCheck線程,負責監測線程池所有線程退出
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include <list>
#include <string>
#include "taskpool.h"
//通過threadmanager來控制任務調度進程
//threadpool的TerminalCheck線程負責監測線程池所有線程退出


class ThreadPool;
class Thread
{
public:
    Thread(bool detach, ThreadPool * pool);
    ~Thread();
    pthread_t  m_threadId; //線程id
    pthread_mutex_t m_mutex; //互斥鎖
    pthread_cond_t m_cond; //條件變量
    pthread_attr_t m_attr; //線程屬性
 Task  task; //
    ThreadPool * m_pool; //所屬線程池
};

//線程池,負責創建線程處理任務,處理完畢后會將線程加入到空閑隊列中,從任務池中
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();

    /* pur @ 初始化線程池
     * para @ poolMax 線程池最大線程數
     * para @ poolPre 預創建線程數
     * return @ 0:成功
     *          -1: parameter error, must poolMax > poolPre >=0
     *          -2: 創建線程失敗
    */
    int InitPool(const int & poolMax, const int & poolPre);

    /* pur @ 執行一個任務
     * para @ task 任務指針
     * return @ 0任務分配成功,負值 任務分配失敗,-1,創建新線程失敗
    */
    int Run(task_fun fun, void* arg);

 /* pur @ 設置是否停止線程池工作
     * para @ bStop true停止,false不停止
    */
 void StopPool(bool bStop);

public: //此公有函數主要用于靜態函數調用
    /* pur @ 獲取進程池的啟停狀態
     * return @
    */
    bool GetStop();   
 void SaveIdleThread(Thread * thread );
    void LockMutex();
    void UnlockMutex();
    void DecreaseTotalNum();
    void IncreaseTotalNum();
    void RemoveThread(Thread* thread);
    void TerminalCondSignal();
    int TotalThreads();
    void SendSignal();
private:
 /* pur @ 創建線程
     * return @ 非空 成功,NULL失敗,
    */
 Thread * CreateThread();

    /* pur @ 從線程池中獲取一個一個線程運行任務
     * para @ fun 函數指針
     * para @ arg 函數參數
     * return @
    */
    void GetThreadRun(task_fun fun, void* arg);

 static void * WapperFun(void*);
 static void * TerminalCheck(void*);//循環監測是否所有線程終止線程

private:
    int m_poolMax;//線程池最大線程數
    int m_idleNum; //空閑線程數
    int m_totalNum; //當前線程總數 小于最大線程數 
 bool m_bStop; //是否停止線程池
 pthread_mutex_t m_mutex; //線程列表鎖
 pthread_mutex_t m_runMutex; //run函數鎖

    pthread_mutex_t m_terminalMutex; //終止所有線程互斥量
    pthread_cond_t  m_terminalCond; //終止所有線程條件變量
    pthread_cond_t  m_emptyCond; //空閑線程不空條件變量

    std::list<Thread*> m_threads; // 線程列表
};
#endif

threadpoolmanager.cpp

復制代碼 代碼如下:

#include "threadpoolmanager.h"
#include "threadpool.h"
#include "taskpool.h"

#include <errno.h>
#include <string.h>

/*#include <string.h>
#include <sys/time.h>
#include <stdio.h>*/
 //   struct timeval time_beg, time_end;
ThreadPoolManager::ThreadPoolManager()
    : m_threadPool(NULL)
    , m_taskPool(NULL)
    , m_bStop(false)
{
    pthread_mutex_init(&m_mutex_task,NULL);
    pthread_cond_init(&m_cond_task, NULL);

   /* memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);*/
}

ThreadPoolManager::~ThreadPoolManager()
{
    StopAll();
    if(NULL != m_threadPool)
    {
        delete m_threadPool;
        m_threadPool = NULL;
    }
    if(NULL != m_taskPool)
    {
        delete m_taskPool;
        m_taskPool = NULL;
    }

    pthread_cond_destroy( &m_cond_task);
    pthread_mutex_destroy( &m_mutex_task );

    /*gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    printf("manager total time = %d/n", total);
    gettimeofday(&time_beg, NULL);*/
}

int ThreadPoolManager::Init(
        const int &tastPoolSize,
        const int &threadPoolMax,
        const int &threadPoolPre)
{
    m_threadPool = new ThreadPool();
    if(NULL == m_threadPool)
    {
        return -1;
    }
    m_taskPool = new TaskPool(tastPoolSize);
    if(NULL == m_taskPool)
    {
        return -2;
    }

    if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
    {
        return -3;
    }
    //啟動線程池
    //啟動任務池
    //啟動任務獲取線程,從任務池中不斷拿任務到線程池中
    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
    pthread_create(&m_taskThreadId, &attr, TaskThread, this); //創建獲取任務進程
    pthread_attr_destroy(&attr);
    return 0;
}

void ThreadPoolManager::StopAll()
{
    m_bStop = true;
    LockTask();
    pthread_cond_signal(&m_cond_task);
    UnlockTask();
    pthread_join(m_taskThreadId, NULL);
    //等待當前所有任務執行完畢
    m_taskPool->StopPool();
    m_threadPool->StopPool(true); // 停止線程池工作
}

void ThreadPoolManager::LockTask()
{
    pthread_mutex_lock(&m_mutex_task);
}

void ThreadPoolManager::UnlockTask()
{
    pthread_mutex_unlock(&m_mutex_task);
}

void* ThreadPoolManager::TaskThread(void* arg)
{
    ThreadPoolManager * manager = (ThreadPoolManager*)arg;
    while(1)
    {
        manager->LockTask(); //防止任務沒有執行完畢發送了停止信號
        while(1) //將任務隊列中的任務執行完再退出
        {
            Task * task = manager->GetTaskPool()->GetTask();
            if(NULL == task)
            {
                break;
            }
            else
            {
                manager->GetThreadPool()->Run(task->fun, task->data);
                manager->GetTaskPool()->SaveIdleTask(task);
            }
        }

        if(manager->GetStop())
        {
            manager->UnlockTask();
            break;
        }
        manager->TaskCondWait(); //等待有任務的時候執行
        manager->UnlockTask();
    }
    return 0;
}

ThreadPool * ThreadPoolManager::GetThreadPool()
{
    return m_threadPool;
}

TaskPool * ThreadPoolManager::GetTaskPool()
{
    return m_taskPool;
}

int  ThreadPoolManager::Run(task_fun fun,void* arg)
{
    if(0 == fun)
    {
        return 0;
    }
    if(!m_bStop)
    {  
        int iRet =  m_taskPool->AddTask(fun, arg);

        if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )
        {
            pthread_cond_signal(&m_cond_task);
            UnlockTask();
        }
        return iRet;
    }
    else
    {
        return -3;
    }
}

bool ThreadPoolManager::GetStop()
{
    return m_bStop;
}

void ThreadPoolManager::TaskCondWait()
{
    struct timespec to;
    memset(&to, 0, sizeof to);
    to.tv_sec = time(0) + 60;
    to.tv_nsec = 0;

    pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超時
}

threadpoolmanager.h

復制代碼 代碼如下:

#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
 *      基本流程:
 *          管理線程池和任務池,先將任務加入任務池,然后由TaskThread負責從任務池中將任務取出放入到線程池中
 *      基本功能:
 *          1、工作線程可以在業務不忙的時候自動退出部分長時間不使用的線程
 *          2、任務池可以在業務不忙的時候自動釋放長時間不使用的資源(可通過commondef.h修改)
 *          3、當程序退時不再向任務池中添加任務,當任務池中所有任務執行完畢后才退出相關程序(做到程序的安全退出)
 *      線程資源:
 *          如果不預分配任何處理線程的話,ThreadPool只有當有任務的時候才實際創建需要的線程,最大線程創建數為用戶指定
 *          當manager銷毀的時候,manager會創建一個監控所有任務執行完畢的監控線程,只有當所有任務執行完畢后manager才銷毀
 *          線程最大數為:1個TaskPool線程 + 1個manager任務調度線程 + ThreadPool最大線程數 + 1個manager退出監控線程 + 1線程池所有線程退出監控線程
 *          線程最小數為:1個TaskPool創建空閑任務資源銷毀監控線程 + 1個manager創建任務調度線程
 *      使用方法:
 *          ThreadPoolManager manager;
 *          manager.Init(100000, 50, 5);//初始化一個任務池為10000,線程池最大線程數50,預創建5個線程的管理器
 *          manager.run(fun, data); //添加執行任務到manager中,fun為函數指針,data為fun需要傳入的參數,data可以為NULL
 *
 * date    @ 2013.12.23
 * author  @ haibin.wang
 *
 *  詳細參數控制可以修改commondef.h中的相關變量值
 */

#include <pthread.h>
typedef void (*task_fun)(void *);

class ThreadPool;
class TaskPool;

class ThreadPoolManager
{
public:
    ThreadPoolManager();
    ~ThreadPoolManager();

    /* pur @ 初始化線程池與任務池,threadPoolMax > threadPoolPre > threadPoolMin >= 0
     * para @ tastPoolSize 任務池大小
     * para @ threadPoolMax 線程池最大線程數
     * para @ threadPoolPre 預創建線程數
     * return @ 0:初始化成功,負數 初始化失敗
     *          -1:創建線程池失敗
     *          -2:創建任務池失敗
     *          -3:線程池初始化失敗
    */
    int Init(const int &tastPoolSize,
            const int &threadPoolMax,
            const int &threadPoolPre);

    /* pur @ 執行一個任務
     * para @ fun 需要執行的函數指針
     * para @ arg fun需要的參數,默認為NULL
     * return @ 0 任務分配成功,負數 任務分配失敗
     *          -1:任務池滿
     *          -2:任務池new失敗
     *          -3:manager已經發送停止信號,不再接收新任務
    */
    int Run(task_fun fun,void* arg=NULL);

public: //以下public函數主要用于靜態函數調用
    bool GetStop();
    void TaskCondWait();
    TaskPool * GetTaskPool();
    ThreadPool * GetThreadPool();
    void LockTask();
    void UnlockTask();
    void LockFull();

private:
 static void * TaskThread(void*); //任務處理線程
 void StopAll();

private:
    ThreadPool *m_threadPool; //線程池
    TaskPool * m_taskPool; //任務池
    bool m_bStop; // 是否終止管理器

    pthread_t m_taskThreadId; // TaskThread線程id
 pthread_mutex_t m_mutex_task;
    pthread_cond_t m_cond_task;
};
#endif

main.cpp

復制代碼 代碼如下:

#include <iostream>
#include <string>
#include "threadpoolmanager.h"
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>


using namespace std;
int seq = 0;
int billNum =0;
int inter = 1;
pthread_mutex_t m_mutex;
void myFunc(void*arg)
{
    pthread_mutex_lock(&m_mutex);
    seq++;
    if(seq%inter == 0 )
    {
        cout << "fun 1=" << seq << endl;
    }
    if(seq>=1000000000)
    {
        cout << "billion" << endl;
        seq = 0;
        billNum++;
    }
    pthread_mutex_unlock(&m_mutex);
    //sleep();
}

int main(int argc, char** argv)
{
    if(argc != 6)
    {
        cout << "必須有5個參數 任務執行次數 任務池大小 線程池大小 預創建線程數 輸出間隔" << endl;
        cout << "eg: ./test 999999 10000 100 10 20" << endl;
        cout << "上例代表創建一個間隔20個任務輸出,任務池大小為10000,線程池大小為100,預創建10個線程,執行任務次數為:999999" << endl;
        return 0;
    }
    double loopSize = atof(argv[1]);
    int taskSize = atoi(argv[2]);
    int threadPoolSize = atoi(argv[3]);
    int preSize = atoi(argv[4]);
    inter = atoi(argv[5]);

    pthread_mutex_init(&m_mutex,NULL);
    ThreadPoolManager manager;
    if(0>manager.Init(taskSize,  threadPoolSize, preSize))
    {
        cout << "初始化失敗" << endl;
        return 0;
    }
    cout << "*******************初始化完成*********************" << endl;
    struct timeval time_beg, time_end;
    memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);
    double i=0;
    for(; i<loopSize; ++i)
    {
        while(0>manager.Run(myFunc,NULL))
        {
            usleep(100);
        }
    }
    gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    cout << "total time =" << total << endl;
    cout << "total num =" << i  << " billion num=" << billNum<< endl;
    cout << __FILE__ << "將關閉所有線程" << endl;
    //pthread_mutex_destroy(&m_mutex);
    return 0;
}

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美综合国产精品久久丁香| 欧美日韩一区二区免费在线观看| 亚洲欧洲xxxx| 午夜精品久久久久久久99热| 成人国产亚洲精品a区天堂华泰| 中文字幕亚洲无线码在线一区| 国产精品欧美一区二区三区奶水| 亚洲精品有码在线| 清纯唯美日韩制服另类| 亚洲精品国产免费| 国产aⅴ夜夜欢一区二区三区| 亚洲天堂日韩电影| 国产91精品最新在线播放| 狠狠做深爱婷婷久久综合一区| 日韩欧美国产视频| 国产欧美日韩精品丝袜高跟鞋| 91情侣偷在线精品国产| 欧美激情在线播放| 在线观看精品自拍私拍| 欧美性极品xxxx娇小| 亚洲欧美中文日韩在线| 亚洲91精品在线观看| 91免费综合在线| 日韩av网站在线| 亚洲成人精品av| 国产在线视频不卡| 性色av一区二区咪爱| 亚洲精品免费一区二区三区| 黄网站色欧美视频| 69av成年福利视频| 国产一区二区成人| 亚洲成av人片在线观看香蕉| 亚洲自拍偷拍网址| 日韩精品视频在线免费观看| 午夜精品一区二区三区视频免费看| 日韩欧美精品网站| 一区二区三区视频免费| 亚洲电影成人av99爱色| 久久综合国产精品台湾中文娱乐网| 亚洲人成在线观看| 国产成人欧美在线观看| 亚洲电影免费观看高清| 亚洲色图美腿丝袜| 欧美整片在线观看| 亚洲护士老师的毛茸茸最新章节| 一区二区三区 在线观看视| 欧美老女人在线视频| 国产精品久久久久久久久男| 日韩免费观看av| 青青在线视频一区二区三区| 91在线无精精品一区二区| 亚洲夜晚福利在线观看| 亚洲欧洲中文天堂| 国产成人亚洲精品| 日本高清+成人网在线观看| 日韩高清av在线| 欧美国产在线电影| 亚洲男人7777| 日韩av资源在线播放| xxxxxxxxx欧美| 2019亚洲日韩新视频| 97视频com| 美女av一区二区| 国产在线久久久| 成人97在线观看视频| 成人国产精品久久久| 欧美国产视频日韩| 岛国精品视频在线播放| 欧洲亚洲在线视频| 奇米成人av国产一区二区三区| 精品日本高清在线播放| 成人网址在线观看| 国产精品网址在线| 96pao国产成视频永久免费| 欧美视频在线观看免费| 91精品91久久久久久| 中文字幕在线成人| 精品成人69xx.xyz| 国产精品av在线| 国产精品无码专区在线观看| 精品久久久久久久久国产字幕| 久久久免费av| 亚洲毛片在线免费观看| 综合久久五月天| 欧美精品在线观看| 亚洲精品98久久久久久中文字幕| 91av视频导航| 亚洲大胆人体av| 日韩大陆毛片av| 成人97在线观看视频| 亚洲白虎美女被爆操| 91免费国产视频| 中文字幕亚洲专区| 日韩av在线免播放器| 欧美成人精品不卡视频在线观看| 92看片淫黄大片看国产片| 久久久久国产视频| 久久精品男人天堂| 国产欧美精品在线| 97色在线视频观看| 亚洲国产精品一区二区久| 一本色道久久88综合日韩精品| 亚洲一区制服诱惑| 国产精品电影一区| 色综合久久天天综线观看| 久久成人综合视频| 日韩精品999| 欧美黑人性猛交| 欧美激情精品久久久久久| 成人精品一区二区三区| 亚洲国产成人精品久久久国产成人一区| 久久久久久久久网站| 日韩电影中文 亚洲精品乱码| 欧美激情一二区| 国产精品免费看久久久香蕉| 久久久国产成人精品| 国产亚洲欧美aaaa| 日韩av在线看| 国产午夜精品久久久| 欧美一级视频一区二区| 色噜噜狠狠狠综合曰曰曰| 国产日韩一区在线| 91av在线播放| 中文在线资源观看视频网站免费不卡| 久久综合五月天| 精品国产999| 理论片在线不卡免费观看| 亚洲国产日韩欧美在线99| 国产伦精品一区二区三区精品视频| 成年无码av片在线| 91亚洲国产精品| 欧美精品日韩www.p站| 黑人巨大精品欧美一区二区三区| 亚洲成人av片在线观看| 国产日韩欧美影视| 国产精品一二三在线| 欧美精品久久久久久久久| 日韩高清有码在线| 国产精品久久二区| 97超级碰碰碰久久久| 国产自摸综合网| 日韩免费在线播放| 亚洲精品国精品久久99热| 日韩精品免费一线在线观看| 中文字幕日韩视频| 久久艹在线视频| 欧美亚洲激情在线| 欧美自拍视频在线观看| 日韩电影第一页| 欧美成人免费播放| 日韩电影免费观看在线| 日韩欧美999| 亚洲国产成人精品一区二区| 中文字幕av一区中文字幕天堂| 欧美做爰性生交视频| 久久99久久久久久久噜噜| 亚洲精品资源美女情侣酒店| 国产精品999999| 久久国产精品偷| 国产精欧美一区二区三区| 韩国三级日本三级少妇99| 96精品久久久久中文字幕| 欧美电影免费观看高清完整| 亚洲最新av网址|