天天看点

实现一个简单的linux线程池

线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。

在linux中,使用的是posix线程库,首先介绍几个常用的函数:

1 线程的创建和取消函数

pthread_create

创建线程

pthread_join

合并线程

pthread_cancel

取消线程

2 线程同步函数

pthread_mutex_lock

pthread_mutex_unlock

pthread_cond_signal

pthread_cond_wait

关于函数的详细说明,参考man手册

线程池的实现:

线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

主要有两个类来实现,CTask,CThreadPool

class CTask

{

protected:

 string m_strTaskName;  //任务的名称

 void* m_ptrData;       //要执行的任务的具体数据

public:

 CTask(){}

 CTask(string taskName)

 {

  this->m_strTaskName = taskName;

  m_ptrData = NULL;

 }

 virtual int Run()= 0;

 void SetData(void* data);    //设置任务数据

};

任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。

线程池类

class CThreadPool

{

private:

 vector<CTask*> m_vecTaskList;         //任务列表

 int m_iThreadNum;                            //线程池中启动的线程数          

 static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合

 static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合

 static pthread_mutex_t m_pthreadMutex;    //线程同步锁

 static pthread_cond_t m_pthreadCond;    //线程同步的条件变量

protected:

 static void* ThreadFunc(void * threadData); //新线程的线程函数

 static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中

 static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去

 int Create();          //创建所有的线程

public:

 CThreadPool(int threadNum);

 int AddTask(CTask *task);      //把任务添加到线程池中

 int StopAll();

};

当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

线程之间的同步用线程锁和条件变量。

这个类的对外接口有两个:

AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

StopAll函数停止所有的线程

************************************************

代码:

××××××××××××××××××××CThread.h

#ifndef __CTHREAD

#define __CTHREAD

#include <vector>

#include <string>

#include <pthread.h>

using namespace std;

class CTask

{

protected:

 string m_strTaskName;  //任务的名称

 void* m_ptrData;       //要执行的任务的具体数据

public:

 CTask(){}

 CTask(string taskName)

 {

  this->m_strTaskName = taskName;

  m_ptrData = NULL;

 }

 virtual int Run()= 0;

 void SetData(void* data);    //设置任务数据

};

class CThreadPool

{

private:

 vector<CTask*> m_vecTaskList;         //任务列表

 int m_iThreadNum;                            //线程池中启动的线程数          

 static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合

 static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合

 static pthread_mutex_t m_pthreadMutex;    //线程同步锁

 static pthread_cond_t m_pthreadCond;    //线程同步的条件变量

protected:

 static void* ThreadFunc(void * threadData); //新线程的线程函数

 static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中

 static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去

 int Create();          //创建所有的线程

public:

 CThreadPool(int threadNum);

 int AddTask(CTask *task);      //把任务添加到线程池中

 int StopAll();

};

#endif

类的实现为:

××××××××××××××××××××CThread.cpp

#include "CThread.h"

#include <string>

#include <iostream>

using namespace std;

void CTask::SetData(void * data)

{

 m_ptrData = data;

}

vector<pthread_t> CThreadPool::m_vecBusyThread;

vector<pthread_t> CThreadPool::m_vecIdleThread;

pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

CThreadPool::CThreadPool(int threadNum)

{

 this->m_iThreadNum = threadNum;

 Create();

}

int CThreadPool::MoveToIdle(pthread_t tid)

{

 vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();

 while(busyIter != m_vecBusyThread.end())

 {

  if(tid == *busyIter)

  {

   break;

  }

  busyIter++;

 }

 m_vecBusyThread.erase(busyIter);

 m_vecIdleThread.push_back(tid);

 return 0;

}

int CThreadPool::MoveToBusy(pthread_t tid)

{

 vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();

 while(idleIter != m_vecIdleThread.end())

 {

  if(tid == *idleIter)

  {

   break;

  }

  idleIter++;

 }

 m_vecIdleThread.erase(idleIter);

 m_vecBusyThread.push_back(tid);

 return 0;

}

void* CThreadPool::ThreadFunc(void * threadData)

{

 pthread_t tid = pthread_self();

 while(1)

 {

  pthread_mutex_lock(&m_pthreadMutex);

  pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);

  cout << "tid:" << tid << " run" << endl;

  //get task

  vector<CTask*>* taskList = (vector<CTask*>*)threadData;

  vector<CTask*>::iterator iter = taskList->begin();

  while(iter != taskList->end())

  {

   MoveToBusy(tid);

   break;

  }

  CTask* task = *iter;

  taskList->erase(iter);

  pthread_mutex_unlock(&m_pthreadMutex);

  cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;

  cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;

  //cout << "task to be run:" << taskList->size() << endl;

  task->Run();

  //cout << "CThread::thread work" << endl;

  cout << "tid:" << tid << " idle" << endl;

 }

 return (void*)0;

}

int CThreadPool::AddTask(CTask *task)

{

 this->m_vecTaskList.push_back(task);

 pthread_cond_signal(&m_pthreadCond);

 return 0;

}

int CThreadPool::Create()

{

 for(int i = 0; i < m_iThreadNum;i++)

 {

  pthread_t tid = 0;

  pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);

  m_vecIdleThread.push_back(tid);

 }

 return 0;

}

int CThreadPool::StopAll()

{

 vector<pthread_t>::iterator iter = m_vecIdleThread.begin();

 while(iter != m_vecIdleThread.end())

 {

  pthread_cancel(*iter);

  pthread_join(*iter,NULL);

  iter++;

 }

 iter = m_vecBusyThread.begin();

 while(iter != m_vecBusyThread.end())

 {

  pthread_cancel(*iter);

  pthread_join(*iter,NULL);

  iter++;

 }

 return 0;

}

简单示例:

××××××××test.cpp

#include "CThread.h"

#include <iostream>

using namespace std;

class CWorkTask: public CTask

{

public:

 CWorkTask()

 {}

 int Run()

 {

  cout << (char*)this->m_ptrData << endl;

  sleep(10);

  return 0;

 }

};

int main()

{

 CWorkTask taskObj;

 char szTmp[] = "this is the first thread running,haha success";

 taskObj.SetData((void*)szTmp);

 CThreadPool threadPool(10);

 for(int i = 0;i < 11;i++)

 {

  threadPool.AddTask(&taskObj);

 }

 while(1)

 {

  sleep(120);

 }

 return 0;

}

继续阅读