天天看點

實作一個簡單的linux線程池

線程池:簡單地說,線程池 就是預先建立好一批線程,友善、快速地處理收到的業務。比起傳統的到來一個任務,即時建立一個線程來處理,節省了線程的建立和回收的開銷,響應更快,效率更高。

在linux中,使用的是posix線程庫,首先介紹幾個常用的函數:

1 線程的建立和取消函數

pthread_create

建立線程

pthread_join

合并線程

pthread_cancel

取消線程

2 線程同步函數

pthread_mutex_lock

pthread_mutex_unlock

pthread_cond_signal

pthread_cond_wait

關于函數的詳細說明,參考man手冊

線程池的實作:

線程池的實作主要分為三部分,線程的建立、添加任務到線程池中、工作線程從任務隊列中取出任務進行處理。

主要有兩個類來實作,CTask,CThreadPool

class CTask

{

protected:

 string m_strTaskName;  //任務的名稱

 void* m_ptrData;       //要執行的任務的具體資料

public:

 CTask(){}

 CTask(string taskName)

 {

  this->m_strTaskName = taskName;

  m_ptrData = NULL;

 }

 virtual int Run()= 0;

 void SetData(void* data);    //設定任務資料

};

任務類是個虛類,所有的任務要從CTask類中繼承 ,實作run接口,run接口中需要實作的就是具體解析任務的邏輯。m_ptrData是指向任務資料的指針,可以是簡單資料類型,也可以是自定義的複雜資料類型。

線程池類

class CThreadPool

{

private:

 vector<CTask*> m_vecTaskList;         //任務清單

 int m_iThreadNum;                            //線程池中啟動的線程數          

 static vector<pthread_t> m_vecIdleThread;   //目前空閑的線程集合

 static vector<pthread_t> m_vecBusyThread;   //目前正在執行的線程集合

 static pthread_mutex_t m_pthreadMutex;    //線程同步鎖

 static pthread_cond_t m_pthreadCond;    //線程同步的條件變量

protected:

 static void* ThreadFunc(void * threadData); //新線程的線程函數

 static int MoveToIdle(pthread_t tid);   //線程執行結束後,把自己放入到空閑線程中

 static int MoveToBusy(pthread_t tid);   //移入到忙碌線程中去

 int Create();          //建立所有的線程

public:

 CThreadPool(int threadNum);

 int AddTask(CTask *task);      //把任務添加到線程池中

 int StopAll();

};

當線程池對象建立後,啟動一批線程,并把所有的線程放入空閑清單中,當有任務到達時,某一個線程取出任務并進行處理。

線程之間的同步用線程鎖和條件變量。

這個類的對外接口有兩個:

AddTask函數把任務添加到線程池的任務清單中,并通知線程進行處理。當任務到到時,把任務放入m_vecTaskList任務清單中,并用pthread_cond_signal喚醒一個線程進行處理。

StopAll函數停止所有的線程

************************************************

代碼:

××××××××××××××××××××CThread.h

#ifndef __CTHREAD

#define __CTHREAD

#include <vector>

#include <string>

#include <pthread.h>

using namespace std;

class CTask

{

protected:

 string m_strTaskName;  //任務的名稱

 void* m_ptrData;       //要執行的任務的具體資料

public:

 CTask(){}

 CTask(string taskName)

 {

  this->m_strTaskName = taskName;

  m_ptrData = NULL;

 }

 virtual int Run()= 0;

 void SetData(void* data);    //設定任務資料

};

class CThreadPool

{

private:

 vector<CTask*> m_vecTaskList;         //任務清單

 int m_iThreadNum;                            //線程池中啟動的線程數          

 static vector<pthread_t> m_vecIdleThread;   //目前空閑的線程集合

 static vector<pthread_t> m_vecBusyThread;   //目前正在執行的線程集合

 static pthread_mutex_t m_pthreadMutex;    //線程同步鎖

 static pthread_cond_t m_pthreadCond;    //線程同步的條件變量

protected:

 static void* ThreadFunc(void * threadData); //新線程的線程函數

