天天看点

使用线程的几种方式(2)工作组

在工作组模式中,数据由一组线程分别独立的处理。

以下例子来自于《POSIX多线程编程》,显示了一个简单的工作组。使用两个参数运行程序,一个字符串和一个搜索路径,结果类似于命令“find argv[2] -name * | xargs grep argv[1]".  程序使用一个crew来管理工作组,工作组将搜素路径排队为单链表,如果该路劲是文件,则打开搜素是否包含。

/*
 * crew.c
 * make:gcc -g -o crew -Wall -lpthread crew.c
 * 没有在程序异常情况下,对申请的资源work_t 链表释放。
 */
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <dirent.h>
#include "errors.h"

#define CREW_SIZE 4
/* work_t组成单链表,其中,表头为crew->first
 * 表尾为crew->last,crew->last->nest = crew->last
 */
typedef struct work_tag {
    struct work_tag *next;
    char            *path;
    char            *string;
} work_t, *work_p;

typedef struct worter_tag {
    int             index;
    pthread_t       pthread;    
    struct crew_tag *crew;
} worker_t, *worker_p;

typedef struct crew_tag {
    int             crew_size;
    worker_t        crew[CREW_SIZE];
    long            work_count;
    work_t          *first, *last;
    pthread_mutex_t mutex;
    pthread_cond_t  done;
    pthread_cond_t  go;
} crew_t, *crew_p;

size_t path_max;
size_t name_max;

void* worker_routine(void *arg)
{
    worker_p mine = (worker_t*)arg;
    crew_p crew = mine->crew;
    work_p work, new_work;
    struct stat filestat;
    struct dirent *entry;
    int status;

    entry = (struct dirent*)malloc(sizeof(struct dirent) + name_max);
    if(entry ==  NULL) errno_abort("malloc error");
    status = pthread_mutex_lock(&crew->mutex);
    if(status != 0) err_abort(status, "pthread_mutex_lock failed");
    while(crew->work_count == 0) {
        status = pthread_cond_wait(&crew->go, &crew->mutex);
        if(status != 0) err_abort(status, "pthread_cond_wait failed");
    }
    status = pthread_mutex_unlock(&crew->mutex);
    if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
    //printf("Crew %d starting\n", mine->index); 

    while(1)
    {
        status = pthread_mutex_lock(&crew->mutex);
        if (status != 0) err_abort(status, "pthread_mutex_lock failed");
        //printf("Crew %d top: first is %#lx, count is %d\n", mine->index, crew->first, crew->work_count);
        while(crew->first == NULL){
            status = pthread_cond_wait(&crew->go, &crew->mutex);
            if (status != 0) err_abort(status, "pthread_cond_wait  failed");     
        }
        
        work = crew->first;
        crew->first = crew->first->next;
        if(crew->first == NULL) {
            crew->last= NULL;
        }
        status = pthread_mutex_unlock(&crew->mutex);
        if(status != 0) err_abort(status, "pthread_mutex_unlock failed");

        status = lstat(work->path, &filestat);
        if(S_ISLNK(filestat.st_mode))
            printf("Thread %d :%s is a link, skipping. \n", mine->index, work->path);
        else if(S_ISDIR(filestat.st_mode)) {
            DIR *directory;
            struct dirent *result;
            directory = opendir(work->path);
            if(directory == NULL) {
                fprintf(stderr, "unable to open directory %s: %d (%s)\n", work->path, errno, strerror(errno));
                continue;
            }
            while(1) {
                /* readdir_r 可重入函数,线程安全 */
                status = readdir_r(directory, entry, &result);
                if(status != 0){
                    fprintf(stderr, "unable to open directory %s: %d (%s)\n", work->path, status, strerror(status));            
                    break;
                }
                if(result == NULL) break;
                if(strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) continue;
                new_work = (work_p)malloc(sizeof(work_t));
                if(new_work == NULL) errno_abort("unable to allocate request");
                new_work->path = (char*)malloc(path_max);
                if(new_work->path == NULL) errno_abort("unable to allocate path");
                strcpy(new_work->path, work->path);
                strcat(new_work->path, "/");
                strcat(new_work->path, entry->d_name);
                new_work->string = work->string;
                new_work->next = NULL;
                status = pthread_mutex_lock(&crew->mutex);
                if(status != 0) err_abort(status, "pthread_mutex_lock failed");
                if(crew->first == NULL) {
                    crew->first = new_work;
                    crew->last = new_work;
                } else {
                    crew->last->next = new_work;
                    crew->last = new_work;
                }
                crew->work_count++;
                status = pthread_cond_signal(&crew->go);
                status = pthread_mutex_unlock(&crew->mutex);
                if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
            }
            closedir(directory);
       } else if(S_ISREG(filestat.st_mode)) {
           FILE *search;
           char buffer[256], *buf_ptr, *search_ptr;
           if((search = fopen(work->path, "r")) == NULL)
               fprintf(stderr, "unable to open file %s: %d (%s)\n", work->path, status, strerror(status));            
           else {
               while((buf_ptr = fgets(buffer, sizeof(buffer), search)) != NULL) {
                   search_ptr = strstr(buffer, work->string);
                   if(search_ptr != NULL){
                       printf("thread %d found \"%s\" in %s \n", mine->index, work->string, work->path);
                       break;
                   }
               }
           } 
           fclose(search); 
       } else {
           fprintf(stderr, "thred %d : %s is type %o (%s) \n", mine->index, work->path, filestat.st_mode & S_IFMT,
                           S_ISFIFO(filestat.st_mode) ? "FIFO" :(S_ISCHR(filestat.st_mode) ? "CHR":
                           ((S_ISBLK(filestat.st_mode) ? "BLK":((S_ISSOCK(filestat.st_mode) ? "SOCK":"UNKNOWN"))))));
       }
       free(work->path);
       free(work);

       status = pthread_mutex_lock(&crew->mutex);
       if(status != 0) err_abort(status, "pthread_mutex_lock failed");
       crew->work_count--;
       if(crew->work_count <= 0) {
           status = pthread_cond_broadcast(&crew->done);
           if(status != 0) err_abort(status, "pthread_cond_broadcast failed"); 
           status = pthread_mutex_unlock(&crew->mutex);
           if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
           break;     
       }
       status = pthread_mutex_unlock(&crew->mutex);
       if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
    }

    free(entry);
    return (void*)0;
}

