多線程的那點兒事
多線程程式設計是現代軟體技術中很重要的一個環節。要弄懂多線程,這就要牽涉到多程序?當然,要了解到多程序,就要涉及到作業系統。不過大家也不要緊張,聽我慢慢道來。這其中的環節其實并不複雜。
(1)單CPU下的多線程
在沒有出現多核CPU之前,我們的計算資源是唯一的。如果系統中有多個任務要處理的話,那麼就需要按照某種規則依次排程這些任務進行處理。什麼規則呢?可以是一些簡單的排程方法,比如說
1)按照優先級排程
2)按照FIFO排程
3)按照時間片排程等等
當然,除了CPU資源之外,系統中還有一些其他的資源需要共享,比如說記憶體、檔案、端口、socket等。既然前面說到系統中的資源是有限的,那麼擷取這些資源的最小單元體是什麼呢,其實就是程序。
舉個例子來說,在linux上面每一個享有資源的個體稱為task_struct,實際上和我們說的程序是一樣的。我們可以看看task_struct(linux 0.11代碼)都包括哪些内容,
view plain
struct task_struct {
long state;
long counter;
long priority;
long signal;
struct sigaction sigaction[32];
long blocked;
int exit_code;
unsigned long start_code,end_code,end_data,brk,start_stack;
long pid,father,pgrp,session,leader;
unsigned short uid,euid,suid;
unsigned short gid,egid,sgid;
long alarm;
long utime,stime,cutime,cstime,start_time;
unsigned short used_math;
int tty;
unsigned short umask;
struct m_inode * pwd;
struct m_inode * root;
struct m_inode * executable;
unsigned long close_on_exec;
struct file * filp[NR_OPEN];
struct desc_struct ldt[3];
struct tss_struct tss;
};
每一個task都有自己的pid,在系統中資源的配置設定都是按照pid進行處理的。這也就說明,程序确實是資源配置設定的主體。
這時候,可能有朋友會問了,既然task_struct是資源配置設定的主體,那為什麼又出來thread?為什麼系統排程的時候是按照thread排程,而不是按照程序排程呢?原因其實很簡單,程序之間的資料溝通非常麻煩,因為我們之是以把這些程序分開,不正是希望它們之間不要互相影響嘛。
假設是兩個程序之間資料傳輸,那麼需要如果需要對共享資料進行通路需要哪些步驟呢,
1)建立共享記憶體
2)通路共享記憶體->系統調用->讀取資料
3)寫入共享記憶體->系統調用->寫入資料
要是寫個代碼,大家可能就更明白了,
view plain
#include <unistd.h>
#include <stdio.h>
int value = 10;
int main(int argc, char* argv[])
{
int pid = fork();
if(!pid){
Value = 12;
return 0;
}
printf("value = %d\n", value);
return 1;
}
上面的代碼是一個建立子程序的代碼,我們發現列印的value數值還是10。盡管中間建立了子程序,修改了value的數值,但是我們發現列印下來的數值并沒有發生改變,這就說明了不同的程序之間記憶體上是不共享的。
那麼,如果修改成thread有什麼好處呢?其實最大的好處就是每個thread除了享受單獨cpu排程的機會,還能共享每個程序下的所有資源。要是排程的機關是程序,那麼每個程序隻能幹一件事情,但是程序之間是需要互相互動資料的,而程序之間的資料都需要系統調用才能應用,這在無形之中就降低了資料的處理效率。
(2)多核CPU下的多線程
沒有出現多核之前,我們的CPU實際上是按照某種規則對線程依次進行排程的。在某一個特定的時刻,CPU執行的還是某一個特定的線程。然而,現在有了多核CPU,一切變得不一樣了,因為在某一時刻很有可能确實是n個任務在n個核上運作。我們可以編寫一個簡單的open mp測試一下,如果還是一個核,運作的時間就應該是一樣的。
view plain
#include <omp.h>
#define MAX_VALUE 10000000
double _test(int value)
{
int index;
double result;
result = 0.0;
for(index = value + 1; index < MAX_VALUE; index +=2 )
result += 1.0 / index;
return result;
}
void test()
{
int index;
int time1;
int time2;
double value1,value2;
double result[2];
time1 = 0;
time2 = 0;
value1 = 0.0;
time1 = GetTickCount();
for(index = 1; index < MAX_VALUE; index ++)
value1 += 1.0 / index;
time1 = GetTickCount() - time1;
value2 = 0.0;
memset(result , 0, sizeof(double) * 2);
time2 = GetTickCount();
#pragma omp parallel for
for(index = 0; index < 2; index++)
result[index] = _test(index);
value2 = result[0] + result[1];
time2 = GetTickCount() - time2;
printf("time1 = %d,time2 = %d\n",time1,time2);
return;
}
(3)多線程程式設計
為什麼要多線程程式設計呢?這其中的原因很多,我們可以舉例解決
1)有的是為了提高運作的速度,比如多核cpu下的多線程
2)有的是為了提高資源的使用率,比如在網絡環境下下載下傳資源時,時延常常很高,我們可以通過不同的thread從不同的地方擷取資源,這樣可以提高效率
3)有的為了提供更好的服務,比如說是伺服器
4)其他需要多線程程式設計的地方等等
多線程建立其實十分簡單,在windows系統下面有很多函數可以建立多線程,比如說_beginthread。我們就可以利用它為我們編寫一段簡單的多線程代碼,
view plain
#include <windows.h>
#include <process.h>
#include <stdio.h>
unsigned int value = 0;
void print(void* argv)
{
while(1){
printf("&value = %x, value = %d\n", &value, value);
value ++;
Sleep(1000);
}
}
int main()
{
_beginthread( print, 0, NULL );
_beginthread( print, 0, NULL);
while(1)
Sleep(0);
return 1;
}
注意,在VC上面編譯的時候,需要打開/MD開關。具體操作為,【project】->【setting】->【c/c++】->Category【Code Generation】->【Use run-time library】->【Debug Multithreaded】即可。
通過上面的示例,我們看到作為共享變量的value事實上是可以被所有的線程通路的。這就是線程資料同步的最大優勢——友善,直接。因為線程之間除了堆棧空間不一樣之外,代碼段和資料段都是在一個空間裡面的。是以,線程想通路公共資料,就可以通路公共資料,沒有任何的限制。
當然,事物都有其兩面性。這種對公共資源的通路模式也會導緻一些問題。什麼問題呢?我們看了就知道了。
現在假設有一個池塘,我們雇兩個人來喂魚。兩個人不停地對池塘裡面的魚進行喂食。我們規定在一個人喂魚的時候,另外一個人不需要再喂魚,否則魚一次喂兩回就要撐死了。為此,我們安裝了一個牌子作為警示。如果一個人在喂魚,他會把牌子設定為FALSE,那麼另外一個人看到這個牌子,就不會繼續喂魚了。等到這個人喂完後,他再把牌子繼續設定為TRUE。
如果我們需要把這個故事寫成代碼,那麼怎麼寫呢?朋友們試試看,
view plain
while(1){
if( flag == true){
flag = false;
do_give_fish_food();
flag = true;
}
Sleep(0);
}
上面的代碼看上去沒有問題了,但是大家看看代碼的彙編代碼,看看是不是存在隐患。因為還會出現兩個人同時喂食的情況,
view plain
23: while(1){
004010E8 mov eax,1
004010ED test eax,eax
004010EF je do_action+56h (00401126)
24: if( flag == true){
004010F1 cmp dword ptr [flag (00433e04)],1
004010F8 jne do_action+43h (00401113)
25: flag = false;
004010FA mov dword ptr [flag (00433e04)],0
26: do_give_fish_food();
00401104 call @ILT+15(do_give_fish_food) (00401014)
27: flag = true;
00401109 mov dword ptr [flag (00433e04)],1
28: }
29:
30: Sleep(0);
00401113 mov esi,esp
00401115 push 0
00401117 call dword ptr [[email protected] (004361c4)]
0040111D cmp esi,esp
0040111F call __chkesp (004011e0)
31: }
00401124 jmp do_action+18h (004010e8)
32: }
我們此時假設有兩個線程a和b在不停地進行判斷和喂食操作。設定目前flag = true,此時線程a執行到004010F8處時,判斷魚還沒有喂食,正準備執行指令004010F8,但是還沒有來得及對falg進行設定,此時出現了線程排程。線程b運作到004010F8時,發現目前沒有人喂食,是以執行喂食操作。等到b線程喂食結束,運作到00401113的時候,此時又出現了排程。線程a有繼續運作,因為之前已經判斷了目前還沒有喂食,是以線程a繼續進行了喂食了操作。是以,可憐的魚,這一次就連續經曆了兩次喂食操作,估計有一部分魚要撐死了。
當然魚在這裡之是以會出現撐死的情況,主要是因為line 24和line 25之間出現了系統排程。是以,我們在編寫程式的時候必須有一個牢固的思想意識,如果缺少必須要的手段,程式可以任何時刻任何地點被排程,那此時公共資料的計算就會出現錯誤。
在多線程存在的環境中,除了堆棧中的臨時資料之外,所有的資料都是共享的。如果我們需要線程之間正确地運作,那麼務必需要保證公共資料的執行和計算是正确的。簡單一點說,就是保證資料在執行的時候必須是互斥的。否則,如果兩個或者多個線程在同一時刻對資料進行了操作,那麼後果是不可想象的。
也許有的朋友會說,不光資料需要保護,代碼也需要保護。提出這個觀點的朋友隻看到了資料通路互斥的表象。在程式的運作空間裡面,什麼最重要的呢?代碼嗎?當然不是。代碼隻是為了資料的通路存在的。資料才是我們一切工作的出發點和落腳點。
那麼,有什麼辦法可以保證在某一時刻隻有一個線程對資料進行操作呢?四個基本方法:
(1)關中斷
(2)數學互斥方法
(3)作業系統提供的互斥方法
(4)cpu原子操作
為了讓大家可以對這四種方法有詳細的認識,我們可以進行詳細的介紹。
(1)關中斷
要讓資料在某一時刻隻被一個線程通路,方法之一就是停止線程排程就可以了。那麼怎樣停止線程排程呢?那麼關掉時鐘中斷就可以了啊。在X86裡面的确存在這樣的兩個指令,
view plain
#include <stdio.h>
int main()
{
__asm{
cli
sti
}
return 1;
}
其中cli是關中斷,sti是開中斷。這段代碼沒有什麼問題,可以編過,當然也可以生成執行檔案。但是在執行的時候會出現一個異常告警:Unhandled exception in test.exe: 0xC0000096: Privileged Instruction。告警已經說的很清楚了,這是一個特權指令。隻有系統或者核心本身才可以使用這個指令。
不過,大家也可以想象一下。因為平常我們編寫的程式都是應用級别的程式,要是每個程式都是用這些代碼,那不亂了套了。比如說,你不小心安裝一個低品質的軟體,說不定什麼時候把你的中斷關了,這樣你的網絡就斷了,你的輸入就沒有回應了,你的音樂什麼都沒有了,這樣的環境你受的了嗎?應用層的軟體是千差萬别的,軟體的水準也是參差不齊的,是以系統不可能相信任何一個私有軟體,它相信的隻是它自己。
(2)數學方法
假設有兩個線程(a、b)正要對一個共享資料進行通路,那麼怎麼做到他們之間的互斥的呢?其實我們可以這麼做,
view plain
unsigned int flag[2] = {0};
unsigned int turn = 0;
void process(unsigned int index)
{
flag[index] = 1;
turn = index;
while(flag[1 - index] && (turn == index));
do_something();
flag[index] = 0;
}
其實,學過作業系統的朋友都知道,上面的算法其實就是Peterson算法,可惜它隻能用于兩個線程的資料互斥。當然,這個算法還可以推廣到更多線程之間的互斥,那就是bakery算法。但是數學算法有兩個缺點:
a)占有空間多,兩個線程就要flag占兩個機關空間,那麼n個線程就要n個flag空間,
b)代碼編寫複雜,考慮的情況比較複雜
(3)系統提供的互斥算法
系統提供的互斥算法其實是我們平時開發中用的最多的互斥工具。就拿windows來說,關于互斥的工具就有臨界區、互斥量、信号量等等。這類算法有一個特點,那就是都是依據系統提高的互斥資源,那麼系統又是怎麼完成這些功能的呢?其實也不難。
系統加鎖過程,
view plain
void Lock(HANDLE hLock)
{
__asm {cli};
while(1){
if(){
__asm {sti};
return;
}
__asm{sti};
schedule();
__asm{cli};
}
}
系統解鎖過程,
view plain
void UnLock(HANDLE hLock)
{
__asm {cli};
__asm{sti};
}
上面其實讨論的就是一種最簡單的系統鎖情況。中間沒有涉及到就緒線程的壓入和彈出過程,沒有涉及到資源個數的問題,是以不是很複雜。朋友們仔細看看,應該都可以明白代碼表達的是什麼意思。
(4)CPU的原子操作
因為在多線程操作當中,有很大一部分是比較、自增、自減等簡單操作。因為需要互斥的代碼很少,是以使用互斥量、信号量并不合算。是以,CPU廠商為了開發的友善,把一些常用的指令設計成了原子指令,在windows上面也被稱為原子鎖,常用的原子操作函數有
view plain
InterLockedAdd
InterLockedExchange
InterLockedCompareExchange
InterLockedIncrement
InterLockedDecrement
InterLockedAnd
InterLockedOr
自旋鎖是SMP中經常使用到的一個鎖。所謂的smp,就是對稱多處理器的意思。在工業用的pcb闆上面,特别是伺服器上面,一個pcb闆有多個cpu是很正常的事情。這些cpu互相之間是獨立運作的,每一個cpu均有自己的排程隊列。然而,這些cpu在記憶體空間上是共享的。舉個例子說,假設有一個資料value = 10,那麼這個資料可以被所有的cpu通路。這就是共享記憶體的本質意義。
我們可以看一段Linux 下的的自旋鎖代碼(kernel 2.6.23,asm-i386/spinlock.h),就可有清晰的認識了,
view plain
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
asm volatile("\n1:\t"
LOCK_PREFIX " ; decb %0\n\t"
"jns 3f\n"
"2:\t"
"rep;nop\n\t"
"cmpb $0,%0\n\t"
"jle 2b\n\t"
"jmp 1b\n"
"3:\n\t"
: "+m" (lock->slock) : : "memory");
}
上面這段代碼是怎麼做到自旋鎖的呢?我們可以一句一句看看,
line 4: 對lock->slock自減,這個操作是互斥的,LOCK_PREFIX保證了此刻隻能有一個CPU通路記憶體
line 5: 判斷lock->slock是否為非負數,如果是跳轉到3,即獲得自旋鎖
line 6: 位置符
line 7: lock->slock此時為負數,說明已經被其他cpu搶占了,cpu休息一會,相當于pause指令
line 8: 繼續将lock->slock和0比較,
line 9: 判斷lock->slock是否小于等于0,如果判斷為真,跳轉到2,繼續休息
line 10: 此時lock->slock已經大于0,可以繼續嘗試搶占了,跳轉到1
line 11: 位置符
上面的操作,除了第4句是cpu互斥操作,其他都不是。是以,我們發現,在cpu之間尋求互斥通路的時候,在某一時刻隻有一個記憶體通路權限。是以,如果其他的cpu之間沒有獲得通路權限,就會不斷地檢視目前是否可以再次申請自旋鎖了。這個過程中間不會停歇,除非獲得通路的權限為止。
總結:
1)在smp上自旋鎖是多cpu互斥通路的基礎
2)因為自旋鎖是自旋等待的,是以處于臨界區的代碼應盡可能短
3)上面的LOCK_PREFIX,在x86下面其實就是“lock”,gcc下可以編過,朋友們可以自己試試
在windows系統中,系統本身為我們提供了很多鎖。通過這些鎖的使用,一方面可以加強我們對鎖的認識,另外一方面可以提高代碼的性能和健壯性。常用的鎖以下四種:臨界區,互斥量,信号量,event。
(1)臨界區
臨界區是最簡單的一種鎖。基本的臨界區操作有,
view plain
InitializeCriticalSection
EnterCriticalSection
LeaveCriticalSection
DeleteCriticalSection
如果想要對資料進行互斥操作的話,也很簡單,這樣做就可以了,
view plain
EnterCriticalSection()
do_something();
LeaveCriticalSection()
(2)互斥鎖
互斥鎖也是一種鎖。和臨界區不同的是,它可以被不同程序使用,因為它有名字。同時,擷取鎖和釋放鎖的線程必須是同一個線程。常用的互斥鎖操作有
view plain
CreateMutex
OpenMutex
ReleaseMutex
那麼,怎麼用互斥鎖進行資料的通路呢,其實不難。
view plain
WaitForSingleObject();
do_something();
ReleaseMutex();
(3)信号量
信号量是使用的最多的一種鎖結果,也是最友善的一種鎖。圍繞着信号量,人們提出了很多資料互斥通路的方案,pv操作就是其中的一種。如果說互斥鎖隻能對單個資源進行保護,那麼信号量可以對多個資源進行保護。同時信号量在解鎖的時候,可以被另外一個thread進行解鎖操作。目前,常用的信号量操作有,
view plain
CreateSemaphore
OpenSemaphore
ReleaseSemaphore
信号量的使用和互斥鎖差不多。關鍵是信号量在初始化的時候需要明确目前資源的數量和信号量的初始狀态是什麼,
view plain
WaitForSingleObject();
do_something();
ReleaseSemaphore();
(4)event對象
event對象是windows下面很有趣的一種鎖結果。從某種意義上說,它和互斥鎖很相近,但是又不一樣。因為在thread獲得鎖的使用權之前,常常需要main線程調用SetEvent設定一把才可以。關鍵是,在thread結束之前,我們也不清楚目前thread獲得event之後執行到哪了。是以使用起來,要特别小心。常用的event操作有,
view plain
CreateEvent
OpenEvent
PulseEvent
ResetEvent
SetEvent
我們對event的使用習慣于分成main thread和normal thread使用。main thread負責event的設定和操作,而normal thread負責event的等待操作。在CreateEvent的時候,要務必考慮清楚event的初始狀态和基本屬性。
對于main thread,應該這麼做,
view plain
CreateEvent();
SetEvent();
WaitForMultiObjects(hThread, );
CloseHandle();
對于normal thread來說,操作比較簡單,
view plain
while(1){
WaitForSingleObject();
}
總結:
(1)關于臨界區、互斥區、信号量、event在msdn上均有示例代碼
(2)一般來說,使用頻率上信号量 > 互斥區 > 臨界區 > 事件對象
(3)信号量可以實作其他三種鎖的功能,學習上應有所側重
(4)紙上得來終覺淺,多實踐才能掌握它們之間的差別
編寫程式不容易,編寫多線程的程式更不容易。相信編寫過多線程的程式都應該有這樣的一個痛苦過程,什麼樣的情況呢?朋友們應該看一下代碼就明白了,
view plain
void data_process()
{
EnterCriticalSection();
if()
{
LeaveCriticalSection();
return;
}
if()
{
return;
}
LeaveCriticalSection();
}
上面的代碼說明了一種情形。這種多線程的互斥情況在代碼編寫過程中是經常遇到的。是以,每次對共享資料進行操作時,都需要對資料進行EnterCriticalSection和LeaveCriticalSection的操作。但是,這中間也不是一帆風順的。很有可能你會遇到各種各樣的錯誤。那麼,這時候你的程式就需要跳出去了。可能一開始遇到error的時候,你還記得需要退出臨界區。但是,如果錯誤多了,你未必記得還有這個操作了。這一錯就完了,别的線程就沒有機會擷取這個鎖了。
那麼,有沒有可能利用C++的特性,自動處理這種情況呢?還真有。我們看看下面這個代碼,
view plain
class CLock
{
CRITICAL_SECTION& cs;
public:
CLock(CRITICAL_SECTION& lock):cs(lock){
EnterCriticalSection(&cs);
}
~CLock() {
LeaveCriticalSection(&cs);
}
}
class Process
{
CRITICAL_SECTION cs;
public:
Process(){
InitializeCriticalSection(&cs);
}
~Process() {}
void data_process(){
CLock lock(cs);
if(){
return;
}
return;
}
}
C++的一個重要特點就是,不管函數什麼時候退出,系統都會自動調用類的析構函數。在Process類的data_process函數中,,函數在開始就建立了一個CLock類。那麼,在建立這個類的時候,其實就開始了臨界區的pk。那麼一旦進入到臨界區當中,在error中能不能及時退出臨界區呢?此時,c++析構函數的優勢出現了。因為不管錯誤什麼時候出現,在函數退出之前,系統都會幫我們善後。什麼善後呢?就是系統會調用CLock的析構函數,也就是退出臨界區。這樣,我們的目的就達到了。
其實,這就是一個c++的trick。
多線程的那點兒事(之原子鎖)
原子鎖是多線程程式設計中的一個特色。然而,在平時的軟體編寫中,原子鎖的使用并不是很多。這其中原因很多,我想主要有兩個方面。第一,關于原子鎖這方面的内容介紹的比較少;第二,人們在程式設計上面習慣于已有的方案,如果沒有特别的需求,不過貿然修改已存在的代碼。畢竟對很多人來說,不求有功,但求無過。保持目前代碼的穩定性還是很重要的。
其實,早在《多線程資料互斥》這篇部落格中,我們就已經介紹過原子鎖。本篇部落客要讨論的就是原子鎖怎麼使用。中間的一些用法隻是我個人的一些經驗,希望能夠抛磚引玉,多聽聽大家的想法。
(1)查找函數中原子鎖
在一些函數當中,有的時候我們需要對滿足某種特性的資料進行查找。在傳統的單核CPU上,優化的空間比較有限。但是,現在多核CPU已經成了主流配置。是以我們完全可以把這些查找工作分成幾個子函數分在幾個核上面并行運算。但是,這中間就會涉及到一個問題,那就是對公共資料的通路。傳統的通路方式,應該是這樣的,
view plain
unsigned int count = 0;
int find_data_process()
{
if(){
EnterCriticalSection(&cs);
count ++;
LeaveCriticalSection(&cs);
}
}
我們看到代碼中間使用到了鎖,那麼勢必會涉及到系統調用和函數排程。是以,在執行效率上會大打折扣。那麼如果使用原子鎖呢?
view plain
unsigned int count = 0;
int find_data_process()
{
if(){
InterLockedIncrement(&count);
}
}
有興趣的朋友可以做這樣一道題目,檢視0~0xFFFFFFFF上有多少數可以被3整除?大家也可以驗證一下用原子鎖代替臨界區之後,代碼的效率究竟可以提高多少。關于多核多線程的程式設計,朋友們可以參考《多線程基礎篇》這篇部落格。
(2)代碼段中的原子鎖
上面的範例隻是介紹了統計功能中的原子鎖。那麼怎麼用原子鎖代替傳統的系統鎖呢?比如說,假設原來的資料通路是這樣的,
view plain
void data_process()
{
EnterCriticalSection(&cs);
do_something();
LeaveCriticalSection(&cs);
}
如果改成原子鎖呢,會是什麼樣的呢?
view plain
unsigned int lock = 0;
void data_process()
{
while(1 == InterLockedCompareExchange(&lock, 1, 0));
do_something();
lock = 0;
}
這裡用原子鎖代替普通的系統鎖,完成的功能其實是一樣的。那麼這中間有什麼差別呢?其實,關鍵要看do_something要執行多久。打個比方來說,現在我們去買包子,但是買包子的人很多。那怎麼辦呢?有兩個選擇,如果賣包子的人手腳麻利,服務一個顧客隻要10秒鐘,那麼即使前面排隊的有50個人,我們隻要等7、8分鐘就可以,這點等的時間還是值得的;但是如果不幸這個賣包子的老闆服務一個顧客要1分鐘,那就悲催了,假使前面有50個人,那我們就要等50多分鐘了。50分鐘對我們來說可是不短的一個時間,我們完全可以利用這個時間去買點水果,交交水電費什麼的,過了這個時間點再來買包子也不遲。
和上面的例子一樣,忙等的方法就是原子鎖,過一會再來的方法就是哪個傳統的系統鎖。用哪個,就看這個do_something的時間值不值得我們等待了。
多線程的那點兒事(之讀寫鎖)
在編寫多線程的時候,有一種情況是十分常見的。那就是,有些公共資料修改的機會比較少。相比較改寫,它們讀的機會反而高的多。通常而言,在讀的過程中,往往伴随着查找的操作,中間耗時很長。給這種代碼段加鎖,會極大地降低我們程式的效率。那麼有沒有一種方法,可以專門處理這種多讀少寫的情況呢?
有,那就是讀寫鎖。
(1)首先,我們定義一下基本的資料結構。
view plain
typedef struct _RWLock
{
int count;
int state;
HANDLE hRead;
HANDLE hWrite;
}RWLock;
同時,為了判斷目前的鎖是處于讀狀态,還是寫狀态,我們要定義一個枚舉量,
view plain
typedef enum
{
STATE_EMPTY = 0,
STATE_READ,
STATE_WRITE
};
(2)初始化資料結構
view plain
RWLock* create_read_write_lock(HANDLE hRead, HANDLE hWrite)
{
RWLock* pRwLock = NULL;
assert(NULL != hRead && NULL != hWrite);
pRwLock = (RWLock*)malloc(sizeof(RWLock));
pRwLock->hRead = hRead;
pRwLock->hWrite = hWrite;
pRwLock->count = 0;
pRwLock->state = STATE_EMPTY;
return pRwLock;
}
(3)擷取讀鎖
view plain
void read_lock(RWLock* pRwLock)
{
assert(NULL != pRwLock);
WaitForSingleObject(pRwLock->hRead, INFINITE);
pRwLock->counnt ++;
if(1 == pRwLock->count){
WaitForSingleObject(pRwLock->hWrite, INFINITE);
pRwLock->state = STATE_READ;
}
ReleaseMutex(pRwLock->hRead);
}
(4)擷取寫鎖
view plain
void write_lock(RWLock* pRwLock)
{
assert(NULL != pRwLock);
WaitForSingleObject(pRwLock->hWrite, INFINITE);
pRwLock->state = STATE_WRITE;
}
(5)釋放讀寫鎖
view plain
void read_write_unlock(RWLock* pRwLock)
{
assert(NULL != pRwLock);
if(STATE_READ == pRwLock->state){
WaitForSingleObject(pRwLock->hRead, INFINITE);
pRwLock->count --;
if(0 == pRwLock->count){
pRwLock->state = STATE_EMPTY;
ReleaseMutex(pRwLock->hWrite);
}
ReleaseMutex(pRwLock->hRead);
}else{
pRwLock->state = STATE_EMPTY;
ReleaseMutex(pRwLock->hWrite);
}
return;
}
文章總結:
(1)讀寫鎖的優勢隻有在多讀少寫、代碼段運作時間長這兩個條件下才會效率達到最大化;
(2)任何公共資料的修改都必須在鎖裡面完成;
(3)讀寫鎖有自己的應用場所,選擇合适的應用環境十分重要;
(4)編寫讀寫鎖很容易出錯,朋友們應該多加練習;
(5)讀鎖和寫鎖一定要分開使用,否則達不到效果。
多線程的那點兒事(之嵌套鎖)
嵌套鎖這個概念,主要是為了根據程式設計中的一種情形引申出來的。什麼情況呢,我們可以具體說明一下。假設你在處理一個公共函數的時候,因為中間涉及公共資料,是以你加了一個鎖。但是,有一點比較悲哀。這個公共函數自身也加了一個鎖,而且和你加的鎖是一樣的。是以,除非你的使用的是信号量,要不然你的程式一輩子也擷取不了這個鎖。
view plain
HANDLE hLock;
void sub_func()
{
WaitForSingleObject(hLock, INFINITE);
do_something();
ReleaseMutex(hLock);
}
void data_process()
{
WaitForSingleObject(hLock, INFINITE);
sub_func();
ReleaseMutex(hLock);
}
出現這種情況的原因很多。很重要的一個方面是因為軟體的各個子產品是不同的人負責的。是以本質上說,我們根本無法确定别人使用了什麼樣的鎖。你也無權不讓别人使用某個鎖。是以,遇到這種情況,隻好靠你自己了。嵌套鎖就是不錯的一個解決辦法。
(1)嵌套鎖的資料結構
view plain
typedef struct _NestLock
{
int threadId;
int count;
HANDLE hLock;
}NestLock;
NestLock* create_nest_lock(HANLDE hLock)
{
NestLock* hNestLock = (NestLock*)malloc(sizeof(NestLock));
assert(NULL != hNestLock);
hNestLock->threadId = hNestLock->count = 0;
hNestLock->hLock = hLock;
return hNestLock;
}
(2)申請嵌套鎖
view plain
void get_nest_lock(NestLock* hNestLock)
{
assert(NULL != hNestLock);
if(hNestLock->threadId == GetThreadId())
{
hNestLock->count ++;
}else{
WaitForSingleObject(hNestLock->hLock);
hNestLock->count = 1;
hNestLock->threadId = GetThreadId();
}
}
(3)釋放鎖
view plain
void release_nest_lock(NestLock* hNestLock)
{
assert(NULL != hNestLock);
assert(GetThreadId() == hNestLock->threadId);
hNestLock->count --;
if(0 == hNestLock->count){
hNestLock->threadId = 0;
ReleaseMutex(hNestLock->hLock);
}
}
文章總結:
(1)嵌套鎖與其說是新的鎖類型,不如說是統計鎖而已
(2)嵌套鎖和普通的鎖一樣,使用十分友善
(3)嵌套鎖也有缺點,它給我們的鎖檢測帶來了麻煩
多線程的那點兒事(之生産者-消費者)
生産者-消費者是很有意思的一種算法。它的存在主要是兩個目的,第一就是滿足生産者對資源的不斷創造;第二就是滿足消費者對資源的不斷索取。當然,因為空間是有限的,是以資源既不能無限存儲,也不能無限索取。
生産者的算法,
view plain
WaitForSingleObject(hEmpty, INFINITE);
WaitForSingleObject(hMutex, INIFINITE);
ReleaseMutex(hMutex);
ReleaseSemaphore(hFull, 1, NULL);
消費者的算法,
view plain
WaitForSingleObject(hFull, INFINITE);
WaitForSingleObject(hMutex, INIFINITE);
ReleaseMutex(hMutex);
ReleaseSemaphore(hEmpty, 1, NULL);
那麼,有的朋友可能會說了,這麼一個生産者-消費者算法有什麼作用呢。我們可以看看它在多線程通信方面是怎麼發揮作用的?首先我們定義一個資料結構,
view plain
typedef struct _MESSAGE_QUEUE
{
int threadId;
int msgType[MAX_NUMBER];
int count;
HANDLE hFull;
HANDLE hEmpty;
HANDLE hMutex;
}MESSAGE_QUEUE;
那麼,此時如果我們需要對一個線程發送消息,該怎麼發送呢,其實很簡單。我們完全可以把它看成是一個生産者的操作。
view plain
void send_mseesge(int threadId, MESSAGE_QUEUE* pQueue, int msg)
{
assert(NULL != pQueue);
if(threadId != pQueue->threadId)
return;
WaitForSingleObject(pQueue->hEmpty, INFINITE);
WaitForSingleObject(pQueue->hMutex, INFINITE);
pQueue->msgType[pQueue->count ++] = msg;
ReleaseMutex(pQueue->hMutex);
ReleaseSemaphore(pQueue->hFull, 1, NULL);
}
既然前面說到發消息,那麼線程自身就要對這些消息進行處理了。
view plain
void get_message(MESSAGE_QUEUE* pQueue, int* msg)
{
assert(NULL != pQueue && NULL != msg);
WaitForSingleObject(pQueue->hFull, INFINITE);
WaitForSingleObject(pQueue->hMutex, INFINITE);
*msg = pQueue->msgType[pQueue->count --];
ReleaseMutex(pQueue->hMutex);
ReleaseSemaphore(pQueue->hEmpty, 1, NULL);
}
總結:
(1)生産者-消費者隻能使用semphore作為鎖
(2)編寫代碼的時候需要判斷hFull和hEmpty的次序
(3)掌握生産者-消費者的基本算法很重要,但更重要的是自己的實踐
多線程的那點兒事(之死鎖)
相信有過多線程程式設計經驗的朋友,都吃過死鎖的苦。除非你不使用多線程,否則死鎖的可能性會一直存在。為什麼會出現死鎖呢?我想原因主要有下面幾個方面:
(1)個人使用鎖的經驗差異
(2)子產品使用鎖的差異
(3)版本之間的差異
(4)分支之間的差異
(5)修改代碼和重構代碼帶來的差異
不管什麼原因,死鎖的危機都是存在的。那麼,通常出現的死鎖都有哪些呢?我們可以一個一個看過來,
(1)忘記釋放鎖
view plain
void data_process()
{
EnterCriticalSection();
if()
return;
LeaveCriticalSection();
}
(2)單線程重複申請鎖
view plain
void sub_func()
{
EnterCriticalSection();
do_something();
LeaveCriticalSection();
}
void data_process()
{
EnterCriticalSection();
sub_func();
LeaveCriticalSection();
}
(3)雙線程多鎖申請
view plain
void data_process1()
{
EnterCriticalSection(&cs1);
EnterCriticalSection(&cs2);
do_something1();
LeaveCriticalSection(&cs2);
LeaveCriticalSection(&cs1);
}
void data_process2()
{
EnterCriticalSection(&cs2);
EnterCriticalSection(&cs1);
do_something2();
LeaveCriticalSection(&cs1);
LeaveCriticalSection(&cs2);
}
(4)環形鎖申請
view plain
假設有A、B、C、D四個人在一起吃飯,每個人左右各有一隻筷子。是以,這其中要是有一個人想吃飯,他必須首先拿起左邊的筷子,再拿起右邊的筷子。現在,我們讓所有的人同時開始吃飯。那麼就很有可能出現這種情況。每個人都拿起了左邊的筷子,或者每個人都拿起了右邊的筷子,為了吃飯,他們現在都在等另外一隻筷子。此時每個人都想吃飯,同時每個人都不想放棄自己已經得到的一那隻筷子。是以,事實上大家都吃不了飯。
總結:
(1)死鎖的危險始終存在,但是我們應該盡量減少這種危害存在的範圍
(2)解決死鎖花費的代價是異常高昂的
(3)最好的死鎖處理方法就是在編寫程式的時候盡可能檢測到死鎖
(4)多線程是一把雙刃劍,有了效率的提高當然就有死鎖的危險
(5)某些程式的死鎖是可以容忍的,大不了重新開機機器,但是有些程式不行
多線程的那點兒事(之避免死鎖)
預防死鎖的注意事項:
(1)在編寫多線程程式之前,首先編寫正确的程式,然後再移植到多線程
(2)時刻檢查自己寫的程式有沒有在跳出時忘記釋放鎖
(3)如果自己的子產品可能重複使用一個鎖,建議使用嵌套鎖
(4)對于某些鎖代碼,不要臨時重新編寫,建議使用庫裡面的鎖,或者自己曾經編寫的鎖
(5)如果某項業務需要擷取多個鎖,必須保證鎖的按某種順序擷取,否則必定死鎖
(6)編寫簡單的測試用例,驗證有沒有死鎖
(7)編寫驗證死鎖的程式,從源頭避免死鎖
首先,定義基本的資料結構和宏,
view plain
typedef struct _LOCK_INFO
{
char lockName[32];
HANDLE hLock;
}LOCK_INFO:
typedef struct _THREAD_LOCK
{
int threadId;
LOCK_INFO* pLockInfo[32];
}THREAD_LOCK;
#define CRITICAL_SECTION_TYPE 1
#define MUTEX_LOCK_TYPE 2
#define SEMAPHORE_LOCK_TYPE 3
#define NORMAL_LOCK_TYPE 4
#define WaitForSingleObject(a, b) \
WaitForSingleObject_stub((void*)a, NORMAL_LOCK_TYPE)
#define EnterCriticalSection(a) \
WaitForSingleObject_stub((void*)a, CRITICAL_SECTION_TYPE)
#define ReleaseMutex(a) \
ReleaseLock_stub((void*)a, MUTEX_LOCK_TYPE))
#define ReleaseSemaphore(a, b, c) \
ReleaseLock_stub((void*)a, SEMAPHORE_LOCK_TYPE))
#define LeaveCriticalSection(a) \
ReleaseLock_stub((void*)a, CRITICAL_SECTION_TYPE))
然後,改寫鎖的申請函數,
view plain
void WaitForSingleObject_stub(void* hLock, int type)
{
WaitForSingleObject(hDbgLock);
ReleaseMutex(hDbgLock);
if(NORMAL_LOCK_TYPE == type)
WaitForSingleObject((HANDLE)hLock, INFINITE);
else if(CRITICAL_SECTION_TYPE == type)
EnterCriticalSection((LPCRITICAL_SECTION)hLock);
else
assert(0);
WaitForSingleObject(hDbgLock);
ReleaseMutex(hDbgLock);
}
最後,需要改寫鎖的釋放函數。
view plain
void ReleaseLock_stub(void* hLock, int type)
{
WaitForSingleObject(hDbgLock);
ReleaseMutex(hDbgLock);
if(MUTEX_LOCK_TYPE))== type)
ReleaseMutex(HANDLE)hLock);
else if(SEMAPHORE_LOCK_TYPE == type)
ReleaseSemaphore((HANDLE)hLock, 1, NULL);
else if(CRITICAL_SECTION_TYPE == type)
LeaveCriticalSection((LPCRITICAL_SECTION)hLock);
assert(0);
}
多線程的那點兒事(之多線程調試)
軟體調試是我們軟體開發過程中的重要一課。在前面,我們也讨論過程式調試,比如說這裡。今天,我們還可以就軟體調試多講一些内容。比如說條件斷點,資料斷點,多線程斷點等等。
view plain
#include <stdio.h> int value = 0; void test() { int total; int index; total = 0; for(index = 0; index < 100; index ++) total += index * index; value = total; return ; } int main() { test(); return 1; }
(1)資料斷點
所謂資料斷點,就是全局變量或者函數中的數計算的過程中,如果資料值本身發生了改變,就會觸發斷點。這裡的資料有兩種,一個是全局資料,一個函數内部的資料。
以全局資料value為例:
a)按F10,運作程式,擷取value的位址;
b)Alt+F9,選擇【DATA】->【Advanced】;
c)在【Expression】中輸入DW(0x0043178),【ok】回車;
d)F5繼續運作程式,則程式會在value發生改變的時候停住。
以局部資料total為例,
a)按F10,運作程式,擷取value的位址;
b)Alt+F9,選擇【DATA】->【Advanced】;
c)在【Expression】中輸入total,在【Function】輸入test,【ok】回車;
d)F5繼續運作程式,則程式同樣會在total發生改變的時候停住。
(2)條件斷點
條件斷點和資料斷點差不多。隻不過,資料斷點在資料發生改變的時候就會斷住,而條件斷點隻有在滿足一定的條件下才會有斷住。比如說,我們可以讓test子程式在index==5的時候斷住。
a)按F10,運作程式,擷取value的位址;
b)Alt+F9,選擇【DATA】->【Advanced】;
c)在【Expression】中輸入index==5,在【Function】輸入test,【ok】回車;
d)F5繼續運作程式,則程式同樣會在index==5的時候停住。
(3)多線程調試
在VC上面對多程式的調試比較簡單。如果想要對程式進行調試的話,首先F10,開始運作程式。其次,我們需要等線程建立之後才能設定斷點,不然我們看到的程式隻有main函數一個thread。
a)單擊【Debug】,選擇【threads】,那麼我們就可以開始多線程調試了;
b)如果需要對某一個thread挂起,單擊對應的thread,選擇【suspend】即可;
c)如果需要對某一個thread重新排程,單擊對應的thread,選擇【resume】即可;
d)如果需要檢視特定thread的堆棧,那麼選擇那個thread,然後【Set Focus】,關閉threads對話框,在堆棧視窗中即可看到;
e)如果某個線程被挂住,那麼此時所有的線程都挂住了,如果你step運作,所有的threads都會參與運作;
f)如果需要對某一個thread進行調試,那麼需要對其他的thread進行suspend處理 。
總結:
1)看記憶體、看堆棧、條件斷點、資料斷點需要綜合使用,
2)程式設計越早調試,越好,
3)先編寫好單線程程式,再編寫好多線程程式,
4)對于多線程來說,子產品設計 > 程式設計預防 > 調試 > 事後補救。