在工作组模式中,数据由一组线程分别独立地处理。
以下例子来自于《POSIX多线程编程》,显示了一个简单的工作组。使用两个参数运行程序:一个字符串和一个搜索路径,结果类似于命令 "find argv[2] -name '*' | xargs grep argv[1]"。程序使用一个crew来管理工作组,工作组将搜索路径排队为单链表:如果该路径是普通文件,则打开该文件并搜索其中是否包含指定的字符串;如果该路径是目录,则把目录中的每一项作为新的工作项加入队列。
/*
 * crew.c
 * Build: gcc -g -Wall -o crew crew.c -lpthread
 * Known limitation: when the program aborts on an error, the allocated
 * work_t list is not freed.
 */
#include <sys/types.h>
#include <sys/stat.h>
#include <dirent.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "errors.h"
/* Capacity of the worker array inside crew_t; crew_create() rejects any
 * requested crew size larger than this. */
#define CREW_SIZE 4
/*
 * One queued unit of work: a path to examine plus the string to look for.
 * Items form a singly linked list whose head is crew->first and whose tail
 * is crew->last; the tail item's next pointer is NULL.
 */
typedef struct work_tag work_t;
typedef work_t *work_p;

struct work_tag {
    struct work_tag *next;  /* following item in the queue, NULL at the tail */
    char *path;             /* heap-allocated path; freed by the worker that handles the item */
    char *string;           /* text to search for; copied by pointer from item to item */
};
/*
 * Per-thread bookkeeping for one member of the crew.
 * (The struct tag keeps its original spelling, "worter_tag".)
 */
typedef struct worter_tag worker_t;
typedef worker_t *worker_p;

struct worter_tag {
    int index;              /* position of this worker in crew_t.crew[] */
    pthread_t pthread;      /* thread id filled in by pthread_create() */
    struct crew_tag *crew;  /* back-pointer to the owning crew */
};
/*
 * Shared state of the work crew: the worker threads, the queue of pending
 * work items and the synchronization objects protecting them.
 */
typedef struct crew_tag crew_t;
typedef crew_t *crew_p;

struct crew_tag {
    int crew_size;              /* number of workers actually started */
    worker_t crew[CREW_SIZE];   /* worker slots; only the first crew_size are used */
    long work_count;            /* items queued or still being processed */
    work_t *first;              /* head of the work queue, NULL when empty */
    work_t *last;               /* tail of the work queue, NULL when empty */
    pthread_mutex_t mutex;      /* guards work_count, first and last */
    pthread_cond_t done;        /* broadcast when work_count drops to zero */
    pthread_cond_t go;          /* signalled whenever a work item is queued */
};
/* Size in bytes of every heap-allocated path buffer; set by crew_start()
 * from pathconf(_PC_PATH_MAX) plus one. */
size_t path_max;
/* Extra bytes added to sizeof(struct dirent) for the readdir_r() buffer;
 * set by crew_start() from pathconf(_PC_NAME_MAX) plus one. */
