歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Linux 下C++線程池的簡單實現

Linux 下C++線程池的簡單實現

日期:2017/3/1 9:36:21   编辑:Linux編程

Linux 下C++線程池的簡單實現(在老外代碼上添加注釋)。

作為一個C++菜鳥,研究半天這個代碼的實現原理,發現好多語法不太熟悉,因此加了一大堆注釋,僅供參考。該段代碼主要通過繼承workthread類來實現自己的線程代碼,通過thread_pool類來管理線程池,線程池不能夠實現動態改變線程數目,存在一定局限性。目前可能還有缺陷,畢竟C++來封裝這個東西,資源釋放什麼的必須想清楚,比如vector存儲了基類指針實現多態,那麼如何釋放對象仍需要考慮,後續我可能會更進一步修改完善該代碼,下面貢獻一下自己的勞動成果。

#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>

using namespace std;
/*
WorkerThread class
This class needs to be sobclassed by the user.
*/
class WorkerThread{
public:
int id;
unsigned virtual executeThis()
{
return 0;
}

WorkerThread(int id) : id(id) {}
virtual ~WorkerThread(){}
};

/*
ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and synchronizations between all threads.
*/
class ThreadPool{
public:
ThreadPool();
ThreadPool(int maxThreadsTemp);
virtual ~ThreadPool();

void destroyPool(int maxPollSecs);

bool assignWork(WorkerThread *worker);
bool fetchWork(WorkerThread **worker);

void initializeThreads();

static void *threadExecute(void *param); // pthread_create()調用的函數必須為靜態的
static pthread_mutex_t mutexSync;
static pthread_mutex_t mutexWorkCompletion;//工作完成個數互斥量


private:
int maxThreads;

pthread_cond_t condCrit;
sem_t availableWork;
sem_t availableThreads;

vector<WorkerThread *> workerQueue;

int topIndex;
int bottomIndex;
int incompleteWork;
int queueSize;
};

#include <stdlib.h>
#include "threadpool.h"
using namespace std;


//初始化類的靜態成員必須加上類型和作用域,static數據成員必須在類定義體的外部定義,不像不同數據成員可以用構造函數初始化
//應該在定義時進行初始化,注意是定義,這個定義應該放在包含類的非內聯成員函數定義的文件中。
//注:靜態成員函數只能使用靜態變量,非靜態沒有限制,靜態變量必須在外部定義和初始化,沒初始化就為默認數值
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;


ThreadPool::ThreadPool()
{
ThreadPool(2);
}

ThreadPool::ThreadPool(int maxThreads)
{
if (maxThreads < 1)
maxThreads=1;

pthread_mutex_lock(&mutexSync);
this->maxThreads = maxThreads;
this->queueSize = maxThreads;
workerQueue.resize(maxThreads, NULL);//調整容器大小,然後用默認構造函數初始化新的空間
topIndex = 0;
bottomIndex = 0;
incompleteWork = 0;
sem_init(&availableWork, 0, 0); //工作隊列信號量,表示已經加入隊列的工作,初始時沒有工作
sem_init(&availableThreads, 0, queueSize); //空閒線程信號量,一開始就有quisize個線程可以使用
pthread_mutex_unlock(&mutexSync);
}

//調用pthread_create()讓線程跑起來,threadExecute是類的靜態函數,因為pthread_create()第三個參數必須為靜態函數
void ThreadPool::initializeThreads()
{
for(int i = 0; i<maxThreads; ++i)
{
pthread_t tempThread;
pthread_create(&tempThread, NULL, ThreadPool::threadExecute, (void*)this );
}
}

ThreadPool::~ThreadPool()
{
//因為對於vector,clear並不真正釋放內存(這是為優化效率所做的事),clear實際所做的是為vector中所保存的所有對象調用析構函數(如果有的話),
//然後初始化size這些東西,讓你覺得把所有的對象清除了。。。
//真正釋放內存是在vector的析構函數裡進行的,所以一旦超出vector的作用域(如函數返回),首先它所保存的所有對象會被析構,
//然後會調用allocator中的deallocate函數回收對象本身的內存。。。
workerQueue.clear();
}


void ThreadPool::destroyPool(int maxPollSecs = 2)
{
while(incompleteWork>0 )
{
//cout << "Work is still incomplete=" << incompleteWork << endl;
sleep(maxPollSecs);
}
cout << "All Done!! Wow! That was a lot of work!" << endl;
sem_destroy(&availableWork);
sem_destroy(&availableThreads);
pthread_mutex_destroy(&mutexSync);
pthread_mutex_destroy(&mutexWorkCompletion);

}

