文章目錄
- kernel mutex 原理
-
- 資料結構
- 樂觀自旋和handoff機制
-
- 樂觀自旋
- osq_lock
- handoff 機制
- 初始化
- 持鎖
-
- __mutex_lock_common
- __mutex_trylock
-
- __mutex_trylock_or_owner
- mutex_optimistic_spin
- 解鎖
kernel mutex 原理
mutex是可睡眠的鎖,通常用于保護對實時性要求不高的資料,結構和用法也并不複雜,但是實作原理還是有一些不好了解的地方。
本文基于kernel 4.19的代碼分析mutex的初始化/加鎖/解鎖流程,對于使用而言,并不需要去深入了解其原理,但是如果你的工作與系統穩定性相關,需要頻繁的接觸一些死鎖,panic問題,那麼就得非常熟悉常用的鎖結構了。
如果你的核心版本與此不同,那麼實作可能也會有所差異。分析時不考慮開啟死鎖檢測(lockdep)功能,不使用ww_acquire_ctx結構,不開啟CONFIG_DEBUG_MUTEXES。此版本為第一版的随手筆記,後續會進行修訂。
資料結構
完整的mutex定義如下:
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
在大部分系統上,會開啟
CONFIG_MUTEX_SPIN_ON_OWNER
,但是
CONFIG_DEBUG_MUTEXES
,以及
CONFIG_DEBUG_LOCK_ALLOC
并不會開啟,同時也不會使用
struct ww_acquire_ctx
結構,這可以極大地簡化mutex的流程。
對于我們而言,mutex的定義如下:
struct mutex {
atomic_long_t owner; //鎖的持有者會将自己的task_struct位址寫入,來表明自己持有鎖
//同時由于task_struct是cacheline對齊的,是以其最後3bit可以作為flag來使用
spinlock_t wait_lock; //用來保護結構的spinlock
struct optimistic_spin_queue osq; //qspinlock的修改版本,用于實作樂觀自旋,隻有持有osq才能在owner上樂觀自旋
struct list_head wait_list; //等待隊列的頭部,某些等鎖的程序會将自己加入該隊列等待通知可以持鎖
}
owner的flag:
#define MUTEX_FLAG_WAITERS 0x01 //表明等待隊列中有程序在等待
#define MUTEX_FLAG_HANDOFF 0x02 //用于告知持鎖者需要将鎖交給等待隊列隊首,而不是直接釋放
#define MUTEX_FLAG_PICKUP 0x04 //用于通知等待隊列隊首程序鎖交接完成
#define MUTEX_FLAGS 0x07 //掩碼
owner & MUTEX_FLAGS 也就得到了flag部分
owner & ~MUTEX_FLAGS 就可以擷取到持鎖者的task_struct
樂觀自旋和handoff機制
樂觀自旋
在早版本的mutex中,程序拿不到鎖就會将自己加入等待隊列沒然後等待持鎖者釋放鎖時将自己喚醒。如果持鎖者非常快就釋放鎖,等鎖仍然需要經曆睡眠-喚醒的開銷,并且會帶來額外的延遲,但是mutex的屬性又決定了等鎖不能一直占用cpu,是以使用了既要減少延遲又不過分占用cpu的折中機制,樂觀自旋。
如果了解spinlock就知道自旋本質就是循環等待,spinlock就是通過不停的循環檢測條件是否滿足,直到條件滿足才會退出循環,僞代碼如下:
while(1) {
if 條件滿足可以拿鎖
break;
}
這種自旋永遠不會讓出CPU,好處是實時性特别好,一旦别人釋放鎖等待者馬上可以持鎖,壞處也顯而易見,會占滿一個cpu用于等鎖。
而樂觀自旋則适用于對實時性要求不高的場合,樂觀指的是當持鎖者還在cpu上運作時,可以期待持鎖者很快就會釋放鎖,這時候我們可以占用cpu等待,但是占用時間又不能過長。等排程器告訴我們該放開cpu的時候,要及時的将自己排程走。這個機制僞代碼如下:
while(1) {
if 條件滿足
goto 持鎖
if 需要排程 | 持鎖者不在運作狀态
return 樂觀自旋失敗
}
加入等待隊列
讓出cpu,開始睡眠
osq_lock
osq_lock 是 MCS lock以及qspinlock的一種定制化,主要服務于mutex以及rw-sem兩種鎖,很少獨立使用,是以這裡介紹簡單一點。
他也是一種樂觀自旋機制,意味着滿足樂觀自旋條件(鎖者還在cpu上運作 && 排程器沒有要求我們讓出cpu)的時候,可以一直占用cpu進行等待。
如果不滿足自旋條件,退出,傳回失敗。
handoff 機制
handoff這裡應該翻譯為“接力”,它的出現是為了彌補樂觀自旋帶來的漏洞,既mutex并非先到先得。鎖大多被設計為先到先得,是為了實作公平,防止某些程序一直得不到資源被餓死。而樂觀自旋是為了減少程序切換的開銷,降低鎖帶來的延遲,但是樂觀自旋會打破先到先得的規則。
假設有A, B, C, D四個程序,程序A持有mutex,程序B也需要這把鎖,是以它開始樂觀自旋等待。但是它所在的CPU比較繁忙,很快排程器告訴它該放棄cpu了,是以B将自己加入等待隊列隊首,開始睡眠。此時C D依次開始等鎖,它們運氣比較好,都在樂觀自旋等待。
A釋放鎖時,按照規則應該将其交給B,但是B已經睡眠,将其喚醒拿鎖會有額外的開銷,于是A釋放鎖,讓樂觀自旋C的程序拿鎖,C釋放鎖時,D在樂觀自旋等待,此時如果仍然考慮開銷而将鎖交給D,那麼某些情況下B的等待時間會非常長。
是以A釋放鎖時,C拿到鎖,但是A仍然會喚醒B,B醒來發現自己還是不能持鎖,于是在鎖owner裡面加上
MUTEX_FLAG_HANDOFF
flag,告訴目前持鎖者 C,“我已經錯過一次了,下次不要直接釋放鎖讓别人去拿,而是把鎖給我”,這樣C釋放鎖時檢測到handoff flag會直接把鎖的owner設定成B,并且加入新的flag
MUTEX_FLAG_PICKUP
,相當于通知B “我已經把鎖給你了”,然後喚醒B。
#define MUTEX_FLAG_HANDOFF 0x02
#define MUTEX_FLAG_PICKUP 0x04
了解了handoff和樂觀自旋機制,閱讀後面的代碼就很容易了。
初始化
鎖的初始化主要有兩種,靜态和動态。靜态主要用于初始化例如全局變量和棧變量的mutex,這些變量無需考慮記憶體的申請和釋放。
動态初始化則一般用于結構體中内嵌的mutex,它們的記憶體一般來自于slub,需要動态的申請和銷毀。
靜态初始化:
owner初始化為0,wait_lock初始化為unlock狀态,wait_list初始化為空連結清單頭。
#define DEFINE_MUTEX(mutexname) \
struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)
#define __MUTEX_INITIALIZER(lockname) \
{ .owner = ATOMIC_LONG_INIT(0) \
, .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
__DEBUG_MUTEX_INITIALIZER(lockname) \
__DEP_MAP_MUTEX_INITIALIZER(lockname) }
動态初始化:
與靜态的結果類似,但是需要預先申請mutex的記憶體。
void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_long_set(&lock->owner, 0);
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);
#endif
...
}
持鎖
總體流程為:
1.如果owner是0,那就成功拿鎖,否則失敗,進lock_slowpath路徑
2.再次嘗試mutex_trylock持鎖,成功直接傳回
3.mutex_optimistic_spin試圖樂觀自旋等待,成功直接傳回
4.将自己加入等待隊列,進入睡眠
5.被喚醒,設定handoff flag,嘗試持鎖,失敗則進入樂觀自旋,再次失敗則繼續睡眠,重複這一步直到持鎖成功
6.成功之後将自身移出等待隊列
mutex_lock 是最常用的持鎖函數,隻是用來做個簡單的判斷。
void __sched mutex_lock(struct mutex *lock)
{
//需要排程則讓出cpu
might_sleep();
//核心是 atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr)
//即如果owner是0,那就成功拿鎖,否則失敗,進入__mutex_lock_slowpath路徑
if (!__mutex_trylock_fast(lock))
__mutex_lock_slowpath(lock);
}
這是兩個過渡的函數,它的主要工作交給下一級,相關參數後再後面解析
static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip)
{
return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
__mutex_lock_common
此函數是持鎖過程的核心函數,邏輯實作基本都在這個函數。
TASK_UNINTERRUPTIBLE
定義等鎖時程序的狀态,這裡是UN狀态,也就是常說的D狀态
_RET_IP_
是給編譯器使用的,這裡用來記錄調用此函數的調用者
nest_lock ww_ctx use_ww_ctx
這些功能以及debug都是不開啟的,我已經在代碼中将涉及的部分删除掉便于閱讀。如果你看到的代碼和我的不一樣,那就是删除了這部分導緻的。
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
bool first = false;
struct ww_mutex *ww;
int ret;
//需要排程那麼進入睡眠
might_sleep();
preempt_disable();
//trylock用于非阻塞的嘗試擷取鎖,如果成功那麼就可以不用等待了
//mutex_optimistic_spin是樂觀自旋的實作函數,傳回成功代表持鎖成功,失敗則是樂觀自旋條件不滿足,需要繼續處理
//這兩個函數後面都會有詳細的分析
//這裡先嘗試非阻塞持鎖,失敗則嘗試樂觀自旋等待,隻要一個成功則持鎖完成,可以傳回了
if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
preempt_enable();
return 0;
}
------------------ 嘗試持鎖和樂觀自旋都失敗了 --------------------------
spin_lock(&lock->wait_lock);
//因為spinlock可能會消耗一點時間,這段時間内所得狀态可能發生變化,再嘗試一次
if (__mutex_trylock(lock)) {
goto skip_wait;
}
//将自身使用尾插加入等待隊列,如果我們是等待隊列的隊首,那麼将owner加上MUTEX_FLAG_WAITERS标記,說明等待隊列現在不為空了
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
waiter.task = current;
//将自身設定為UN狀态
set_current_state(state);
for (;;) {
//嘗試解鎖
if (__mutex_trylock(lock))
goto acquired;
//檢查信号情況,決定是否先處理信号
if (unlikely(signal_pending_state(state, current))) {
ret = -EINTR;
goto err;
}
spin_unlock(&lock->wait_lock);
//開搶占,然後将自己排程出去讓出cpu
//開始睡眠,之前使用set_current_state将自身設定為UN狀态,不會被排程器自動排程,需要等待其他程序喚醒
//被喚醒之後,關閉搶占,繼續向下運作
schedule_preempt_disabled();
//這裡就是被其他程序喚醒了
//first用來标記我們是否是等待隊列的隊首
//隊首程序被喚醒,那麼設定owner的MUTEX_FLAG_HANDOFF flag
if (!first) {
first = __mutex_waiter_is_first(lock, &waiter);
if (first)
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
}
set_current_state(state);
//再次嘗試擷取鎖,擷取失敗不直接進入睡眠,而是嘗試再次樂觀自旋
//一般而言被喚醒就應該是該持鎖的時候,但是樂觀自旋機制會讓其他程序有機會偷取鎖,這點會在解鎖時展現
if (__mutex_trylock(lock) || //condition_point(3)
(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
break;
//還是失敗,那麼繼續循環,留下MUTEX_FLAG_HANDOFF flag
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
//這裡意味着退出了循環成功擷取到鎖
acquired:
//設定為正常的R狀态,将自身移出等待隊列
__set_current_state(TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current);
if (likely(list_empty(&lock->wait_list)))
__mutex_clear_flag(lock, MUTEX_FLAGS);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;
err:
__set_current_state(TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
spin_unlock(&lock->wait_lock);
preempt_enable();
return ret;
}
__mutex_trylock
用于非阻塞的嘗試持鎖。成功傳回true,否則傳回false。
static inline bool __mutex_trylock(struct mutex *lock)
{
return !__mutex_trylock_or_owner(lock);
}
__mutex_trylock_or_owner
非阻塞的嘗試持鎖,成功則傳回NULL,失敗傳回目前鎖的owner。
static inline struct task_struct *__mutex_trylock_or_owner(struct mutex *lock)
{
unsigned long owner, curr = (unsigned long)current;
//讀取owner
owner = atomic_long_read(&lock->owner);
for (;;) {
unsigned long old, flags = __owner_flags(owner);
unsigned long task = owner & ~MUTEX_FLAGS;
//task 為從owner中提取的持鎖者
//flags 即為owner中的flag部分
//如果task不為空,那麼說明有人持鎖了
if (task) { //condition_point(1)
//這兩條判斷是handoff功能的實作,如果觸發handoff機制,那麼上一個持鎖者會将鎖直接交接給我們,也就是将owner設定為我們的task_struct,并且在flag部分加上MUTEX_FLAG_PICKUP
//是以如果這兩條都成立,那麼我們設定的handoff起作用了,上個持鎖者直接将鎖交接給了我們
if (likely(task != curr))
break;
if (likely(!(flags & MUTEX_FLAG_PICKUP)))
break;
flags &= ~MUTEX_FLAG_PICKUP;
}
//到達此處,要麼handoff成功,要麼無人持鎖,總之我們可以嘗試持鎖了
flags &= ~MUTEX_FLAG_HANDOFF;
//如果lock->owner在這段時間内未發生改變,那麼我們将自己的task與設定的flag寫入,傳回持鎖成功
old = atomic_long_cmpxchg_acquire(&lock->owner, owner, curr | flags);
if (old == owner)
return NULL;
//如果發生了改變,重新嘗試一次
owner = old;
}
//break出循環,說明持鎖條件不滿足,那麼傳回鎖的目前owner
return __owner_task(owner);
}
mutex_optimistic_spin
用于實作樂觀自旋的主函數。進入這個函數時,有兩種身份。第一種我們沒有加入等待隊列,這是第一次嘗試了關系選,那我們是spiner。第二種我們加入了等待隊列,那我們是waiter。
對于沒有加入等待隊列的spiner,需要先競争osq_lock,同樣的,osq_lock也擁有樂觀自旋機制,競争到了這把鎖,才能樂觀自旋等待鎖的owner發生變化。
對于waiter,隻有隊首程序才會被喚醒,其餘程序都是D狀态沉睡,是以不存在大量競争的問題,可以直接在owner上spin等待。
函數流程的僞代碼大緻為:
while(1) {
嘗試持鎖
if 持鎖成功
return true
自旋等待owner發生變化
if 發生變化
continue
if 自旋條件不滿足
return false
}
static __always_inline bool
mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
struct mutex_waiter *waiter)
{
//對于還沒有加入等待隊列的等待者,需要先競争osq_lock,防止過多的等待者競争owner
if (!waiter) {
//檢測樂觀自旋條件是否滿足,不滿足傳回false
if (!mutex_can_spin_on_owner(lock))
goto fail;
if (!osq_lock(&lock->osq))
goto fail;
}
//在循環中等待,owner每發生一次變化,就嘗試持鎖一次
//直到持鎖成功或者樂觀自旋條件不滿足,才退出循環
for (;;) {
struct task_struct *owner;
//嘗試非阻塞的持鎖,成功傳回NULL,否則傳回持鎖者的owner
owner = __mutex_trylock_or_owner(lock); //condition_point(2)
if (!owner)
break;
//等待owner發生變化,傳回true
//如果是自旋條件不滿足,自身時間用完需要被排程或者持鎖者進放棄了CPU,傳回false
if (!mutex_spin_on_owner(lock, owner, ww_ctx, waiter))
goto fail_unlock;
cpu_relax();
}
if (!waiter)
osq_unlock(&lock->osq);
return true;
fail_unlock:
if (!waiter)
osq_unlock(&lock->osq);
fail:
//如果程序是因為需要被排程才導緻樂觀自旋失敗,那麼需要做一些額外的處理
if (need_resched()) {
//排程器排程前需要将自身設定為R狀态
//開搶占,立刻讓出CPU,下次被排程回來時會傳回目前函數,然後關閉搶占,傳回false
__set_current_state(TASK_RUNNING);
schedule_preempt_disabled();
}
return false;
}
解鎖
解鎖分為兩種情況,1.之前存在等待隊列的程序被偷取了至少一次持鎖機會,該程序向owner寫入了handoff。2.正常的解鎖,釋放鎖自由競争決定下一個持鎖者。
不論如何,如果存在等待隊列,都會喚醒等待隊列的隊首程序,這樣該程序回去嘗試持鎖,其運作到condition_point(3)處 try_lock 失敗則設定handoff機制防止自己餓死。
存在handoff時:
1. 将隊首程序加入喚醒隊列
2. 将等待隊列的隊首的task_struct直接寫入到lock->owner,并且設定pickup flag
3. 喚醒等待隊列的隊首程序,
不存在handoff時:
1. 清除owner的task部分,這裡在owner上自旋的等待者就可以擷取鎖了
1. 如果等待隊列存在等待者,需要繼續處理,否則可以直接return
1. 将隊首程序加入喚醒隊列,喚醒等待隊列的隊首程序
void __sched mutex_unlock(struct mutex *lock)
{
//如果owner中沒有任何flag,意味着等待隊列中沒有成員,那麼直接将owner置0即可
//不用考慮自旋的等待者,他們沒有睡眠,可以自己處理自己的持鎖流程
if (__mutex_unlock_fast(lock))
return;
//否則就需要處理等待隊列
__mutex_unlock_slowpath(lock, _RET_IP_);
}
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
struct task_struct *next = NULL;
DEFINE_WAKE_Q(wake_q);
unsigned long owner;
//讀取目前的owner,owner是task | flag,目前task部分為目前程序的task_struct
owner = atomic_long_read(&lock->owner);
for (;;) {
unsigned long old;
//如果owner被設定了HANDOFF,有等待者已經錯過至少一次持鎖機會,為了防止其餓死,需要将鎖直接交給他
if (owner & MUTEX_FLAG_HANDOFF)
break;
------------------------- 無需處理HANDOFF的情況 ------------------------------
//清除owner的task部分,這裡在owner上自旋的等待者就可以擷取鎖了
//因為condition_point(1)處隻需要判斷目前owner的task部分為NULL就可以了
//為何要留下flag部分?因為新的持鎖者需要繼承flag部分防止flag丢失
old = atomic_long_cmpxchg_release(&lock->owner, owner,
__owner_flags(owner));
//設定成功,如果等待隊列存在等待者,需要繼續處理,否則可以直接結束了
if (old == owner) {
if (owner & MUTEX_FLAG_WAITERS)
break;
return;
}
owner = old;
}
spin_lock(&lock->wait_lock);
//等待隊列不為空,将隊首程序加入喚醒隊列
if (!list_empty(&lock->wait_list)) {
struct mutex_waiter *waiter =
list_first_entry(&lock->wait_list,
struct mutex_waiter, list);
next = waiter->task;
wake_q_add(&wake_q, next);
}
//需要處理handoff,将等待隊列的隊首的task_struct直接寫入到lock->owner,并且設定pickup flag
//意味着鎖不再通過競争擷取,直接交接給等待隊列隊首
if (owner & MUTEX_FLAG_HANDOFF)
__mutex_handoff(lock, next);
spin_unlock(&lock->wait_lock);
//喚醒等待隊列的隊首程序
wake_up_q(&wake_q);
}
加粗樣式