天天看點

Scalable Hazard Pointers

由于我對于Java并發庫JUC的深入了解,一直以來有個想法,能不能把Java并發庫移植到純C語言環境下,并且在實作、使用方式上都與Java平台保持相當程度的相似性呢?

純C環境下記憶體模型與Java平台不一緻?加上記憶體屏障(fence)或者lock指令就行。 

C環境下缺少對象模型?無非是給每個資料塊提供個*init方法(比如pthread_mutex_t的pthread_mutex_init,pthread_barrier_t的pthread_barrier_init),之後再将邏輯的部分直接寫到函數裡(phtread_mutex_lock、pthread_barrier_wait),無非是這些邏輯并不是像Java平台上一樣被綁定到某個"對象"。這些都不是本質上困難的地方。

真正的困難在于記憶體回收。

Java平台,我們能夠對記憶體做的唯一事情就是申請、建立(new關鍵字),如此一來便得到一個新的對象。之後,我們無法直接針對這塊記憶體釋放。我們最多隻能把自己目所能及範圍内的關于這塊記憶體的"引用"置為null,進而期待GC去回收(前提是這塊記憶體不存在其他引用)。同時,我們根本無法知道究竟哪個時刻這塊記憶體會被回收,我們隻能認為,Runtime environment能夠選擇最适當的時刻。

**同時,Java虛拟機給出了一個更強的保證:隻要你的對象(引用)obj != null,那麼這個引用所訓示的對象便是肯定存在的,我們可以絕對安全地調用obj.method()而不用害怕任何意外。**

把這個場景放在C環境下比喻似乎就在說:如果你能看到某個位址(數值),那麼就放心對他做任何合理的事吧!Runtime environment確定了你的安全!

總之,Java運作時給出了強大的保證:

1 看得到的對象都存在,你可以放心對它操作。

2 那些正在被回收的、你不能安全操作的對象(記憶體),你絕對無法看到。

如此一來,無論是記憶體獨占或共享、線程并發或非并發,我們都無需擔心記憶體本身的問題(Java GC回收所有記憶體,全平台的垃圾回收器)。

那麼現在我們回頭來看C環境。它的運作時環境根本不幫你做任何事,你甚至可以虛構一個記憶體位址,然後将它強制轉化為虛構的struct類型,接着對它操作。隻不過這樣很可能破壞了記憶體,導緻不可預期的結果,同時這個錯誤會一直潛伏,你根本不知道何時出現。甚至于,都不需要定義結構體資訊,假如你了解運作機器的具體資訊,你都可以直接在一個位址上做位址偏移來操作記憶體......當然,前提是這塊記憶體是安全的。

那麼問題就很清楚了,

Java環境下記憶體安全回收由虛拟機完全負責。

C環境下,記憶體安全回收由邏輯單元(線程)本身來負責。

也就是說C環境下,我們不僅要處理算法本身的邏輯,同時也要額外去處理記憶體回收的問題。

那麼C環境下記憶體回收困難在哪呢?

首先,我們不考慮不需要回收的記憶體。我們知道,獨占的記憶體容易回收,對于共享的記憶體,假如有個邏輯點,我們確定所有線程當下以及将來都不會使用這塊記憶體,那麼也是可以安全回收的。

**是以真正難于回收的是滿足以下3個條件的記憶體:**

 1. **共享的。**

 2. **需要回收的(有些程式設計成不回收)。**

 3. **沒有明确的可以安全回收記憶體的邏輯點(存在被并發讀寫,但是有明确邏輯點回收的設計,比如主邏輯join)**

OK,我們接着引出Maged M. Michael,在2004發表的《Hazard Pointers:Safe Memory Reclamation for Lock-Free Objects》。

Hazard Pointer可以說是最知名的一種共享記憶體的回收方案。借用Erez Petrank的ppt,它可以概括如下:

Scalable Hazard Pointers

以及後續變化的[Lock-Free Data Structures with Hazard Pointers][2]、[Hazard Eras][3]、[Maged M. Michael][4]等。