size_t name_max;
void* worker_routine(void *arg)
{
worker_p mine = (worker_t*)arg;
crew_p crew = mine->crew;
work_p work, new_work;
struct stat filestat;
struct dirent *entry;
int status;
entry = (struct dirent*)malloc(sizeof(struct dirent) + name_max);
if(entry == NULL) errno_abort("malloc error");
status = pthread_mutex_lock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_lock failed");
while(crew->work_count == 0) {
status = pthread_cond_wait(&crew->go, &crew->mutex);
if(status != 0) err_abort(status, "pthread_cond_wait failed");
}
status = pthread_mutex_unlock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
//printf("Crew %d starting\n", mine->index);
while(1)
{
status = pthread_mutex_lock(&crew->mutex);
if (status != 0) err_abort(status, "pthread_mutex_lock failed");
//printf("Crew %d top: first is %#lx, count is %d\n", mine->index, crew->first, crew->work_count);
while(crew->first == NULL){
status = pthread_cond_wait(&crew->go, &crew->mutex);
if (status != 0) err_abort(status, "pthread_cond_wait failed");
}
work = crew->first;
crew->first = crew->first->next;
if(crew->first == NULL) {
crew->last= NULL;
}
status = pthread_mutex_unlock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
status = lstat(work->path, &filestat);
if(S_ISLNK(filestat.st_mode))
printf("Thread %d :%s is a link, skipping. \n", mine->index, work->path);
else if(S_ISDIR(filestat.st_mode)) {
DIR *directory;
struct dirent *result;
directory = opendir(work->path);
if(directory == NULL) {
fprintf(stderr, "unable to open directory %s: %d (%s)\n", work->path, errno, strerror(errno));
continue;
}
while(1) {
/* readdir_r 可重入函数,线程安全 */
status = readdir_r(directory, entry, &result);
if(status != 0){
fprintf(stderr, "unable to open directory %s: %d (%s)\n", work->path, status, strerror(status));
break;
}
if(result == NULL) break;
if(strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) continue;
new_work = (work_p)malloc(sizeof(work_t));
if(new_work == NULL) errno_abort("unable to allocate request");
new_work->path = (char*)malloc(path_max);
if(new_work->path == NULL) errno_abort("unable to allocate path");
strcpy(new_work->path, work->path);
strcat(new_work->path, "/");
strcat(new_work->path, entry->d_name);
new_work->string = work->string;
new_work->next = NULL;
status = pthread_mutex_lock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_lock failed");
if(crew->first == NULL) {
crew->first = new_work;
crew->last = new_work;
} else {
crew->last->next = new_work;
crew->last = new_work;
}
crew->work_count++;
status = pthread_cond_signal(&crew->go);
status = pthread_mutex_unlock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
}
closedir(directory);
} else if(S_ISREG(filestat.st_mode)) {
FILE *search;
char buffer[256], *buf_ptr, *search_ptr;
if((search = fopen(work->path, "r")) == NULL)
fprintf(stderr, "unable to open file %s: %d (%s)\n", work->path, status, strerror(status));
else {
while((buf_ptr = fgets(buffer, sizeof(buffer), search)) != NULL) {
search_ptr = strstr(buffer, work->string);
if(search_ptr != NULL){
printf("thread %d found \"%s\" in %s \n", mine->index, work->string, work->path);
break;
}
}
}
fclose(search);
} else {
fprintf(stderr, "thred %d : %s is type %o (%s) \n", mine->index, work->path, filestat.st_mode & S_IFMT,
S_ISFIFO(filestat.st_mode) ? "FIFO" :(S_ISCHR(filestat.st_mode) ? "CHR":
((S_ISBLK(filestat.st_mode) ? "BLK":((S_ISSOCK(filestat.st_mode) ? "SOCK":"UNKNOWN"))))));
}
free(work->path);
free(work);
status = pthread_mutex_lock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_lock failed");
crew->work_count--;
if(crew->work_count <= 0) {
status = pthread_cond_broadcast(&crew->done);
if(status != 0) err_abort(status, "pthread_cond_broadcast failed");
status = pthread_mutex_unlock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
break;
}
status = pthread_mutex_unlock(&crew->mutex);
if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
}
free(entry);
return (void*)0;
}
/*
 * Hand a search request to the crew and wait until it has been completed.
 *
 * crew     - crew previously set up by crew_create()
 * filepath - file or directory at which the search starts
 * search   - string to look for; the pointer is stored in the work items,
 *            not copied
 *
 * The function first waits for any work already in progress, then sets the
 * globals path_max and name_max from pathconf() on filepath (256 when the
 * limit is indeterminate, plus one in either case), queues a single work
 * item for filepath, signals crew->go and blocks on crew->done until
 * work_count is back to zero.
 *
 * Returns 0 on success, ENAMETOOLONG if filepath does not fit in a
 * path_max-byte buffer, or the error number of a failed pthread call.
 * Allocation failures, pathconf() errors and failures while waiting for
 * completion abort the process.
 */
