天天看點

《UNIX網絡程式設計 卷2》讀書筆記(四)

/* include globals */

#include    "unpipc.h"

#define    MAXNITEMS         1000000

#define    MAXNTHREADS            100

        /* globals shared by threads */

int        nitems;                /* read-only by producer and consumer */

int        buff[MAXNITEMS];

struct {

  pthread_mutex_t    mutex;

  int                nput;    /* next index to store */

  int                nval;    /* next value to store */

} put = { PTHREAD_MUTEX_INITIALIZER };

  pthread_cond_t    cond;

  int                nready;    /* number ready for consumer */

} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

/* end globals */

void    *produce(void *), *consume(void *);

/* include main */

int

main(int argc, char **argv)

{

    int            i, nthreads, count[MAXNTHREADS];

    pthread_t    tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)

        err_quit("usage: prodcons6 <#items> <#threads>");

    nitems = min(atoi(argv[1]), MAXNITEMS);

    nthreads = min(atoi(argv[2]), MAXNTHREADS);

    Set_concurrency(nthreads + 1);

        /* 4create all producers and one consumer */

    for (i = 0; i < nthreads; i++) {

        count[i] = 0;

        Pthread_create(&tid_produce[i], NULL, produce, &count[i]);

    }

    Pthread_create(&tid_consume, NULL, consume, NULL);

        /* wait for all producers and the consumer */

        Pthread_join(tid_produce[i], NULL);

        printf("count[%d] = %d\n", i, count[i]);    

    Pthread_join(tid_consume, NULL);

    exit(0);

}

/* end main */

/* include prodcons */

void *