**對于每一塊被共享的記憶體,這些技術都需要為它配置另外N(MAX_THREAD)個記憶體來标記它是否被對應的線程通路,同時為了釋放一塊記憶體,都需要周遊這N個記憶體位置進而判定是否可以安全回收(盡管可以采用先排序,二分搜尋等方式降低檢索的資料量,但是檢索過程的代價依然是随着MAX_THREAD而增長的,就算你總操作數一樣。**盡管它的算法是lock-free(甚至wait-free)。***這裡的缺陷在于每一塊共享資料都與線程本身緊緊耦合,幾乎沒有擴充性。***

而另一種被廣泛讨論的回收方案是epoch-based Reclamation[Practical lock-freedom][5]以及相似的技術[Performance of memory reclamation for lockless synchronization][6],**它的性能很好,但是如[記憶體管理規則][7]所說,它并不是無阻塞的算法,它在本質上就會阻塞,是以progress無法得到保證。**

其中2005年,哥德堡大學的研究團隊發表的方案[Practical and Efficient Lock-Free Garbage Collection Based on Reference Counting][8],**盡管也采用了引用計數的方式(與我将要介紹的方式類似),但是卻并沒有解除線程本身與共享資料之間的依賴,并且scan時依然要周遊整個線程組。**

在不計較lock指令或者fence的情況下,是否存在一種方法能夠同時做到:

 - **解除共享資料與線程之間的強制依賴,不需要為每一塊共享資料造另N個标記(正因如此,通常線程數N較小),進而極大得增強算法的擴充性,并且節省記憶體。**

 - **減少甚至取消判斷記憶體是否可被回收的周遊過程,消除MAX_THREAD的限制(算法視角),進而極大地減少了算法的搜尋代價,同時增強了算法的靈活性和可用性(無MAX_THREAD限制)。**

 - **同時,它保持non-blocking的progress。**

 - **另外,能不能提供一種方式,在我們開發某些并發資料結構時,它能夠正确回收對象層次内部的資料呢?**

為了達成以上三個目标,我向大家介紹自創的**SHP(Scalable Hazard Pointers)**。

----------

SHP

===

Scalable Hazard Pointers分為如下三個部分來講述:

 - **3RE&S協定(每一塊滿足以上三個條件的記憶體,都通過3RE協定規定的方式回收)**

 - **并發無鎖帶引用計數的有序單連結清單(我們組織被記錄位址的方式)**

 - **可擴充的Reocrd資源庫(我們維護Record的方式)**

 - **不斷的優化(局部保留Record+每線程每記憶體變量、連結清單周遊保留前驅、批量擷取Record、統一回收的大塊資料)**

 - **帶參數的retire,通過傳入定制的freeMemory方法,進而在回收目前記憶體前,先回收内部指針開辟的資料**

3RE&S Protocol

----

3RE&S指的是如下四個,針對被配置設定記憶體位址(pointer)的抽象操作:

 - **RECORD(增加一個記錄計數)** 

 - **REMOVE(移除一個記錄計數)**

 - **RETIRE(标記該記錄的retire,表明之後若記錄計數為0則可回收)**

 - **SCAN:周遊已經retire的所有record,有政策地回收記憶體**

同時,對于共享資料的讀、寫為下面兩種方式:

 - read: read共享變量+RECORD+再檢驗共享變量

 - write: write共享變量+RETIRE共享變量(write操作一般的應用場景會在read之後)

這個協定總結了,包含了hazardpointer以及許多類似技術的處理方式,RECORD操作時将共享記憶體位址本身也作為傳輸傳入。 

幾乎任何一種算法,并發共享的資料都可通過以上方式回收。

這裡的關鍵點是:

 - 一塊記憶體是否被回收這件事情最好由SCAN操作中的原子操作來完成,原因是我們肯定是在retire之後才考慮回收記憶體,同時RECORD可以回退。

 - **RECORD之後一般有個再判斷的操作。**

這裡給出個針對Michael原始論文的對比例子:

Scalable Hazard Pointers
void enqueue(int value){
	NodeType* node;
	posix_memalign(&node, 64, sizeof(NodeType));
	memset(node, 0, sizeof(NodeType));
	node->value = value;
	node->next = NULL;
	NodeType* t;
	for(;;){
		t = Tail;
		psly_record(&Tail, t);
		if(Tail != t) {
			psly_remove(t);
			continue; 
		}
		NodeType* next = t->next;
		if(Tail != t) {
			psly_remove(t);
			continue;
		}
		if(next != NULL){ 
			psly_remove(t);
			__sync_bool_compare_and_swap(&Tail, t, next);
			continue;
		}
		if(__sync_bool_compare_and_swap(&t->next, NULL, node)) {
			psly_remove(t);
			break;
		}
		psly_remove(t);
	}
	__sync_bool_compare_and_swap(&Tail, t, node);
}
           
Scalable Hazard Pointers
int dequeue(){
	int data;
	NodeType* h;
	for(;;){
		h = Head;
		//myhprec->HP[0] = h;
		psly_record(&Tail, h);
		if(Head != h) {
			psly_remove(h);
			continue;
		}
		NodeType* t = Tail;
		NodeType* next = h->next;
		psly_record(&(h->next), next);
		//myhprec->HP[1] = next;
		if(Head != h) {
			psly_remove(next);
            psly_remove(h);
			continue;
		}
		if(next == NULL) {
            psly_remove(next);
			psly_remove(h);
			return -1000000;	
		}
		if(h == t){
			psly_remove(next);
			psly_remove(h);
			__sync_bool_compare_and_swap(&Tail, t, next);
			continue;
		}
		data = next->value;
		//myhprec->HP[1] = NULL;
		//myhprec->HP[0] = NULL;
		if(__sync_bool_compare_and_swap(&Head, h, next)) {
			psly_remove(next);
			psly_remove(h);
			retireNode(h);
			break;
		}
		psly_remove(next);
		psly_remove(h);	
	}
	//myhprec->HP[0] = NULL;
	//myhprec->HP[1] = NULL;	
	return data;
}	
           

concurrent lock-free ordered singly linked-list with reference counting(基于Harris's list的帶引用計數,可回收節點的list.)

----

首先,為什麼要為每塊共享記憶體維護N個變量,這樣做不僅浪費記憶體而且增加搜尋代價。理想情況下我們應該隻需要一個記憶體資料來處理一個共享記憶體。那麼我們怎麼來處理多個線程引用它的情況呢?這裡的一個自然的想法是引入引用計數(reference count),(注意,這裡的引用計數是線程引用共享資料,與另一個對象層次間的引用無關),我們用refcount代表當下通路它的線程數,refCount>0的記憶體絕對不會被回收,refCount == 0代表沒有線程引用它,處于可以被回收的狀态。我們将這樣的一個記憶體資料稱為Record,

struct Record{
    //資料字段
    void* pointer;
    int refcount;
}
           

第二,由于Record記憶體本身是要被維護的,是以我們的政策是随需配置設定,已配置設定的不在回收,我們将Record作為一種可重用的資源,用于追蹤那些共享記憶體。那麼意味着Record能夠被高效地從資源庫并發擷取和傳回。 

這裡的技巧是用一個固定的self字段,标記Record本身,(比如低10位作為indexNum,接下來4位作為arrayNum),用于唯一的标記Record自身。這麼做的目的是為了支援後面的map。

想象一下,我們已經有了很多共享的pointer(位址),每個配置設定一個Record,接着我們要如何組織它們呢?進而高效的支援add,remove,search操作。

這裡的方式是用一個大的map,它具有一個大的buckets數目比如1024。

typedef struct RecordList {
    Record* volatile head ;
    Record* volatile tail ;
} RecordList ;


typedef struct RecordMap {
    RecordList* lists[1024] ;
} RecordMap ;
           

RecordList初始化之後久擁有了固定的head/tail。

很明顯,我們這裡自然的政策是将具有相同字尾的位址base到同一個list上,同時由于開辟的記憶體位址一般是單調遞增,并且保持16位元組對齊,我們可以根據位址本身來避開了hits。

我們可以抽象出如下的接口如下:

struct Record{
    //Record庫維護字段
    int volatile next ; 
    int self;
    //資料字段
    void* pointer;
    int refcount;
}


int record(void* pointer) {
  long key = ((long) pointer) >> 4 ;
  RecordList* list = map.lists[key & (1024-1)];
  return handle_records(list, pointer, RECORD);
}
           

那麼接下來的問題是如何在一個list上組織那些帶有同樣字尾位址的Record?

這裡推薦著名的[Harris' list][11],它做到用一個并發無鎖單連結清單來組織有序資料,支援增加、删除、搜尋。

我們需要給Record再加一個retire'bit和一個long字段nextRecord,它裡面包含四個字段:

 - 下一個Record的self(位移)(最右23位 next 8百萬+節點)

 - 下一個Record本身的版本号(20位 nextV 百萬+個版本号)

 - 該Record本身的版本号(20位 nodeV 百萬+個版本号)

 - 該Record是否已經被邏輯删除的标記(最高位 isDeleted)

 - retireBit代表該Record是否被retire

struct Record{
    //Record庫維護字段
    int volatile next ; 
    int self;
    //資料字段
    void* pointer;
    int refcount;
    //retire
    bool retireBit;
    long nextRecord;
}
           

然後,對它進行了相當程度的改造,改造的地方如下:

 - **不将next字段(harris's list的next字段,不是Record資源庫的)作為位址,将它作為數值,把Record的self字段原子交換過去。(CAS nextRecord)**

 - **對于被切除的Record,我們要自己回收這些Record并将它重新放入資源庫(Harris'list基于GC可以不用管)。(nextRecord)**

 - **因為Record是可複用,是以我們為它增加版本号字段(nodeV),同時它的next字段也需要版本号來回避ABA問題(nextV) **

 - **原來的insert操作有對象的情況下什麼都不做,我們給引用計數+1 (refcount),如果發現已經retire那麼回退**

 - **需要一個bit來标記這塊記憶體被retire了(retireBit),同時需要另一個bit來标記是否被邏輯删除(邏輯删除後,該塊記憶體就可以被回收了,nextRecord最高位)**

注意,由于以上的操作存在互相依賴,比如新增的Record不能連結到已經邏輯删除的Record上。  

是以,當一個Record被從資源庫get到之後,生命周期為:

 1. 它的nodeVersion+1,refCount+1,clear DELETE,同時将它的nextValue設定為後續節點。

 2. 接着它會經曆refCount +1/-1的序列操作。

 3. nextVersion+1 & nextValue改變 的序列操作。(與2沒有先後關系)

 4. retire的bit被設定的操作。(會導緻RECORD的失敗或則回退)

 5. 最高位DELETE被cmpandswap。(之後該pointer訓示的位址可以被回收)

 6. 包含該Record在内連在一起被DELETE的節點,都從list上切除,依次回收,并将Record傳回資源庫。

 7. 該Record被return回資源庫。 

這裡最核心的技術就是基于兩點判定節點沒有失效:

 1. **共享變量的值依舊是傳入位址值。** 

 2. **節點的 nodeVersion 以及 DELETE bit維持原狀。**

可擴充的Reocrd資源庫

----

我們需要一些Record去持有位址,我們采用随需批量malloc的方式。這種方式的特點是:

 - 可重複利用:大量malloc和free的方式對作業系統的記憶體維護也不友好,我們替換為支援并發的getRecord/returnRecord/idx_Record。

 - 多用途(也就是nextRecord的工作方式):因為我們開辟的是一塊大記憶體,使用的是其中一小塊資料。那麼每小塊資料都可以唯一的标記為:大記憶體首位址+大記憶體中的offset。這樣,在64位機器上,我們不需要再使用64位位址,而隻需要用較小的位數便可以,進而帶來了極大的好處(當原子操作時,64位中剩下的位數可以用于其他作用)。同時,20位便可以标記(1<<20,百萬多個小塊記憶體)。

 - 支援多種方式實作,隻需要提供getRecord/returnRecord/idx_Record接口(下面會給出具體的一種實作),

 - 而我們付出的代價僅僅隻是一定固定長度的數組。

下面給出一個大概例子,采用的經典的[MSQueue][12],當然,存在其他更高效的方式[lcrq][13]:

int PSLY_Record_IDXNUM = 16; 
int PSLY_Record_IDXBIT = ((1 << 16) - 1); 

int PSLY_Record_ARRAYNUM_MAX = (1 << 4); 
int PSLY_Record_ARRAYNUM = (1 << 2);
int PSLY_Record_ARRAYBITS = ((1 << 4) -1);  
int PSLY_Record_ARRBIT = (((1 << 4) - 1) << 16);  
int PSLY_Record_ARRBITR = ((1 << 4) - 1);  

int PSLY_Record_ARRIDXBIT = ((((1 << 4) - 1) << 16) | ((1 << 16) - 1)); 

int PSLY_Record_NEXTIDXNUM  = 16;  
int PSLY_Record_NEXTIDXBIT  = ((1 << 16) - 1);  
             
int PSLY_Record_NEXTTAILNUM = 1;   
int PSLY_Record_NEXTTAILBIT = (((1 << 1) - 1) << 16);   
   
int PSLY_Record_NEXTVERSIONNUM = (32 - 1 - 16);      
int PSLY_Record_NEXTVERSIONBIT = ((~0)^((((1 << 1) - 1) << 16) | ((1 << 16) - 1)));  
int PSLY_Record_NEXTVERSIONONE = (1 + ((((1 << 1) - 1) << 16) | ((1 << 16) - 1))); 
  
  
int PSLY_Record_TAILIDXNUM = 16;  
int PSLY_Record_TAILIDXBIT = ((1 << 16) - 1);  
  
int PSLY_Record_TAILVERSIONNUM = (32 - 16);  
int PSLY_Record_TAILVERSIONBIT = ((~0) ^ ((1 << 16) - 1));   
int PSLY_Record_TAILVERSIONONE = (1 + ((1 << 16) - 1));  
   
   
int PSLY_Record_HEADIDXNUM = 16;  
int PSLY_Record_HEADIDXBIT = ((1 << 16) - 1);   
   
int PSLY_Record_HEADVERSIONNUM = (32 - 16);   
int PSLY_Record_HEADVERSIONBIT = ((~0) ^ ((1 << 16) - 1));  
int PSLY_Record_HEADVERSIONONE = (1 + ((1 << 16) - 1));  


typedef struct Record { 
    int volatile next __attribute__((aligned(128))); 
    int self ; 
    long volatile nextRecord __attribute__((aligned(128))); 
    void* volatile pointer ; 
} Record __attribute__((aligned(128))); 

typedef struct RecordQueue { 
    int volatile head ; 
    int volatile tail ; 
} RecordQueue ;  
static Record* volatile psly_Records[1 << 4]; 
static RecordQueue volatile psly_Record_queues[1 << 4]; 
static int volatile recordTake = 0;  
  
Record* idx_Record(int index) {   
    return psly_Records[(index & PSLY_Record_ARRBIT) >> PSLY_Record_IDXNUM] + (index & PSLY_Record_IDXBIT);   
} 
   
      
Record* get_Record() {             
	for(;;) {
		int localArrayNum = PSLY_Record_ARRAYNUM;   
		//取最高隊列
		int array = localArrayNum - 1;
		RecordQueue* queue = psly_Record_queues + array;
		Record* arr = psly_Records[array];
		for(;;){
			int headIndex = (queue->head);
			int indexHead = headIndex & PSLY_Record_HEADIDXBIT;
			Record* head = arr + indexHead;
			int tailIndex = (queue->tail);
			int indexTail = tailIndex & PSLY_Record_TAILIDXBIT;

			int nextIndex = (head->next);

			if(headIndex == (queue->head)) {
				if(indexHead == indexTail){
					if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT)
						break;
					__sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT));    
				} else {
					if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) {
						return head;
					}
				}
			}

		}	
	//	輪詢某些隊列
    for(int i = 0; i < localArrayNum; ++i) {
        int array = __sync_fetch_and_add(&recordTake, 1) % localArrayNum;
        RecordQueue* queue = psly_Record_queues + array;
    	Record* arr = psly_Records[array];
        for(;;){
            int headIndex = (queue->head);
            int indexHead = headIndex & PSLY_Record_HEADIDXBIT;
            Record* head = arr + indexHead;
            int tailIndex = (queue->tail);
            int indexTail = tailIndex & PSLY_Record_TAILIDXBIT;
            int nextIndex = (head->next);

            if(headIndex == (queue->head)) {
                if(indexHead == indexTail){
                    if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT)
                        break;
                    __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT));    
                } else {
                    if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) {
                        return head;
                    }
                }
            }
        }
    }
  // 周遊所有隊列
    for(int i = 0; i < localArrayNum; ++i) {
        int array = i;
        RecordQueue* queue = psly_Record_queues + array;
        Record* arr = psly_Records[array];
        for(;;){
            int headIndex = (queue->head);
            int indexHead = headIndex & PSLY_Record_HEADIDXBIT;
            Record* head = arr + indexHead;
            int tailIndex = (queue->tail);
            int indexTail = tailIndex & PSLY_Record_TAILIDXBIT;

            int nextIndex = (head->next);

            if(headIndex == (queue->head)) {
                if(indexHead == indexTail){
                    if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT)
                        break;
                    __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT));    
                } else {
                    if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) {
                        return head;
                    }
                }
            }

        }
    }
			//不夠增加
		if(localArrayNum == PSLY_Record_ARRAYNUM_MAX)
			return NULL;
		if(localArrayNum == PSLY_Record_ARRAYNUM) {
			if(psly_Records[localArrayNum] == NULL) {
				int array_ = localArrayNum;
				Record* record;
				void * ptr;
				int ret = posix_memalign(&ptr, 4096, (1 << PSLY_Record_IDXNUM) * sizeof(Record));
				record = ptr;
				memset(record, 0, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); 
				for(int j = 0; j < (1 << PSLY_Record_IDXNUM) - 1; ++j){ 
				  record->self = (array_ << PSLY_Record_IDXNUM) | j; 
				  record->next = j+1; 
				  record->pointer = NULL;\
				  record->nextRecord = 0;
				  record += 1; 
				}    
				record->self = (array_ << PSLY_Record_IDXNUM) | ((1 << PSLY_Record_IDXNUM) - 1); 
				record->next = PSLY_Record_NEXTTAILBIT;  
				record->pointer = NULL;
				record->nextRecord = 0;
				//printf("I'm here %d %ld\n", localArrayNum, pthread_self());
				if(!__sync_bool_compare_and_swap(&psly_Records[array_], NULL, ptr))
					{free(ptr);}
				else
					/*printf("extend to %d\n", localArrayNum + 1)*/;
			}
		   	if(localArrayNum == PSLY_Record_ARRAYNUM)
				__sync_bool_compare_and_swap(&PSLY_Record_ARRAYNUM, localArrayNum, localArrayNum + 1);
		}
	}
}
  
