天天看點

一個線程池的簡單的實作

線程池實作: 用于執行大量相對短暫的任務 當任務增加的時候能夠動态的增加線程池中線程的數量直到達到一個門檻值。 當任務執行完畢的時候,能夠動态的銷毀線程池中的線程

該線程池的實作本質上也是生産者與消費模型的應用。生産者線程向任務隊列中添加任務,一旦隊列有任務到來,如果有等待線程就喚醒來執行任務,如果沒有等待線程并且線程數沒有達到門檻值,就建立新線程來執行任務。

contion.h #ifndef _CONDITION_H_

#define _CONDITION_H_

#include <pthread.h>

typedef struct condition

{

pthread_mutex_t pmutex;

pthread_cond_t pcond;

} condition_t;

int condition_init(condition_t *cond);

int condition_lock(condition_t *cond);

int condition_unlock(condition_t *cond);

int condition_wait(condition_t *cond);

int condition_timedwait(condition_t *cond, const struct timespec *abstime);

int condition_signal(condition_t *cond);

int condition_broadcast(condition_t *cond);

int condition_destroy(condition_t *cond);

#endif

condition.c #include "condition.h"

int condition_init(condition_t *cond)

{

int status;

if ((status = pthread_mutex_init(&cond->pmutex, NULL)))

return status;

if ((status = pthread_cond_init(&cond->pcond, NULL)))

return status;

return 0;

}

int condition_lock(condition_t *cond)

{

return pthread_mutex_lock(&cond->pmutex);

}

int condition_unlock(condition_t *cond)

{

return pthread_mutex_unlock(&cond->pmutex);

}

int condition_wait(condition_t *cond)

{

return pthread_cond_wait(&cond->pcond, &cond->pmutex);

}

int condition_timedwait(condition_t *cond, const struct timespec *abstime)

{

return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);

}

int condition_signal(condition_t *cond)

{

return pthread_cond_signal(&cond->pcond);

}

int condition_broadcast(condition_t* cond)

{

return pthread_cond_broadcast(&cond->pcond);

}

int condition_destroy(condition_t* cond)

{

int status;

if ((status = pthread_mutex_destroy(&cond->pmutex)))

return status;

if ((status = pthread_cond_destroy(&cond->pcond)))

return status;

return 0;

}

一個線程池的簡單的實作

threadpool.h #ifndef _THREAD_POOL_H_

#define _THREAD_POOL_H_

#include "condition.h"

// 任務結構體,将任務放入隊列由線程池中的線程來執行

typedef struct task

{

void *(*run)(void *arg); // 任務回調函數

void *arg; // 回調函數參數

struct task *next;

} task_t;

// 線程池結構體

typedef struct threadpool

{

condition_t ready; //任務準備就緒或者線程池銷毀通知

task_t *first; //任務隊列頭指針

task_t *last; //任務隊列尾指針

int counter; //線程池中目前線程數

int idle; //線程池中目前正在等待任務的線程數

int max_threads; //線程池中最大允許的線程數

int quit; //銷毀線程池的時候置1

} threadpool_t;

// 初始化線程池

void threadpool_init(threadpool_t *pool, int threads);

// 往線程池中添加任務

void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);

// 銷毀線程池

void threadpool_destroy(threadpool_t *pool);

#endif

threadpool.c #include "threadpool.h"

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <errno.h>

#include <time.h>

void *thread_routine(void *arg)