//分配人物到top,然後通知有任務需要執行。
bool ThreadPool::assignWork(WorkerThread *workerThread)
{
pthread_mutex_lock(&mutexWorkCompletion);
incompleteWork++;
//cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
pthread_mutex_unlock(&mutexWorkCompletion);
sem_wait(&availableThreads);
pthread_mutex_lock(&mutexSync);
//workerVec[topIndex] = workerThread;
workerQueue[topIndex] = workerThread;
//cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
if(queueSize !=1 )
topIndex = (topIndex+1) % (queueSize-1);
sem_post(&availableWork);
pthread_mutex_unlock(&mutexSync);
return true;
}

//當已經有人物放到隊列裡面後,就會受到通知,然後從底部拿走工作,在workerArg中返回
bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
sem_wait(&availableWork);

pthread_mutex_lock(&mutexSync);
WorkerThread * workerThread = workerQueue[bottomIndex];
workerQueue[bottomIndex] = NULL;
*workerArg = workerThread;
if(queueSize !=1 )
bottomIndex = (bottomIndex+1) % (queueSize-1);
sem_post(&availableThreads);
pthread_mutex_unlock(&mutexSync);
return true;
}

//每個線程運行的靜態函數實體,executeThis 方法將會被繼承累從寫,之後實現具體線程的工作。
void *ThreadPool::threadExecute(void *param)
{
WorkerThread *worker = NULL;
while(((ThreadPool *)param)->fetchWork(&worker))
{
if(worker)
{
worker->executeThis();
//cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
delete worker;
worker = NULL;
}

pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
//cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
((ThreadPool *)param)->incompleteWork--;
pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
}
return 0;
}

#include <iostream>
#include "threadpool.h"

using namespace std;


#define ITERATIONS 20

class SampleWorkerThread : public WorkerThread
{
public:
int id;
unsigned virtual executeThis()
{
// Instead of sleep() we could do anytime consuming work here.
// Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds)
cout<<"This is SampleWorkerThread sleep 2s"<<endl;
sleep(2);
return(0);
}

SampleWorkerThread(int id) : WorkerThread(id), id(id)
{
// cout << "Creating SampleWorkerThread " << id << "\t address=" << this << endl;
}

~SampleWorkerThread()
{
// cout << "Deleting SampleWorkerThread " << id << "\t address=" << this << endl;
}
};


int main(int argc, char **argv)
{

cout<<"Thread pool"<<endl;
ThreadPool* myPool = new ThreadPool(25);
//pthread_create()執行,開始等待任務分配
myPool->initializeThreads();

//用來計算時間間隔。
time_t t1=time(NULL);

//分配具體工作到線程池
for(unsigned int i=0;i<ITERATIONS;i++){
SampleWorkerThread* myThreathreadExecuted = new SampleWorkerThread(i);
myPool->assignWork(myThreathreadExecuted);
}

//銷毀錢等待所有線程結束,等待間隔為2秒。
myPool->destroyPool(2);

time_t t2=time(NULL);
cout << t2-t1 << " seconds elapsed\n" << endl;
delete myPool;

return 0;
}

Ubuntu 12.04下運行成功,編譯命令如下:g++ -g main.cpp thread_pool.cpp -o thread_pool -lpthread。

C++ 設計新思維》 下載見 http://www.linuxidc.com/Linux/2014-07/104850.htm

C++ Primer Plus 第6版 中文版 清晰有書簽PDF+源代碼 http://www.linuxidc.com/Linux/2014-05/101227.htm

讀C++ Primer 之構造函數陷阱 http://www.linuxidc.com/Linux/2011-08/40176.htm

讀C++ Primer 之智能指針 http://www.linuxidc.com/Linux/2011-08/40177.htm

讀C++ Primer 之句柄類 http://www.linuxidc.com/Linux/2011-08/40175.htm

將C語言梳理一下,分布在以下10個章節中:

  1. Linux-C成長之路(一):Linux下C編程概要 http://www.linuxidc.com/Linux/2014-05/101242.htm
  2. Linux-C成長之路(二):基本數據類型 http://www.linuxidc.com/Linux/2014-05/101242p2.htm
  3. Linux-C成長之路(三):基本IO函數操作 http://www.linuxidc.com/Linux/2014-05/101242p3.htm
  4. Linux-C成長之路(四):運算符 http://www.linuxidc.com/Linux/2014-05/101242p4.htm
  5. Linux-C成長之路(五):控制流 http://www.linuxidc.com/Linux/2014-05/101242p5.htm
  6. Linux-C成長之路(六):函數要義 http://www.linuxidc.com/Linux/2014-05/101242p6.htm
  7. Linux-C成長之路(七):數組與指針 http://www.linuxidc.com/Linux/2014-05/101242p7.htm
  8. Linux-C成長之路(八):存儲類,動態內存 http://www.linuxidc.com/Linux/2014-05/101242p8.htm
  9. Linux-C成長之路(九):復合數據類型 http://www.linuxidc.com/Linux/2014-05/101242p9.htm
  10. Linux-C成長之路(十):其他高級議題

Copyright © Linux教程網 All Rights Reserved