void return_Record(Record* record) {
  long local = (record->next);
  local |= PSLY_Record_NEXTTAILBIT;
  record->next = local;
    int self = record->self;
    int array = (self >> PSLY_Record_IDXNUM) & PSLY_Record_ARRBITR;
    Record* arr = psly_Records[array];
    RecordQueue* queue = psly_Record_queues + array;
    for(;;) {
        int tailIndex = (queue->tail);
        int indexTail = tailIndex & PSLY_Record_TAILIDXBIT;
        Record* tail = arr + indexTail;
        int nextIndex = (tail->next);

        if(tailIndex == (queue->tail)){
            if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) {
                if(__sync_bool_compare_and_swap(&tail->next, nextIndex, (((nextIndex & PSLY_Record_NEXTVERSIONBIT) + PSLY_Record_NEXTVERSIONONE) & PSLY_Record_NEXTVERSIONBIT)|(self & PSLY_Record_NEXTIDXBIT))){
                    __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE) & PSLY_Record_TAILVERSIONBIT)|(self & PSLY_Record_TAILIDXBIT));
                    return;
                }
            } else {
                __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT));
            }
        }
    }
}  


typedef struct RecordList {
    Record* volatile head ;
  Record* volatile tail ;
} RecordList ;

typedef struct RecordMap {
    volatile RecordList* lists[131070] ;
} RecordMap ;