 static int MoveToIdle(pthread_t tid);   //線程執行結束後,把自己放入到空閑線程中

 static int MoveToBusy(pthread_t tid);   //移入到忙碌線程中去

 int Create();          //建立所有的線程

public:

 CThreadPool(int threadNum);

 int AddTask(CTask *task);      //把任務添加到線程池中

 int StopAll();

};

#endif

類的實作為:

××××××××××××××××××××CThread.cpp

#include "CThread.h"

#include <string>

#include <iostream>

using namespace std;

void CTask::SetData(void * data)

{

 m_ptrData = data;

}

vector<pthread_t> CThreadPool::m_vecBusyThread;

vector<pthread_t> CThreadPool::m_vecIdleThread;

pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

CThreadPool::CThreadPool(int threadNum)

{

 this->m_iThreadNum = threadNum;

 Create();

}

int CThreadPool::MoveToIdle(pthread_t tid)

{

 vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();

 while(busyIter != m_vecBusyThread.end())

 {

  if(tid == *busyIter)

  {

   break;

  }

  busyIter++;

 }

 m_vecBusyThread.erase(busyIter);

 m_vecIdleThread.push_back(tid);

 return 0;

}

int CThreadPool::MoveToBusy(pthread_t tid)

{

 vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();

 while(idleIter != m_vecIdleThread.end())

 {

  if(tid == *idleIter)

  {

   break;

  }

  idleIter++;

 }

 m_vecIdleThread.erase(idleIter);

 m_vecBusyThread.push_back(tid);

 return 0;

}

void* CThreadPool::ThreadFunc(void * threadData)

{

 pthread_t tid = pthread_self();

 while(1)

 {

  pthread_mutex_lock(&m_pthreadMutex);

  pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);

  cout << "tid:" << tid << " run" << endl;

  //get task

  vector<CTask*>* taskList = (vector<CTask*>*)threadData;

  vector<CTask*>::iterator iter = taskList->begin();

  while(iter != taskList->end())

  {

   MoveToBusy(tid);

   break;

  }

  CTask* task = *iter;

  taskList->erase(iter);

  pthread_mutex_unlock(&m_pthreadMutex);

  cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;

  cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;

  //cout << "task to be run:" << taskList->size() << endl;

  task->Run();

  //cout << "CThread::thread work" << endl;

  cout << "tid:" << tid << " idle" << endl;

 }

 return (void*)0;

}

int CThreadPool::AddTask(CTask *task)

{

 this->m_vecTaskList.push_back(task);

 pthread_cond_signal(&m_pthreadCond);

 return 0;

}

int CThreadPool::Create()

{

 for(int i = 0; i < m_iThreadNum;i++)

 {

  pthread_t tid = 0;

  pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);

  m_vecIdleThread.push_back(tid);

 }

 return 0;

}

int CThreadPool::StopAll()

{

 vector<pthread_t>::iterator iter = m_vecIdleThread.begin();

 while(iter != m_vecIdleThread.end())

 {

  pthread_cancel(*iter);

  pthread_join(*iter,NULL);

  iter++;

 }

 iter = m_vecBusyThread.begin();

 while(iter != m_vecBusyThread.end())

 {

  pthread_cancel(*iter);

  pthread_join(*iter,NULL);

  iter++;

 }

 return 0;

}

簡單示例:

××××××××test.cpp

#include "CThread.h"

#include <iostream>

using namespace std;

class CWorkTask: public CTask

{

public:

 CWorkTask()

 {}

 int Run()

 {

  cout << (char*)this->m_ptrData << endl;

  sleep(10);

  return 0;

 }

};

int main()

{

 CWorkTask taskObj;

 char szTmp[] = "this is the first thread running,haha success";

 taskObj.SetData((void*)szTmp);

 CThreadPool threadPool(10);

 for(int i = 0;i < 11;i++)

 {

  threadPool.AddTask(&taskObj);

 }

 while(1)

 {

  sleep(120);

 }

 return 0;

}

繼續閱讀