int crew_start(crew_p crew, char *filepath, char *search)
{
    work_p request;
    int status;

    status = pthread_mutex_lock(&crew->mutex);
    if (status != 0) return status;
    
    /*等待crew完成*/
    while(crew->work_count >0){
        pthread_cond_wait(&crew->done, &crew->mutex);
        if(status != 0) {
            pthread_mutex_unlock(&crew->mutex);
            return status;
        }
    }

    errno = 0;
    path_max = pathconf(filepath, _PC_PATH_MAX);
    if(path_max == -1){
        if(errno == 0) path_max = 256;
        else errno_abort("unable to get path_max");
    }  
    errno = 0;
    name_max = pathconf(filepath, _PC_NAME_MAX);
    if(name_max == -1){
        if(errno == 0) name_max = 256;
        else errno_abort("unable to get name_max");
    }  
    
    //printf("PATH_MAX for %s is %ld, NAME_MAX is %ld", filepath, path_max, name_max); 
    path_max++;
    name_max++;

    request = (work_p)malloc(sizeof(work_t));
    if(request == NULL) errno_abort("unable to allocate request");
    
    request->path = (char*)malloc(path_max);
    if(request->path == NULL) errno_abort("unable to allocate path");
    strcpy(request->path, filepath);
    request->string = search;
    request->next = NULL;
    
    if(crew->first == NULL) {
        crew->first = request;
        crew->last = request;
    } else {
        crew->last->next = request;
        crew->last = request;
    }
    
    crew->work_count++;
    status = pthread_cond_signal(&crew->go);
    if(status != 0) {
        free(crew->first);
        crew->first = NULL;
        crew->work_count = 0;
        pthread_mutex_unlock(&crew->mutex);
        return status;
    }
    while(crew->work_count > 0) {
        status = pthread_cond_wait(&crew->done, &crew->mutex); 
        if(status != 0) err_abort(status, "pthread_cond_wait failed");
    }
    status = pthread_mutex_unlock(&crew->mutex);
    if(status != 0) err_abort(status, "pthread_mutex_unlock failed");
    return 0;
}

int crew_create(crew_t *crew, int crew_size)
{
    int status;
    int crew_index;

    if(crew_size > CREW_SIZE)
        return EINVAL;

    crew->crew_size = crew_size;
    crew->work_count = 0;
    crew->first = NULL;
    crew->last = NULL;

    status = pthread_mutex_init(&crew->mutex, NULL);
    if (status != 0) return status; 
    status = pthread_cond_init(&crew->go, NULL);
    if (status != 0) return status; 
    status = pthread_cond_init(&crew->done, NULL);
    if (status != 0) return status;

    for(crew_index = 0; crew_index < crew_size; crew_index++) {
        crew->crew[crew_index].index = crew_index;
        crew->crew[crew_index].crew = crew;
        status = pthread_create(&crew->crew[crew_index].pthread, NULL, 
                                worker_routine, (void*)&crew->crew[crew_index]);
        if(status != 0) err_abort(status, "pthread_create failed"); 
    }
    return 0;
}

int main(int argc, char *argv[])
{
    crew_t my_crew;
    int status;

    if(argc < 3) {
        fprintf(stderr, "Usage:%s string path\n", argv[0]);
        exit(-1);
    }

    status = crew_create(&my_crew, CREW_SIZE);
    if(status != 0) err_abort(status, "create crew.");
    status = crew_start(&my_crew, argv[2], argv[1]);
    if(status != 0) err_abort(status, "start crew.");
    return 0;
}