static volatile RecordMap map;

#define INIT_RESOURCE(listNum)   \
  for(int i = 0; i < (PSLY_Record_ARRAYNUM); ++i){ \
    Record* record; \
    void * ptr;\
    int ret = posix_memalign(&ptr, 4096, (1 << PSLY_Record_IDXNUM) * sizeof(Record));\
    psly_Records[i] = record = ptr; \
    memset(record, 0, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); \
    for(int j = 0; j < (1 << PSLY_Record_IDXNUM) - 1; ++j){ \
      record->self = (i << PSLY_Record_IDXNUM) | j; \
      record->next = j+1; \
	  record->pointer = NULL;\
      record->nextRecord = 0;\
      record += 1; \
    }    \
    record->self = (i << PSLY_Record_IDXNUM) | ((1 << PSLY_Record_IDXNUM) - 1); \
    record->next = PSLY_Record_NEXTTAILBIT;  \
	record->pointer = NULL;\
    record->nextRecord = 0;\
  }\
  for(int i = 0; i < PSLY_Record_ARRAYNUM_MAX; ++i){\
    psly_Record_queues[i].head = 0; \
    psly_Record_queues[i].tail = (1 << PSLY_Record_IDXNUM) - 1; \
  } \
  for(int i = 0; i < listNum; ++i) { \
    void* ptr;\
    int ret = posix_memalign(&ptr, 4096, sizeof(RecordList));\
    Record* head = get_Record();\
    Record* tail = get_Record();\
    head->nextRecord = newNext(head->nextRecord, tail); \
	map.lists[i] = ptr;\
    map.lists[i]->head = head; \
    map.lists[i]->tail = tail; \
  }

