編輯推薦:
本文來自于csdn,本文簡單描述了c++在多線程的實際程式設計中的重要性,希望對您的學習有幫助。
前言
多線程在實際的程式設計中的重要性不言而喻。對于C++而言,當我們需要使用多線程時,可以使用boost::thread庫或者自從C++
11開始支援的std::thread,也可以使用作業系統相關的線程API,如在Linux上,可以使用pthread庫。除此之外,還可以使用omp來使用多線程。它的好處是跨平台,使用簡單。
在Linux平台上,如果需要使用omp,隻需在編譯時使用"-fopenmp"指令。在Windows的visual
studio開發環境中,開啟omp支援的步驟為“項目屬性 -> C/C++ -> 所有選項
-> openmp支援 -> 是(/openmp)”。
本文我們就介紹omp在C++中的使用方法。
c++ openmp入門簡介
openmp是由一系列#paragma指令組成,這些指令控制如何多線程的執行程式。另外,即使編譯器不支援omp,程式也也能夠正常運作,隻是程式不會多線程并行運作。以下為使用omp的簡單的例子:
int main()
{
vector vecInt(100);
#pragma omp parallel for
for (int i = 0; i < vecInt.size(); ++i)
{
vecInt[i] = i*i;
}
return 0;
}
以上代碼會自動以多線程的方式運作for循環中的内容。如果你删除"#pragma omp parallel
for"這行,程式依然能夠正常運作,唯一的差別在于程式是在單線程中執行。由于C和C++的标準規定,當編譯器遇到無法識别的"#pragma"指令時,編譯器自動忽略這條指令。是以即使編譯器不支援omp,也不會影響程式的編譯和運作。
omp文法
所有的omp指令都是以"#pragma omp“開頭,換行符結束。并且除了barrier和flush兩個指令不作用于代碼以外,其他的指令都隻與指令後面的那段代碼相關,比如上面例子中的for循環。
parallel編譯訓示
parallel告訴編譯器開始 一個并行塊,編譯器會建立一個包含N(在運作時決定,通常為硬體線程數)個線程的線程組,所有線程都運作接下來的語句或者由”{...}"包含的代碼塊,在這執行結束之後,又回到主線程,建立的這N個線程會被回收。
#pragma
omp parallel
{
cout << "parallel run!!!\n";
}
以上代碼在4雙核4線程的cpu的電腦上運作時,輸出了4行”parallel run!!!"。即編譯器建立了一個包含4個線程的線程組來運作這段代碼。在這段代碼運作結束後,程式執行回到主線程。GCC編譯器的實作方式是在内部建立一個函數,然後将相關的執行代碼移至這個函數,這樣一來代碼塊中定義的變量成為了線程的局部變量,互不影響。而ICC的實作方式是利用fork()來實作。
線程之間共享的變量是通過傳遞引用或者利用register變量來實作同步的,其中register變量在代碼執行結束之後或者在flush指令調用時進行同步。
我們也可以利用if條件判斷來決定是否對後續的代碼采用并行方式執行,如:
externint
parallelism_enabled;
#pragma omp parallel for if(parallelism_enabled)
for(int c=0; c
handle(c);
在這個例子中,如果parallelism_enabled為false,那麼這個for循環隻會由一個線程來執行。
for指令
omp中的for指令用于告訴編譯器,拆分接下來的for循環,并分别在不同的線程中運作不同的部分。如果for指令後沒有緊接着for循環,編譯器會報錯。例如,
#pragma omp parallel for
for (int i = 0; i < 10; ++i)
{
printf("%d ", i);
}
以上的代碼執行後,會列印出[0,9]這10個數字。但是它們的順序是随機出現的,在我的電腦上,運作的輸出是"0
1 2 8 9 6 7 3 4 5"。事實上,輸出結果不會是完全随機的,輸出的序列是局部有序的,因為在編譯器對for循環的拆分相當于下面的代碼:
int
this_thread = omp_get_thread_num();
int num_threads = omp_get_num_threads();
int my_start = (this_thread)* 10 / num_threads;
int my_end = (this_thread + 1) * 10 / num_threads;
for (int n = my_start; n < my_end; ++n)
printf("%d ", n)
以上代碼中,omp_get_thread_num()用于擷取目前線程在目前線程組中的序号;omp_get_num_threads()用于擷取縣城組中的線程數。是以線程組中每個線程都運作了for循環中的不同部分。
這裡提到for循環的不同部分線上程組中的不同線程中執行的,線程組是在程式遇到"#pragma
omp parallel"時建立,在程式塊(parallel後的”{...}"或者語句)結束後,線程組中的隻有一個主線程。是以上面示例中的代碼事實上是以下代碼的縮寫:
#pragma
omp parallel
{
#pragma omp for
for (int i = 0; i < 10; ++i)
{
printf("%d ", i);
}
}
此處的"#pragma omp for"即使在“#pragma omp parallel”指令建立的線程組中執行的。加入此處沒有#pragma
omp parallel指令,那麼for循環隻會在主線程中執行。parallel指令所建立的線程組的線程數預設是有編譯器決定的,我們也可以通過num_threads指令來指定線程數,如”#pragma
omp parallel num_threads(3)“即告訴編譯器,此處需要建立一個包含3個線程的線程組。
Schedule指令
Schedule指令提供對for指令中線程排程更多的控制能力。它有兩種排程方式:static和dynamic。
static:每個線程自行決定要執行哪個塊,即每個線程執行for循環中的一個子塊。
dynamic:一個線程并不是執行for循環的一個子塊,而是每次都向omp運作時庫索取一個for循環中的疊代值,然後執行這次疊代,在執行完之後再索取新的值。是以,線程有可能執行任意的疊代值,而不是一個子塊。
之前的”#pragma omp parallel for“實際上的效果是”#pragma omp parallel
for schedule(static)"。如果我們将之前的示例采用dynamic排程方式,即:
#pragma omp parallel for schedule(dynamic)
for (int i = 0; i < 10; ++i)
{
printf("%d ", i);
}
那麼列印的結果則有可能不是局部有序的。
在dynamic排程方式中,還可以指定每次索取的疊代值數量。如,
#pragma
omp parallel for schedule(dynamic,3)
for (int i = 0; i < 10; ++i)
{
printf("%d ", i);
}
在這個例子中,每個線程每次都索取3個疊代值。執行完之後,再拿3個疊代值,直到for循環所有疊代值都運作結束。在最後一次索取的結果有可能不足3個。在程式内部,上面的例子與下面的代碼是等價的:
int a,b;
if(GOMP_loop_dynamic_start(0,10,1,3,&a,&b))
{
do{
for(int n=a; n
n);
}while(GOMP_loop_dynamic_next(&a,&b));
}
ordered指令
ordered指令用于控制一段代碼在for循環中的執行順序,它保證這段代碼一定是按照for中的順序依次執行的。
for (int i = 0; i < 10; ++i)
{
Data data = ReadFile(files[i]);
#pragma omp ordered
PutDataToDataset(data);
}
這個循環負責讀取10個檔案,然後将資料放入一個記憶體結構中。讀檔案的操作是并行的,但是将資料存入記憶體結構中則是嚴格串行的。即先存第一個檔案資料,然後第二個...,最後是第十個檔案。假設一個線程已經讀取了第七個檔案的,但是第六個檔案還沒有存入記憶體結構,那麼這個線程會阻塞,知道第六個檔案存入記憶體結構之後,線程才會繼續運作。
在每一個ordered for循環中,有且僅有一個“#pragma omp ordered"指令限定的代碼塊。
sections指令
section指令用于指定哪些程式塊可以并行運作。一個section塊内的代碼必須串行運作,而section塊之間是可以并行運作的。如,
#pragma
omp parallel sections
{
{ Work1();}
#pragma omp section
{ Work2();
Work3();}
#pragma omp section
{ Work4();}
}
以上代碼表明,work1,Work1,Work2 + Work3以及Work4可以并行運作,但是work2和work3的運作必須是串行運作,并且每個work都隻會被運作一次。
task指令(OpenMP 3.0新增)
當覺得for和section指令用着不友善時,可以用task指令。它用于告訴編譯器其後續的指令可以并行運作。如OpenMP
3.0使用者手冊上的一個示例:
struct
node { node *left,*right;};
externvoid process(node*);
void traverse(node* p)
{
if(p->left)
#pragma omp task // p is firstprivate by default
traverse(p->left);
if(p->right)
#pragma omp task // p is firstprivate by default
traverse(p->right);
process(p);
}
上面的示例中,當處理目前節點process(p)時,并不能夠保證p的左右子樹已經處理完畢。為了保證在處理目前節點前,目前節點的左右子樹已經處理完成,可以使用taskwait指令,這個指令會保證前面的task都已經處理完成,然後才會繼續往下走,添加taskwait指令之後,以上示例代碼變為:
struct node { node *left,*right;};
externvoid process(node*);
void postorder_traverse(node* p)
{
if(p->left)
#pragma omp task // p is firstprivate by default
postorder_traverse(p->left);
if(p->right)
#pragma omp task // p is firstprivate by default
postorder_traverse(p->right);
#pragma omp taskwait
process(p);
}
以下示例示範了如何利用task指令來并行處理連結清單的元素,由于指針p預設是firstprivate方式共享,是以無需特别指定。
struct
node {int data; node* next;};
externvoid process(node*);
void increment_list_items(node* head)
{
#pragma omp parallel
{
#pragma omp single
{
for(node* p = head; p; p = p->next)
{
#pragma omp task
process(p);// p is firstprivate by default
}
}
}
}
atomic指令
atomic指令用于保證其後續的語句執行時原子性的。所謂原子性,即事務的概念,它的執行不可拆分,要麼執行成功,要麼什麼都沒有執行。例如,
#pragma omp atomic
counter += value;
以上代碼中,atomic保證對counter的改變時原子性的,如果多個線程同時執行這句代碼,也能夠保證counter最終擁有正确的值。
需要說明的是,atomic隻能用于簡單的表達式,比如+=、-=、*=、&=等,它們通常能夠被編譯成一條指令。如果上面的示例改為"counter
= counter + value",那将無法通過編譯;atomic作用的表達式中也不能夠有函數調用、數組索引等操作。另外,它隻保證等号左邊變量的指派操作的原子性,等号右邊的變量的取值并不是原子性的。這就意味着另外一個線程可能在指派前改變等号右邊的變量。如果要保證更複雜的原子性可以參考後續的critical指令。
critical指令
critical指令用于保證其相關聯的代碼隻在一個線程中執行。另外,我們還可以給critical指令傳遞一個名稱,這個名稱是全局性的,所有具有相同名字的critical相關聯的代碼保證不會同時在多個線程中運作,同一時間最多隻會有一個代碼塊在運作。如果沒有指定名稱,那系統會給定一個預設的名稱:
#pragma
omp critical(dataupdate)
{
datastructure.reorganize();
}
...
#pragma omp critical(dataupdate)
{
datastructure.reorganize_again();
}
以上代碼展示的兩個名稱為"dataupdate"的critical代碼一次隻會有一個執行,即datastructure的reorganize()和reorganize_again()不會并行運作,一次最多隻會有一個線上程中執行。
openmp中的鎖
omp運作庫提供了一種鎖:omp_lock_t,它定義在omp.h頭檔案中。針對omp_lock_t有5中操作,它們分别是:
omp_init_lock初始化鎖,初始化後鎖處于未鎖定狀态.
omp_destroy_lock銷毀鎖,調用這個函數時,鎖必須是未鎖定狀态.
omp_set_lock嘗試擷取鎖,如果鎖已經被其他線程加鎖了,那目前線程進入阻塞狀态。
omp_unset_lock釋放鎖,調用這個方法的線程必須已經獲得了鎖,如果目前線程沒有獲得鎖,則會有未定義行為。
omp_test_locka嘗試擷取鎖,擷取鎖成功則傳回1,否則傳回0.
omp_lock_t相當于mutex,如果線程已經獲得了鎖,那在釋放鎖之前,目前線程不能對鎖進行上鎖。為了滿足這種遞歸鎖的需求,omp提供了omp_nest_lock_t,這種鎖相當于recursive_mutex可以遞歸上鎖,但是釋放操作必須與上鎖操作一一對應,否則鎖不會得到釋放。
flush指令
對于多線程之間共享的變量,編譯器有可能會将它們設為寄存器變量,意味着每個線程事實上都隻是擁有這個變量的副本,導緻變量值并沒有在多個線程之間共享。為了保證共享變量能夠線上程之間是真實共享,保證每個線程看到的值都是一緻的,可以使用flush指令告訴編譯器我們需要哪些哪些共享變量。當我們要在多個線程中讀寫共同的變量時,我們都應該使用flush指令。
例如,
b =1; a =1;
#pragma omp flush(a,b) #pragma omp flush(a,b)
if(a ==0)if(b ==0)
{{
}}
在上面的例子中,變量a,b在兩個線程中是共享的,兩個線程任何時候看到的a,b的值都是一緻的,即線程1所見的即使線程2所見的。
private,firstprivate,lastprivate及shared指令控制變量共享方式
這些指令用于控制變量線上程組中多個線程之間的共享方式。其中private,firstprivate,lastprivate表示變量的共享方式是私有的,即每個線程都有一份自己的拷貝;而shared表示線程組的線程通路的是同一個變量。
私有變量共享方式有三種指令,它們的差別在于:
private:每個線程都有一份自己的拷貝,但是這些變量并沒有拷貝值,即如果變量是int,long,double等這些内置類型,那麼這些變量在進入線程時時未初始化狀态的;如果變量是類的執行個體對象,那麼線上程中變量是通過預設構造得到的對象,假設類沒有預設構造,則編譯會報錯,告訴你類沒有可用的預設構造;
firstPrivate:每個線程有一份自己的拷貝,每個線程都會通過複制一份。如果變量是int,long,double等内置類型則直接複制,如果為類的執行個體對象,則會調用示例對象的拷貝構造函數,這就意味着,假如類是的拷貝構造不可通路,則變量不能夠使用firstprivate方式共享;
lastprivate:變量在每個線程的共享方式與private一緻,但不同的是,變量的最後一次疊代中的值會flush會主線程中的變量中。最後一次疊代的意思是,如果是for循環,則主線程的變量的值是最後一個疊代值那次疊代中賦的值;如果是section,則主線程的變量最終的值是最後一個section中賦的值。要注意的是,最終主線程的中變量的值并非通過拷貝構造指派的,而是通過operator=操作符,是以如果類的指派操作符不可通路,那麼變量不能采用lastprivate方式共享。
default指令
default指令用于設定所有變量的預設的共享方式,如default(shared)表示所有變量預設共享方式為shared。除此之外,我們可以使用default(none)來檢查我們是否顯示設定了所有使用了的變量的共享方式,如:
int a, b=0;
#pragma omp parallel default(none) shared(b)
{
b += a;
}
以上代碼無法通過編譯,因為在parallel的代碼塊中使用了變量a和b,但是我們隻設定了b的共享方式,而沒有設定變量a的共享方式。
另外需要注意的是,default中的參數不能使用private、firstprivate以及lastprivate。
reduction指令
reductino指令是private,shared及atomic的綜合體。它的文法是:
reduction(operator : list)
其中operator指操作符,list表示操作符要作用的清單,通常是一個共享變量名,之是以稱之為清單是因為線程組中的每個線程都有一份變量的拷貝,reduction即負責用給定的操作符将這些拷貝的局部變量的值進行聚合,并設定回共享變量。
其中操作符可以是如下的操作符:
以下為階乘的多線程的實作:
int factorial(int number)
{
int fac =1;
#pragma omp parallel for reduction(*:fac)
for(int n=2; n<=number;++n)
fac *= n;
return fac;
}
開始,每個線程會拷貝一份fac;
parallel塊結束之後,每個線程中的fac會利用“*”進行聚合,并将聚合的結果設定回主線程中的fac中。
如果這裡我們不用reduction,那麼則需用适用atomic指令,代碼如下:
int factorial(int number)
{
int fac =1;
#pragma omp parallel for
for(int n=2; n<=number;++n)
{
#pragma omp atomic
fac *= n;
}
return fac;
}
但是這樣一來,性能會大大的下降,因為這裡沒有使用局部變量,每個線程對fac的操作都需要進行同步。是以在這個例子中,并不會從多線程中受益多少,因為atomic成為了性能瓶頸。
使用reduction指令的代碼事實上類似于以下代碼:
int
factorial(int number)
{
int fac =1;
#pragma omp parallel
{
int fac_private =1;
#pragma omp for nowait
for(int n=2; n<=number;++n)
fac_private *= n;
#pragma omp atomic
fac *= fac_private;
}
return fac;
}
注:最後的聚合實際是包括主線程中共享變量的初始值一起的,在階乘的例子中,如果fac的初始值不是1,而是10,則最終的結果會是實際階乘值的10倍!
barrier和nowait指令
barrier指令是線程組中線程的一個同步點,隻有線程組中的所有線程都到達這個位置之後,才會繼續往下運作。而在每個for、section以及後面要講到的single代碼塊最後都隐式的設定了barrier指令。例如
#pragma omp parallel
{
SomeCode();
#pragma omp barrier
SomeMoreCode();
}
nowait指令用來告訴編譯器無需隐式調用barrier指令,是以如果為for、section、single設定了nowait标志,則在它們最後不會隐式的調用barrier指令,例如:
#pragma omp parallel
{
#pragma omp for
for(int n=0; n<10;++n) Work();
// This line is not reached before the for-loop
is completely finished
SomeMoreCode();
}
// This line is reached only after all threads
from
// the previous parallel block are finished.
CodeContinues();
#pragma omp parallel
{
#pragma omp for nowait
for(int n=0; n<10;++n) Work();
// This line may be reached while some threads
are still executing the for-loop.
SomeMoreCode();
}
// This line is reached only after all threads
from
// the previous parallel block are finished.
CodeContinues();
single和master指令
single指令相關的代碼塊隻運作一個線程執行,但并不限定具體哪一個線程來執行,其它線程必須跳過這個代碼塊,并在代碼塊後wait,直到執行這段代碼的線程完成。
#pragma
omp parallel
{
Work1();
#pragma omp single
{
Work2();
}
Work3();
}
以上代碼中,work1()和work3()會線上程組中所有線程都 運作一遍,但是work2()隻會在一個線程中執行,即隻會執行一遍。
master指令則指定其相關的代碼塊必須在主線程中執行,且其它線程不必在代碼塊後阻塞。
#pragma omp parallel
{
Work1();
// This...
#pragma omp master
{
Work2();
}
// ...is practically identical to this:
if(omp_get_thread_num()==0)
{
Work2();
}
Work3();
}
循環嵌套
如下代碼并不會按照我們期望的方式運作:
#pragma
omp parallel for
for(int y=0; y<25;++y)
{
#pragma omp parallel for
for(int x=0; x<80;++x)
{
tick(x,y);
}
}
實際上内部的那個“parallel for"會被忽略,自始至終隻建立了一個線程組。假如将上述代碼改為如下所示,将無法通過編譯:
#pragma omp parallel for
for(int y=0; y<25;++y)
{
#pragma omp for // ERROR, nesting like this
is not allowed.
for(int x=0; x<80;++x)
{
tick(x,y);
}
}
OpenMP 3.0中的解決方案
在OpenMP 3.0中,可以利用collapse指令來解決循環嵌套問題,如:
#pragma omp parallel for collapse(2)
for(int y=0; y<25;++y)
for(int x=0; x<80;++x)
{
tick(x,y);
}
collapse指令傳遞的數字就代表了循環嵌套的深度,這裡為2層。
OpenMP 2.5中正确的做法
在OpenMP 2.5中,我們可以通過将多層循環改為單層循環的方法來達到目的,這樣便無需循環嵌套:
#pragma
omp parallel for
for(int pos=0; pos
{
int x = pos%80;
int y = pos/80;
tick(x,y);
}
然而重寫這樣的代碼也并非易事,另一個辦法是采用omp_set_nested方法開啟循環嵌套支援,預設是關閉的:
omp_set_nested(1);
#pragma omp parallel for
for(int y=0; y<25;++y)
{
#pragma omp parallel for
for(int x=0; x<80;++x)
{
tick(x,y);
}
}
現在内層循環中,也會建立一個大小為N的線程組,是以實際上我們将得到N*N個線程,這便會導緻頻繁的線程切換,會帶來較大的性能損失,這也就是為什麼循環嵌套預設是關閉的。也許最好的方法就是将外層循環的parallel指令删除,隻保留内層循環的parallel:
for(int y=0; y<25;++y)
{
#pragma omp parallel for
for(int x=0; x<80;++x)
{
tick(x,y);
}
}
取消線程(退出循環)
constchar* FindAnyNeedle(constchar* haystack,
size_t size,char needle)
{
for(size_t p =0; p < size;++p)
if(haystack[p]== needle)
{return haystack+p; //找到needle,直接退出函數
}
return NULL;
}
加入我們想要使用omp多線程來優化如下方法:
我們最直覺的想法應該是在for循環外加上“#pragma omp parallel for",但是讓人失望的是這将無法通過編譯。因為omp要求必須每個循環疊代都能得到處理,是以不允許直接退出循環,這也就是說在循環中不能使用return、break、goto、throw等能夠中斷循環的語句。為了能夠提前退出循環,我們需要退出時,通知線程組的其他線程,讓它們結束運作:
棄用omp,選擇其他多線程程式設計,如pthread
通過共享變量,通知線程組其他線程
以下為使用布爾标記來通知其他線程的示例:
constchar*
FindAnyNeedle(constchar* haystack, size_t size,char
needle)
{
constchar* result = NULL;
bool done =false;
#pragma omp parallel for
for(size_t p =0; p < size;++p)
{
#pragma omp flush(done)
if(!done)
{
if(haystack[p]== needle)
{
done =true;
#pragma omp flush(done)
result = haystack+p;
}
}
}
return result;
}
然而這樣寫有個缺點就是,即使done标記變為true了,其他線程仍然需要完成每次疊代,即使這些疊代是完全沒有意義的。.
當然,我們也可以不用上述的done标記:
constchar* FindAnyNeedle(constchar* haystack,
size_t size,char needle)
{
constchar* result = NULL;
#pragma omp parallel for
for(size_t p =0; p < size;++p)
if(haystack[p]== needle)
result = haystack+p;
return result;
}
但是這也并沒有完全解決問題,因為這樣當一個線程已經找到需要的結果是,也不能夠避免其他線程繼續運作,這也就造成了不必要的浪費。
事實上,omp針對這個問題并沒有很好的解決辦法,如果确實需要,那隻能求助于其他線程庫了。