int crew_start(crew_p crew, char *filepath, char *search)
{
    work_p request;
    work_p old_last;
    long limit;
    int status;

    status = pthread_mutex_lock(&crew->mutex);
    if (status != 0) return status;

    /* Wait for the crew to finish any earlier request. */
    while (crew->work_count > 0) {
        status = pthread_cond_wait(&crew->done, &crew->mutex);
        if (status != 0) {
            pthread_mutex_unlock(&crew->mutex);
            return status;
        }
    }

    /* pathconf() returns -1 with errno unchanged when the limit is
     * indeterminate, and -1 with errno set on a real error. Keep the result
     * in a long so the -1 test is not done on an unsigned value. */
    errno = 0;
    limit = pathconf(filepath, _PC_PATH_MAX);
    if (limit == -1) {
        if (errno == 0) limit = 256;
        else errno_abort("unable to get path_max");
    }
    path_max = (size_t)limit + 1;

    errno = 0;
    limit = pathconf(filepath, _PC_NAME_MAX);
    if (limit == -1) {
        if (errno == 0) limit = 256;
        else errno_abort("unable to get name_max");
    }
    name_max = (size_t)limit + 1;

    /* Refuse a start path that would overflow the path buffer. */
    if (strlen(filepath) >= path_max) {
        pthread_mutex_unlock(&crew->mutex);
        return ENAMETOOLONG;
    }

    request = malloc(sizeof *request);
    if (request == NULL) errno_abort("unable to allocate request");
    request->path = malloc(path_max);
    if (request->path == NULL) errno_abort("unable to allocate path");
    strcpy(request->path, filepath);  /* length checked above */
    request->string = search;
    request->next = NULL;

    /* Append to the queue, remembering the old tail for the undo path. */
    old_last = crew->last;
    if (crew->first == NULL) {
        crew->first = request;
    } else {
        crew->last->next = request;
    }
    crew->last = request;
    crew->work_count++;

    status = pthread_cond_signal(&crew->go);
    if (status != 0) {
        /* Undo the append and release everything the request owns. */
        if (old_last == NULL) {
            crew->first = NULL;
        } else {
            old_last->next = NULL;
        }
        crew->last = old_last;
        crew->work_count--;
        free(request->path);
        free(request);
        pthread_mutex_unlock(&crew->mutex);
        return status;
    }

    /* Block until the workers report that all items have been processed. */
    while (crew->work_count > 0) {
        status = pthread_cond_wait(&crew->done, &crew->mutex);
        if (status != 0) err_abort(status, "pthread_cond_wait failed");
    }
    status = pthread_mutex_unlock(&crew->mutex);
    if (status != 0) err_abort(status, "pthread_mutex_unlock failed");
    return 0;
}
/*
 * Initialize a crew and start its worker threads.
 *
 * crew      - caller-provided storage for the crew state
 * crew_size - number of workers to start; must be between 1 and CREW_SIZE
 *
 * Returns 0 on success, EINVAL when crew_size is out of range, or the error
 * number of a failed mutex/condition-variable initialization (objects that
 * were already initialized are destroyed again before returning). A failing
 * pthread_create() aborts the process.
 */
int crew_create(crew_t *crew, int crew_size)
{
    int status;
    int crew_index;

    /* A crew without workers could never complete a request. */
    if (crew_size < 1 || crew_size > CREW_SIZE)
        return EINVAL;

    crew->crew_size = crew_size;
    crew->work_count = 0;
    crew->first = NULL;
    crew->last = NULL;

    status = pthread_mutex_init(&crew->mutex, NULL);
    if (status != 0) return status;
    status = pthread_cond_init(&crew->go, NULL);
    if (status != 0) goto fail_mutex;
    status = pthread_cond_init(&crew->done, NULL);
    if (status != 0) goto fail_go;

    for (crew_index = 0; crew_index < crew_size; crew_index++) {
        crew->crew[crew_index].index = crew_index;
        crew->crew[crew_index].crew = crew;
        status = pthread_create(&crew->crew[crew_index].pthread, NULL,
                                worker_routine, (void*)&crew->crew[crew_index]);
        if (status != 0) err_abort(status, "pthread_create failed");
    }
    return 0;

fail_go:
    pthread_cond_destroy(&crew->go);
fail_mutex:
    pthread_mutex_destroy(&crew->mutex);
    return status;
}
/*
 * Entry point: crew <string> <path>.
 * Creates a crew of CREW_SIZE workers and searches <path> for <string>.
 * Exits with status -1 when fewer than two arguments are given.
 */
int main(int argc, char *argv[])
{
    crew_t my_crew;
    int rc;

    if (argc < 3) {
        fprintf(stderr, "Usage:%s string path\n", argv[0]);
        exit(-1);
    }

    rc = crew_create(&my_crew, CREW_SIZE);
    if (rc != 0) {
        err_abort(rc, "create crew.");
    }

    /* argv[2] is the starting path, argv[1] the string to search for. */
    rc = crew_start(&my_crew, argv[2], argv[1]);
    if (rc != 0) {
        err_abort(rc, "start crew.");
    }

    return 0;
}