{

struct timespec abstime;

int timeout;

printf("thread 0x%x is starting\n", (int)pthread_self());

threadpool_t *pool = (threadpool_t *)arg;

while (1)

{

timeout = 0;

condition_lock(&pool->ready);

pool->idle++;

// 等待隊列有任務到來或者線程池銷毀通知

while (pool->first == NULL && !pool->quit)

{

printf("thread 0x%x is waiting\n", (int)pthread_self());

//condition_wait(&pool->ready);

clock_gettime(CLOCK_REALTIME, &abstime);

abstime.tv_sec += 2;

int status = condition_timedwait(&pool->ready, &abstime);

if (status == ETIMEDOUT)

{

printf("thread 0x%x is wait timed out\n", (int)pthread_self());

timeout = 1;

break;

}

}

// 等待到條件,處于工作狀态

pool->idle--;

// 等待到任務

if (pool->first != NULL)

{

// 從隊頭取出任務

task_t *t = pool->first;

pool->first = t->next;

// 執行任務需要一定的時間,是以要先解鎖,以便生産者程序

// 能夠往隊列中添加任務,其它消費者線程能夠進入等待任務

condition_unlock(&pool->ready);

t->run(t->arg);

free(t);

condition_lock(&pool->ready);

}

// 如果等待到線程池銷毀通知, 且任務都執行完畢

if (pool->quit && pool->first == NULL)

{

pool->counter--;

if (pool->counter == 0)

condition_signal(&pool->ready);

condition_unlock(&pool->ready);

// 跳出循環之前要記得解鎖

break;

}

if (timeout && pool->first == NULL)

{

pool->counter--;

condition_unlock(&pool->ready);

// 跳出循環之前要記得解鎖

break;

}

condition_unlock(&pool->ready);

}

printf("thread 0x%x is exting\n", (int)pthread_self());

return NULL;

}

// 初始化線程池

void threadpool_init(threadpool_t *pool, int threads)

{

// 對線程池中的各個字段初始化

condition_init(&pool->ready);

pool->first = NULL;

pool->last = NULL;

pool->counter = 0;

pool->idle = 0;

pool->max_threads = threads;

pool->quit = 0;

}

// 往線程池中添加任務

void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)

{

// 生成新任務

task_t *newtask = (task_t *)malloc(sizeof(task_t));

newtask->run = run;

newtask->arg = arg;

newtask->next = NULL;

condition_lock(&pool->ready);

// 将任務添加到隊列

if (pool->first == NULL)

pool->first = newtask;

else

pool->last->next = newtask;

pool->last = newtask;

// 如果有等待線程,則喚醒其中一個

if (pool->idle > 0)

condition_signal(&pool->ready);

else if (pool->counter < pool->max_threads)

{

// 沒有等待線程,并且目前線程數不超過最大線程數,則建立一個新線程

pthread_t tid;

pthread_create(&tid, NULL, thread_routine, pool);

pool->counter++;

}

condition_unlock(&pool->ready);

}

// 銷毀線程池

void threadpool_destroy(threadpool_t *pool)

{

if (pool->quit)

{

return;

}

condition_lock(&pool->ready);

pool->quit = 1;

if (pool->counter > 0)

{

if (pool->idle > 0)

condition_broadcast(&pool->ready);

// 處于執行任務狀态中的線程,不會收到廣播

// 線程池需要等待執行任務狀态中的線程全部退出

while (pool->counter > 0)

condition_wait(&pool->ready);

}

condition_unlock(&pool->ready);

condition_destroy(&pool->ready);

}

main.c #include "threadpool.h"

#include <unistd.h>

#include <stdio.h>

#include <stdlib.h>

void* mytask(void *arg)

{

printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg);

sleep(1);

free(arg);

return NULL;

}

int main(void)

{

threadpool_t pool;

threadpool_init(&pool, 3);

int i;

for (i=0; i<10; i++)

{

int *arg = (int *)malloc(sizeof(int));

*arg = i;

threadpool_add_task(&pool, mytask, arg);

}

//sleep(15);

threadpool_destroy(&pool);

return 0;

}

makefile: .PHONY:clean

CC=gcc

CFLAGS=-Wall -g

ALL=main

all:$(ALL)

OBJS=threadpool.o main.o condition.o

.c.o:

$(CC) $(CFLAGS) -c $<

main:$(OBJS)

$(CC) $(CFLAGS) $^ -o [email protected] -lpthread -lrt

clean:

rm -f $(ALL) *.o

繼續閱讀