#define UNINIT_RESOURCE(listNum)  \
    for(int i = 0; i < (PSLY_Record_ARRAYNUM); ++i){ \
        free(psly_Records[i]); \
    } \
	for(int i = 0; i < listNum; ++i) {\
		free(map.lists[i]);\
	}
           

這裡的psly_Records[1 << 4],psly_Record_queues[1 << 4] 代表總共可以提供16組,每組65536個資料(PSLY_Record_IDXNUM = 16),初始配置設定4組資料(PSLY_Record_ARRAYNUM = (1 << 2)),之後如果不夠就擴充一組。

**不斷的優化**

----

**局部變量**:

對于一塊共享記憶體S,我們為它的Record保持一個局部變量reordS,隻需要在RECORD時候傳回,後續的REMOVE跟RETIRE就不需要去查詢了,相對于之前的enqueue給出的例子如下:

+++為變動代碼

void enqueue(int value){
	NodeType* node;
	posix_memalign(&node, 64, sizeof(NodeType));
	memset(node, 0, sizeof(NodeType));
	node->value = value;
	node->next = NULL;
	NodeType* t;
	for(;;){
		t = Tail;
+++	 Record* recordT = psly_record(&Tail, t);
+++     if(recordT == NULL)
+++         continue;
		if(Tail != t) {
+++		 psly_remove(recordT);
			continue; 
		}
		NodeType* next = t->next;
		if(Tail != t) {
+++		 psly_remove(recordT);
			continue;
		}
		if(next != NULL){ 
+++		 psly_remove(recordT);
			__sync_bool_compare_and_swap(&Tail, t, next);
			continue;
		}
		if(__sync_bool_compare_and_swap(&t->next, NULL, node)) {
+++		 psly_remove(recordT);
			break; 
		}
+++	    psly_remove(recordT);
	}
	__sync_bool_compare_and_swap(&Tail, t, node);
}
           

