天天看點

rcu鎖所使用的一個機制--dynticks

可以睡眠的rcu鎖很大程度上有益于系統的實時性,因為不用禁用搶占了,該rcu鎖巧妙的使用兩個“階段”來跟蹤rcu的狀态,内部維持着一個狀态機,該狀态機的其中兩個狀态需要所有cpu的确認,也就是說隻有所有cpu都确認了之後,狀态機才能向前推進,這就暴露出一個缺點,就是在nohz啟用的情況下,如果一個cpu不再接收時鐘中斷,那麼它就沒有機會執行确認,結果就是所有的和rcu相關的cpu需要等待很長一段時間才能推進rcu狀态機,一直等到這個停掉時鐘心跳的cpu重新開始心跳,怎麼解決這個問題呢?核心中引入了一個叫做dynticks的機制,該機制很簡單,就是當一個cpu處于心跳停止狀态的時候可以直接跳過它,不需要它的确認,畢竟它上面是不可能操作read-rcu鎖的,代碼實作也非常清晰:

static inline void rcu_enter_nohz(void)

{

smp_mb(); /* CPUs seeing ++ must see prior RCU read-side crit sects */

__get_cpu_var(dynticks_progress_counter)++;

if (unlikely(__get_cpu_var(dynticks_progress_counter) & 0x1)) {

...

}

static inline void rcu_exit_nohz(void)

smp_mb(); /* CPUs seeing ++ must see later RCU read-side crit sects */

if (unlikely(!(__get_cpu_var(dynticks_progress_counter) & 0x1))) {

每一個cpu被引入了一個變量--dynticks_progress_counter,該變量被初始化為1,在進入nohz狀态時和退出時該變量都會遞增,然後在需要等待cpu确認的地方之需要判斷該變量的奇偶即可,如果是偶數,那麼就說明進入了nohz心跳停止狀态,直接跳過即可,就當它已經确認了,反之如果是奇數就需要等待确認,但是問題出來了,如果一個中斷打斷了nohz的cpu,而該中斷中有操作read-rcu的邏輯,那麼就可能導緻系統挂起,因為可能dynticks機制跳過了改cpu,但是該cpu即刻被中斷所打斷,緻使它不能被跳過,最終可能導緻兩個數組混亂掉,于是在每個中斷進入和退出的時候必然也需要有相應的邏輯來優化dynticks機制,這就是cu_irq_enter和rcu_irq_exit要做的:

void rcu_irq_enter(void)

int cpu = smp_processor_id();

if (per_cpu(rcu_update_flag, cpu))//如果已經在中斷中了,那麼嵌套中斷中同樣将rcu_update_flag遞增,然後在内層中斷的rcu_irq_exit中會導緻遞減rcu_update_flag不為0,直接退出,再更新dynticks_progress_counter的值,該機制隻考慮是否被中斷,并不考慮具體中斷的層數

per_cpu(rcu_update_flag, cpu)++;

if (!in_interrupt() &&

(per_cpu(dynticks_progress_counter, cpu) & 0x1) == 0) {//注意,這個判斷并不是所有進入中斷時都為真的,隻有在從nohz狀态進入時才為真

per_cpu(dynticks_progress_counter, cpu)++;

smp_mb(); /* see above block comment. */

void rcu_irq_exit(void)

if (per_cpu(rcu_update_flag, cpu)) {

if (--per_cpu(rcu_update_flag, cpu)) //處理嵌套中斷的邏輯,如果非嵌套,那麼該rcu_update_flag在此時應該被減為0的。

return;

WARN_ON(per_cpu(dynticks_progress_counter, cpu) & 0x1);

有了以上兩個函數把關,基本上就沒有什麼問題了,最後在rcu_try_flip_waitack_needed中進行判斷:

static inline int rcu_try_flip_waitack_needed(int cpu)

long curr;

long snap;

curr = per_cpu(dynticks_progress_counter, cpu);

snap = per_cpu(rcu_dyntick_snapshot, cpu);

smp_mb(); /* force ordering with cpu entering/leaving dynticks. */

if ((curr == snap) && ((curr & 0x1) == 0)) //仍然在idle也就是nohz中

return 0;

* if ((curr - snap) > 2 || (snap & 0x1) == 0) //這裡有些複雜哦!

return 1;

上面函數的*行的判斷邏輯别看短,非常複雜,注意看一下cpu_idle函數中的内層while循環,也就是該idlecpu僅僅被中斷了一次,但是還沒有被徹底從idle狀态喚醒的邏輯,裡面調用了rcu_check_callbacks函數,該函數本質上就是處理本cpu上的工作,“盡可能”的推進狀态機,然後更新本cpu的rcu資料,為何是盡可能推進呢?想想看如果大家都等它了,那麼隻要它一确認,那麼狀态機就推進了,就是這個意思。我們說rcu_try_flip_waitack_needed這個函數就是盡可能的跳過處于nohz狀态的cpu,不需要它們确認狀态機的狀态,但是這是有條件的,可搶占rcu的兩個階段的等待加和為0的數組索引是通過rcu_ctrlblk.completed的奇偶來确定的,這就是說如果這個索引錯了,那麼一切資料就會亂掉。現在看看這個索引怎麼會錯,在rcu狀态機推進到下一個狀态的時候,rcu_ctrlblk.completed會遞增,然後等待所有的cpu“看到”這個“遞增事件”,如果看不到,那麼資料就亂掉了。如果說if ((curr - snap) > 2 || (snap & 0x1) == 0)很晦澀的話,我們不妨采用反證法,這個if邏輯的反面就是(curr - snap) <= 2 && (snap & 0x1) == 1,(curr - snap) <= 2無非就是0,1,2,而(snap & 0x1) == 1無非就是在rcu推進到該階段的時候cpu的dynticks值為奇數,為奇數的情況無非有兩種,一種是未處于nohz狀态,另一種是處于nohz狀态,但是被中斷了,并且中斷還沒有退出(因為中斷退出時又會将dynticks加為偶數),先看未處于nohz狀态的情形,如果未處于nohz狀态,那麼curr和當時的dynticks內插補點是1或者2的情形就是進入了nohz狀态沒有被中斷或者進入nohz後被中斷,不管哪一類情形,都是在cpu_idle的内層while循環中,而内層循環已經調用了rcu_check_callbacks“盡可能”的推進了rcu狀态機,中斷處理函數中可能已經釋放或者得到了rcu鎖,直接導緻數組變化,是以必須要确認;另外一種就是cpu被中斷了,但是還沒有退出中斷,這個和上面的分析結果一緻。這下這個if邏輯就清晰多了,正過來考慮該if邏輯,如果(curr - snap) > 2成立,那麼可以肯定要麼發生了進入nohz又出來的情形,要麼發生了本來就在nohz中被中斷了至少兩次的情形,無論哪一種情形都不會導緻數組索引錯誤。

從另一個角度來看上面的if判斷,隻要是(snap & 0x1) == 0成立,那麼dyntick_save_progress_counter肯定是在cpu_idle中的内層循環中被調用的,那麼既然是内層循環,那就說明是nohz狀态,時鐘中斷已然停止,我們知道推進rcu狀态機的函數是在時鐘中斷函數中被調用的,是以如果等待确認的話會帶來很長的延時,不等待确認不會有任何問題,進入nohz之後,該cpu肯定進入了一個嶄新的狀态,不會攜帶有歧義的資料的;再看(curr - snap) > 2這個條件,分為snap進入nohz和未進入nohz兩種情形,第一種情形下,要想使內插補點大于2,那麼必然要麼被中斷(內插補點1)再出來(內插補點2)再被中斷(內插補點3)…如果內插補點為1或者2時,必然有一次機會調用rcu的callback函數,這樣即使等待也不會造成長久的延時;如果是snap時沒有進入nohz,那麼內插補點為1或者2就是進入nohz或者進入後再被中斷一次,既然是中斷那麼必然會離開中斷,離開的話就有可能調用callback函數,是以可以等待确認。反過來如果(snap & 0x1) == 1,那麼加上兩個計數器內插補點為1或者2的條件下就必須等待确認,如果內插補點為1或者2,那麼可能根本就沒有執行callback函數,進而導緻歧義。

<a>賞析一個函數--udp_v4_get_port</a>

首先呈上這個函數:

static int udp_v4_get_port(struct sock *sk, unsigned short snum)

struct hlist_node *node;

struct sock *sk2;

struct inet_sock *inet = inet_sk(sk);

write_lock_bh(&amp;udp_hash_lock);

if (snum == 0) {

int best_size_so_far, best, result, i;

if (udp_port_rover &gt; sysctl_local_port_range[1] ||

udp_port_rover &lt; sysctl_local_port_range[0])

udp_port_rover = sysctl_local_port_range[0];

best_size_so_far = 32767;

best = result = udp_port_rover;

for (i = 0; i &lt; UDP_HTABLE_SIZE; i++, result++) {

struct hlist_head *list;

int size;

list = &amp;udp_hash[result &amp; (UDP_HTABLE_SIZE - 1)];

if (hlist_empty(list)) { //如果該沖突鍊是空的,那麼直接就可以得到了

goto gotit;

size = 0;

sk_for_each(sk2, node, list) //否則。如果可以的話記錄最小size的沖突鍊

if (++size &gt;= best_size_so_far)//這裡實作了去的最小size沖突鍊的算法,很巧妙

goto next;

best_size_so_far = size;//到此為止,best_size_so_far是最小size的沖突鍊

best = result;

next:;

result = best;

for(i = 0; i &lt; (1 &lt;&lt; 16) / UDP_HTABLE_SIZE; i++, result += UDP_HTABLE_SIZE) {//注意,result一次遞增UDP_HTABLE_SIZE實際上就到了下一條哈希沖突鍊上了

if (!udp_lport_inuse(result))//周遊該鍊上的所有的socket,隻要有一個端口是result,那麼就說明失敗

break;

if (i &gt;= (1 &lt;&lt; 16) / UDP_HTABLE_SIZE) //如果沒有找到

goto fail;

gotit:

udp_port_rover = snum = result;

} else {

write_unlock_bh(&amp;udp_hash_lock);

以上列舉的這一段代碼的目的是取得一個沒有使用的端口,衆所周知,udp端口的大小是16位,是65536個,取得一個沒有使用的端口的最直覺的方法就是設定一個65536個元素的數組,數組元素為一個結構體,包含兩個元素,一個是從0到65535的數字,另一個是是否已經被使用,程式邏輯就是周遊這個數組,然後得到一個是否被使用字段是0的即可,但是這個算法非常拙劣,浪費空間不說,時間複雜度也不見得很低,于是想到了雜湊演算法,将如此之大數目的數字散列到一些有限的桶内,這就必須将這些數字進行分類了,linux核心取了128這個不大不小的數字,也就是result &amp; (UDP_HTABLE_SIZE - 1)這句代碼展現的,這樣的話一共就會有128個哈希桶,顯然的會有很多的端口會沖突,進而連接配接到一條沖突鍊上。

為何linux核心采用這麼一種算法呢,比如為何要取得最小的沖突鍊呢,這是因為為了在後面的第二個for循環中從最小size的沖突連開始尋找沖突鍊中的可用端口,size最小的結果就是udp_lport_inuse内部周遊的時候開銷最小。事實上該函數的兩個for分别實作了兩種尋找空閑端口的方式,第一個for循環的優先級比第二個for循環的優先級要高,也就是說首先尋找沖突鍊為空的,如果找到,那麼後面就不用費事周遊沖突鍊了,如果沒有找到,那麼就從最小size的沖突鍊開始周遊,一條一條沖突鍊循環周遊,這也是第二個for的作用,當然從最小size沖突鍊開始周遊比較節省開銷了。

該函數的一個要點就是在于将問題分解成了兩個部分,第一個部分是哈希優化,第二個部分是不得已的周遊,同時在問題第一個部分求解當中為第二部分埋下了伏筆,這就是得到了最小size的沖突鍊,可謂奇妙!

 本文轉自 dog250 51CTO部落格,原文連結:http://blog.51cto.com/dog250/1273476

繼續閱讀