produce(void *arg)

    for ( ; ; ) {

        Pthread_mutex_lock(&put.mutex);

        if (put.nput >= nitems) {

            Pthread_mutex_unlock(&put.mutex);

            return(NULL);        /* array is full, we're done */

        }

        buff[put.nput] = put.nval;

        put.nput++;

        put.nval++;

        Pthread_mutex_unlock(&put.mutex);

        Pthread_mutex_lock(&nready.mutex);

        if (nready.nready == 0)

            Pthread_cond_signal(&nready.cond);//發出信号

        nready.nready++;//置為1 

        Pthread_mutex_unlock(&nready.mutex);

        *((int *) arg) += 1;

consume(void *arg)

    int        i;

    for (i = 0; i < nitems; i++) {

        while (nready.nready == 0)

            Pthread_cond_wait(&nready.cond, &nready.mutex);//wait條件變量

        nready.nready--;//置為0

        if (buff[i] != i)

            printf("buff[%d] = %d\n", i, buff[i]);

    return(NULL);

/* end prodcons */

這裡在生産者的代碼中,當它擷取到互斥鎖時,若發出信号喚醒消費者,則此時可能系統立即排程喚醒消費者,但互斥鎖任然在生産者之手,則消費者擷取互斥鎖必然失敗,為了避免此種低效的情況出現,我們可以直到生産者釋放互斥鎖後才給與之關聯的條件變量發送信号,這在Posix裡是可以這麼做的,但Posix又接着說:若要可預見的排程行為,則調用pthead_cond_signal的線程必須鎖住該互斥鎖。

當在程序間共享互斥鎖時,持有該互斥鎖的程序可能在持有期間終止,但無法讓系統在終止時自動釋放掉所持有的鎖。一個線程也可以在持有互斥鎖期間終止,可能是自己調用pthread_exit或被另一個線程取消,若是前者,則它應該知道自己還有一個互斥鎖,若是後者,則該線程可以先安裝一個将在被取消時調用的清理處理程式。但最緻命的情況是由于此線程的終止導緻整個程序的終止。即使一個程序終止時系統自動釋放其持有的鎖,但也會導緻臨界區内資料處于不一緻狀态,

讀寫鎖的規則:

1,隻要沒有線程持有某個給定的讀寫鎖用于寫,則任意數目的線程可以持有

該讀寫鎖用于讀

2,僅當沒有線程持有某個給定的讀寫鎖用于讀或用于寫時,才能配置設定該讀寫鎖用于寫

這種鎖在那些讀資料比寫資料頻繁的應用中使用比較有用,允許多個讀者提供了更高的并發度,同時在寫者修改資料時保護資料,避免任何其他讀者或寫者的幹擾。

這種對某個給定資源的共享通路也叫共享—獨占上鎖,擷取一個讀寫鎖用于讀稱為共享鎖,擷取一個讀寫鎖用于寫稱為獨占鎖。在作業系統中就介紹過這種,經典的問題就是讀者—寫者問題,有多種類型:多讀者,單寫者或多讀者,多寫者。,此外還有要考慮的就是讀者和寫者誰優先,也就産生了1類和2類讀寫問題。

      讀寫鎖類型為pthread_rwlock_t。pthread_rwlock_rdlock擷取一個讀出鎖,若對應的讀寫鎖已經被某個寫者持有,則阻塞調用線程,pthread_rwlock_wrlock擷取一個寫出鎖,若對應的讀寫鎖已經被另一個寫者持有或被一個或多個讀者持有,則阻塞調用線程,pthread_rwlock_unlock釋放一個讀出鎖或寫入鎖。

      使用互斥鎖和條件變量實作讀寫鎖(寫者優先)

typedef struct {

  pthread_mutex_t    rw_mutex;        /* basic lock on this struct *///通路此讀寫鎖使用的互斥鎖

  pthread_cond_t    rw_condreaders;    /* for reader threads waiting  讀者線程使用*/

  pthread_cond_t    rw_condwriters;    /* for writer threads waiting  寫者線程使用*/

  int                rw_magic;        /* for error checking 初始化成功後, 被設定為RW_MAGIC,所有函數測試此成員,檢查調用者是否作為參數傳遞了指向某個已經初始化的讀寫鎖的指針,讀寫鎖摧毀時,被設定為0*/

  int                rw_nwaitreaders;/* the number waiting  讀者計數器*/

  int                rw_nwaitwriters;/* the number waiting   寫者計數器*/

  int                rw_refcount;//本讀寫鎖的目前狀态,-1表示是寫入鎖(任意時刻隻有一個),0表示可用,大于0表示目前容納着的讀出鎖數目

    /* 4-1 if writer has the lock, else # readers holding the lock */

} pthread_rwlock_t;

int pthread_rwlock_init(pthread_rwlock_t *rw, pthread_rwlockattr_t *attr)

    int        result;

    if (attr != NULL)

        return(EINVAL);        /* not supported */

    if ( (result = pthread_mutex_init(&rw->rw_mutex, NULL)) != 0)

        goto err1;

    if ( (result = pthread_cond_init(&rw->rw_condreaders, NULL)) != 0)

        goto err2;

    if ( (result = pthread_cond_init(&rw->rw_condwriters, NULL)) != 0)

        goto err3;

    rw->rw_nwaitreaders = 0;

    rw->rw_nwaitwriters = 0;

    rw->rw_refcount = 0;

    rw->rw_magic = RW_MAGIC;

    return(0);

err3:

    pthread_cond_destroy(&rw->rw_condreaders);

err2:

    pthread_mutex_destroy(&rw->rw_mutex);

err1:

    return(result);            /* an errno value */

int pthread_rwlock_destroy(pthread_rwlock_t *rw)

    //檢查參數是否有效

    if (rw->rw_magic != RW_MAGIC)

        return(EINVAL);

    if (rw->rw_refcount != 0 ||

        rw->rw_nwaitreaders != 0 || rw->rw_nwaitwriters != 0)

        return(EBUSY);

    pthread_cond_destroy(&rw->rw_condwriters);

    rw->rw_magic = 0;

int pthread_rwlock_rdlock(pthread_rwlock_t *rw)

    //操作讀寫鎖前,先給其互斥鎖上鎖

    if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)

        return(result);

        /* 4give preference to waiting writers */

    while (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0) 

{// rw_refcount 小于0(表示有寫者持有讀寫鎖),rw_nwaitwriters大于0表示有線程正等着擷取讀寫鎖的一個寫入鎖,則無法擷取該讀寫鎖的一個讀出鎖

        rw->rw_nwaitreaders++;

        result = pthread_cond_wait(&rw->rw_condreaders, &rw->rw_mutex);

        rw->rw_nwaitreaders--;

        if (result != 0)

            break;

    if (result == 0)

        rw->rw_refcount++;        /* another reader has a read lock */

    pthread_mutex_unlock(&rw->rw_mutex);

    return (result);

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rw)

    if (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0)

        result = EBUSY;            /* held by a writer or waiting writers */

    else

        rw->rw_refcount++;        /* increment count of reader locks */

    return(result);

int pthread_rwlock_wrlock(pthread_rwlock_t *rw)

    while (rw->rw_refcount != 0) 

{//隻要有讀者持有讀出鎖或有一個寫者持有唯一的寫入鎖,調用線程阻塞

        rw->rw_nwaitwriters++;

        result = pthread_cond_wait(&rw->rw_condwriters, &rw->rw_mutex);

        rw->rw_nwaitwriters--;

        rw->rw_refcount = -1;

int pthread_rwlock_trywrlock(pthread_rwlock_t *rw)

    if (rw->rw_refcount != 0)

        result = EBUSY;            /* held by either writer or reader(s) */

        rw->rw_refcount = -1;    /* available, indicate a writer has it */

int pthread_rwlock_unlock(pthread_rwlock_t *rw)

    if (rw->rw_refcount > 0)

        rw->rw_refcount--;            /* releasing a reader */

    else if (rw->rw_refcount == -1)

        rw->rw_refcount = 0;        /* releasing a reader */

        err_dump("rw_refcount = %d", rw->rw_refcount);

        /* 4give preference to waiting writers over waiting readers */

    if (rw->rw_nwaitwriters > 0) {

        if (rw->rw_refcount == 0)

            result = pthread_cond_signal(&rw->rw_condwriters);

    } else if (rw->rw_nwaitreaders > 0)

        result = pthread_cond_broadcast(&rw->rw_condreaders);

        rw->rw_refcount = 0;        /* releasing a writer */

        //寫者優先,而隻有一個寫者

            result = pthread_cond_signal(&rw->rw_condwriters);//通知等待的那個寫者

        result = pthread_cond_broadcast(&rw->rw_condreaders);//通知所有等待的讀者

這裡的讀出鎖和寫入鎖函數都有一個問題,若調用線程阻塞在pthread_cond_wati調用上,并且随後此線程被取消了,則它會在還持有互斥鎖的情況下終止,于是rw_nwaitreaders計數器的值會出錯

本文轉自Phinecos(洞庭散人)部落格園部落格,原文連結:http://www.cnblogs.com/phinecos/archive/2008/05/28/1209230.html,如需轉載請自行聯系原作者