**這樣以來,每次操作,最需要在RECORD時候周遊一次。**

我們還可以做的更好

**線程私有資料**:

有些場景的共享記憶體,會在一段長期時間内不會改變,這種情況的話每次都去查詢maplist顯得很浪費,我們可以做個緩存來節省查詢。

對于一塊确定的共享記憶體S,我們嘗試為每線程配置一個私有變量(static __tread),用一段結構化的代碼跟蹤它的Record,這樣以來就不需要每次都查詢maplist了,

雖然不是非常合适,但我們還是拿前面Hazard pointer的例子做個示例,代碼如下:

void enqueue(int value){
	NodeType* node;
	posix_memalign(&node, 64, sizeof(NodeType));
	memset(node, 0, sizeof(NodeType));
	node->value = value;
	node->next = NULL;
	NodeType* t;
	for(;;){
		t = Tail;
+++     static __thread LocalRecord localRecordT;
		Record* recordT;
+++		if(localRecordT.pointer == NULL) {
+++			recordT = psly_record(&Tail, t, NULL, NULL);
			if(recordT == NULL)
				continue;
+++        	else {
+++			    localRecordT.pointer = t;
+++				localRecordT.record = recordT;
+++				localRecordT.nextRecord = recordT->nextRecord;
+++			}
+++		} else {
+++			if(t != localRecordT.pointer || isChange(localRecordT.record, localRecordT.nextRecord) || t != localRecordT.record->pointer) {
+++				localRecordT.pointer = NULL;
+++				continue;
+++			} 
+++			recordT = psly_record(&Tail, t, localRecordT.record, localRecordT.nextRecord);
+++			if(recordT == NULL) {
+++				localRecordT.pointer = NULL;
+++				continue;
+++			}
+++		}
		if(Tail != t) {
			psly_remove(recordT);
			continue; 
		}
		NodeType* next = t->next;
		if(Tail != t) {
			psly_remove(recordT);
			continue;
		}
		if(next != NULL){ 
			psly_remove(recordT);
			__sync_bool_compare_and_swap(&Tail, t, next);
			continue;
		}
		if(__sync_bool_compare_and_swap(&t->next, NULL, node)) {
			psly_remove(recordT);
			break;
		}
		psly_remove(recordT);
	}
	__sync_bool_compare_and_swap(&Tail, t, node);
}
           

這種場景下,假如我們系統共有N個線程,那麼對于一個記憶體,在它的整個生命周期裡,需要查詢maplist的次數上限為N+1次!

這種方式極大地減少了查詢的次數,進而為設計某些高效的共享資料結構提供了可能。

**連結清單周遊保留前驅**:

當我們查詢maplist時候,有可能會因為前驅節點的失效,而要重新在該list的head開始周遊,假如連結清單過長會代價較大,是以我們在周遊過程中維護些前驅節點可能會好點。

示例代碼如下:

保留:

if(currKey != key) {
		int bucket;
		if((steps & STEPS_) == 0 && (bucket = (steps >> STEPBIT)) < MAXPREV) {
			Prevs* step = &prevs_[bucket];
			step->r = prev;
			step->rNext = prevNext;
		}
		++steps; 
        prev = curr;
        prevNext = currNext;
      }
           

失效之後啟用保留的前驅:

--steps;
      for(;;) {
        int bucket = steps >> STEPBIT;
        bucket = bucket < MAXPREV ? bucket: (MAXPREV - 1);
        Prevs* prevs = &prevs_[bucket];
        prev = prevs->r;
        prevNext = prev->nextRecord;
        long prevNextKeep = prevs->rNext;
        if((prevNextKeep & NODEBITS) != (prevNext & NODEBITS) || (prevNext & REFCBITS) == DELETED) {
          steps -= STEPS;
        } else {
          prevs->rNext = prevNext;
          curr = idx_Record(prevNext);
          break;
        }
      }
      steps = steps & (~STEPS_);
           

**這裡的steps記錄我們目前所在的位置,STEPS表達我們隔幾個節點記錄一次(極端情況下可以拷貝所有周遊過的節點)。**

批量擷取Record

----------

批量擷取Record方式指的是,由于擷取Record的競争過于激烈,我們不再每次擷取一個,而是每次擷取一批,剩餘的作為線程私有之後使用,維護好資料的 未使用/使用 狀态,以及作為整體傳回給資源庫。進而極大地減少了線程間的競争。

統一回收的大塊資料

---------

對于某些場景,許多小塊的共享資料同時産生,又可以同時回收。我們不再為每個記憶體位址配置設定一個Record,我們嘗試将一塊大記憶體的分割為許多小記憶體來使用,如此一來小記憶體統一映射到大記憶體首位址的Record,直接省去了插傳入連結表的操作。我需要對Record進行改造,retireBit不再作為一個元素使用,這裡可以換成short

struct Record{
    //Record庫維護字段
    int volatile next ; 
    int self;
    //資料字段
    void* pointer;
    int refcount;
    //retire
+++ short retireNum;
    long nextRecord;
}
           

同時,我們的psy_record接口增加一個參數retireNum:

recordT = psly_record(void** ppointer, void* pointer, Record* record, long nextRecord, short retireNum);
           

 1. 我們對于某個記憶體pointer,我們映射首位址的方式要依賴于如何配置設定記憶體。

 2. retire操作現在要給retireNum減一。

最後講一下,唯一需要注意的是,如果記憶體已經處于可回收狀态:

 1. 所有分片記憶體都已經retire。

 2. 觀察到一次refcount為0.

那麼我們便可以立即回收内部,因為對于企圖使用該記憶體的線程而言,要麼正在遞增引用計數,要麼已經完成通路。完成通路的沒關系,根據我們的設計,遞增引用計數的線程稍後會回退減一,進而不再通路這塊記憶體。

帶參數的retire

----

假如我們在開發一個并發資料結構,它本身将會被共享/動态開辟/回收,資料本身帶有指針,指針指向的資料随着程式的執行變得不滿足需求,進而我們要重新配置這一資料,并且回收原資料。

**這種情況下我們能不能正确回收所有資料呢?**

答案是可以的。

對于搜優記憶體,如果滿足

>   共享的 / 需要回收的 / 沒有明确的可以安全回收記憶體的邏輯點

**我們都采用3RE&S Protocol提供的語義來回收記憶體。**

因為

 1. 隻要采用這種方式,内部記憶體同樣能夠被record/remove/retire/scan維護好,而新開辟的記憶體将正确地作為内部記憶體。

 2. 任何時候,資料結構本身都是完整的。同時,當整體可被回收時,不會存線上程讀寫内部資料。安全性得到保證

**最後,我們必須自己提供freeMemory函數用于先回收内部記憶體,再回收外部對象的記憶體。**

  [1]: /img/bVYqVm

  [2]: https://erdani.com/publications/cuj-2004-12.pdf

  [3]: https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/hazarderas-2017.pdf

  [4]: http://web.cecs.pdx.edu/~walpole/class/cs510/fall2011/slides/07.pdf

  [5]: https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf

  [6]: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.135.9418&rep=rep1&type=pdf

  [7]: http://blog.jobbole.com/107955/

  [8]: http://www.non-blocking.com/download/GidPST05_LockFreeGC_TR.pdf

  [9]: /img/bVYBoJ

  [10]: /img/bVYBqU

  [11]: https://www.microsoft.com/en-us/research/wp-content/uploads/2001/10/2001-disc.pdf

  [12]: http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf

  [13]: http://www.cs.tau.ac.il/~mad/publications/ppopp2013-x86queues.pdf

繼續閱讀