代码基于内核5.10版本
信号量的进阶形式,对读者写者进行区分,写者时互斥的,写者持锁时其余的写者和读者都只能等待。读者是允许并发的,读者持锁时允许其他的读者持锁,但是写者必须等待。
后续的分析不涉及死锁检测,也就是lockdep部分。
常用的API如下:
DECLARE_RWSEM(name) | 声明名为name的读写信号量,并初始化它。 |
---|---|
void init_rwsem(struct rw_semaphore *sem); | 对读写信号量sem进行初始化。 |
void down_read(struct rw_semaphore *sem); | 读者用来获取sem,若没获得时,则调用者睡眠等待。 |
void up_read(struct rw_semaphore *sem); | 读者释放sem。 |
int down_read_trylock(struct rw_semaphore *sem); | 读者尝试获取sem,如果获得返回1,如果没有获得返回0。可在中断上下文使用。 |
void down_write(struct rw_semaphore *sem); | 写者用来获取sem,若没获得时,则调用者睡眠等待。 |
int down_write_trylock(struct rw_semaphore *sem); | 写者尝试获取sem,如果获得返回1,如果没有获得返回0。可在中断上下文使用 |
void up_write(struct rw_semaphore *sem); | 写者释放sem。 |
void downgrade_write(struct rw_semaphore *sem); | 把写者降级为读者。 |
急用请看这
说实话,这部分机制还是比较难的,并且如果不是需要去深入研究其机制,完全没必要了解这么多。只需要简单debug,我这个给出一个快速的总结
struct rw_semaphore {
atomic_long_t count; //用于记录锁的状态
atomic_long_t owner; //持锁的写者或者其中一个读者
struct optimistic_spin_queue osq; //用于乐观自旋
raw_spinlock_t wait_lock; //保护wait_list结构的spinlock
struct list_head wait_list; //等锁者链表
void *magic; //魔术数,记录锁的地址,当其被改变时可以认为锁的数据结构被破坏
struct lockdep_map dep_map; //死锁检测,暂不关注
}
count bit
* Bit 0 - writer locked bit
* Bit 1 - waiters present bit
* Bit 2 - lock handoff bit
* Bits 3-7 - reserved
* Bits 8-62 - 55-bit reader count
* Bit 63 - read fail bit
遇到读写锁相关问题,那大概率是发生在进程卡在rw_sem上了,这时候需要快速找到持锁者,可以从栈中拿到sem的地址以及数据,然后寻找真正的持锁者。注意owner最后几个bit需要清零才是task结构。
- magic不是锁的首地址,那不用看了,结构体被破坏,属于内存踩踏问题
- writer locked bit被置位,那肯定是写者持锁,找owner就可以找到持锁者。不要理会reader count是否存在,读者会先斩后奏,这部分不能代表读者持锁。
- writer locked bit为0,reader count等于1,这时候是读者持锁,可能没有完全持锁成功,即只写入了count还没来得及走完后面的流程。此时owner不一定准确,可能为NULL,也可能是读者中的一个。如果读者计数为1,并且owner是一个task地址,那么大概率就只有这个读者持锁。
- writer locked bit为0,读者计数大于1,此时owner为NULL是正常的,为读者中的一个也是正常的。这种情况是没有办法通过锁结构找完所有读者的。可以尝试开启相关debug功能。如果你正在使用使用crash tool分析一个内核尸体的话, search -t 锁地址 来寻找所有线程栈上可能残留的蛛丝马迹。没有收获,也没有bug功能,那就只能自己使用kprobe之类的工具来debug了。
- 一些奇奇怪怪的中间态:读者计数为0,写者没有持锁,存在等待者。这种情况要么锁当前没有问题,只是panic的时候恰巧碰到了这种状态,要么等待队列的队首进程状态异常,可能是自身非法调度引起,也可能是其所在cpu异常。
handoff机制
handoff这里应该翻译为“接力”,它的出现是为了弥补乐观自旋带来的漏洞,既并非先到先得。锁大多被设计为先到先得,是为了实现公平,防止某些进程一直得不到资源被饿死。而乐观自旋是为了减少进程切换的开销,降低锁带来的延迟,但是乐观自旋会打破先到先得的规则。
semaphore的handoff是为了防止偷锁的情况出现。等待队列的进程都是在睡眠的,尝试持锁的进程可以进入乐观自旋阶段,这个阶段无视等待队列一旦有机会就会获取锁,造成本该最先拿到锁的等待队列队首被跳过。队首被唤醒持锁,发现锁已经被偷取,这时候就会设置handoff bit,后面所有的持锁者看到这个bit就不能再偷锁了。因此队首进程可以顺利获取到锁。
设置时机:
对于读者,当读者在队首并且被唤醒尝试持锁失败之后,如果超时设置handoff。
对于写者,同样在队首,当自身是RT进程或者超时时设置handoff。
生效时机:
所有试图持锁者都需要检测是否有handoff标志位,一旦存在均会持锁失败。设置者会在持锁成功之后才清除这个标志位。
乐观自旋
乐观自旋是性能和速度的均衡考虑,让进程在满足一定的条件下循环等待某些数据,即期待持有锁的进程尽快释放锁所以占用cpu一段时间来死等不退出,避免自身陷入睡眠然后很快被唤醒。一段时间是指调度器没有标记当前进程需要被调度走。 伪代码如下:
while(1) {
检测数据是否发生变变化
是 退出,返回成功
当前cpu时间片是否用尽?
是 退出,返回失败
当前需要等待的进程是否还在cpu上运行
否 退出,返回失败
}
数据结构
实际生效的定义如下:
struct rw_semaphore {
atomic_long_t count; //用于记录锁的状态
atomic_long_t owner; //持锁的写者或者其中一个读者
struct optimistic_spin_queue osq; //用于乐观自旋
raw_spinlock_t wait_lock; //保护wait_list结构的spinlock
struct list_head wait_list; //等锁者链表
void *magic; //魔术数,记录锁的地址,当其被改变时可以认为锁的数据结构被破坏
struct lockdep_map dep_map; //死锁检测,暂不关注
}
count
count用来表示当前锁的状态较为复杂,其定义如下:
* Bit 0 - writer locked bit
* Bit 1 - waiters present bit
* Bit 2 - lock handoff bit
* Bits 3-7 - reserved
* Bits 8-62 - 55-bit reader count
* Bit 63 - read fail bit
bit 0用来表示是否有写者持锁,写者持锁成功的标志就是这个位被设置
bit 1表示等待队列中是否存在等待者,有等待者时读者持锁成功需要唤醒等待队列的其他读者,读写者释放锁时需要唤醒等待队列的队首进程
bit 2表示hand off bit,这个是为了防止饿死设计的。handoff bit会阻止所有的偷锁/持锁行为保证锁的交接。
bit 8-62 表示读者的数目,这个数目并不能真正的反应持锁成功的读者个数。某些情况下读者会采取先斩后奏的方式,先加上这个计数,然后再判断自己能否持锁,不行的话就减去恢复原来的计数。一般而言,当读者持锁时这个计数是真实的,但是写者持锁时也可能有读者计数,这部分就是先斩后奏的读者。
最高位63表示读者个数溢出位,几乎不会被设置,但是在某些路径还是可能会被检查
下面是用于判断count的一些掩码。
#define RWSEM_WRITER_LOCKED (1UL << 0) 1
#define RWSEM_FLAG_WAITERS (1UL << 1) 2
#define RWSEM_FLAG_HANDOFF (1UL << 2) 4
#define RWSEM_FLAG_READFAIL (1UL << (BITS_PER_LONG - 1)) 1<<63
#define RWSEM_READER_SHIFT 8
#define RWSEM_READER_BIAS (1UL << RWSEM_READER_SHIFT) 1<<8
#define RWSEM_READER_MASK (~(RWSEM_READER_BIAS - 1)) 0xffff ffff ffff ff00
#define RWSEM_WRITER_MASK RWSEM_WRITER_LOCKED 1
#define RWSEM_LOCK_MASK (RWSEM_WRITER_MASK|RWSEM_READER_MASK) 0xffff ffff ffff ff01
#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL) 0x8000 0000 0000 0007
osq
struct optimistic_spin_queue osq 成员用于乐观自旋,暂不做详细分析,其本质为mcslock的一种变体,当持锁者还在cpu上运行时,等锁者可以期待持锁者很快会释放这个锁,所以不会陷入睡眠而是自旋等待。反之持锁者已经放弃了cpu,那么预计很长一段时间内持锁者都不会释放锁,此时等锁者陷入睡眠不再忙等。
owner
owner用来记录当前持锁的task,由于task必定是cacheline对齐的目前最小的cacheline也有16bit,所以owner的低位可以用来表示一些flag。同时可以有多个reader持锁,因此绣着持锁时owner就是持锁者,但是读者持锁时owner有特殊表示。
#define RWSEM_READER_OWNED (1UL << 0) bit 0 用来表示当前是reader持锁
#define RWSEM_RD_NONSPINNABLE (1UL << 1) bit 1 表示是否允许读者自旋等待
#define RWSEM_WR_NONSPINNABLE (1UL << 2) bit 2 表示是否允许写者等待
#define RWSEM_NONSPINNABLE (RWSEM_RD_NONSPINNABLE | RWSEM_WR_NONSPINNABLE)
#define RWSEM_OWNER_FLAGS_MASK (RWSEM_READER_OWNED | RWSEM_NONSPINNABLE)
读者的owner一般由rwsem_set_reader_owned设置,为 current | RWSEM_READER_OWNED | (之前owner & RWSEM_RD_NONSPINNABLE)
写者的owner由rwsem_set_owner设置,一般为 current
wait_list
等待队列用来放置等待锁的继承,但是由于包含乐观自旋逻辑,等待队列不是全部的等待者,部分等待着可能在乐观自旋等待owner。
作为链表头,连接所有的struct rwsem_waiter.list
初始化
存在两种方式,一种是临时定义一个栈上的变量并且初始化。
- count初始化为0
- owner初始化为0
- wait_lock初始化为未加锁状态
- wait_list初始化为空链表头
- magic初始化为锁的起始地址
- osq lock初始化为未加锁
#define DECLARE_RWSEM(name) \
struct rw_semaphore name = __RWSEM_INITIALIZER(name)
#define __RWSEM_INITIALIZER(name) \
{ __RWSEM_COUNT_INIT(name), \
.owner = ATOMIC_LONG_INIT(0), \
__RWSEM_OPT_INIT(name) \
.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(name.wait_lock),\
.wait_list = LIST_HEAD_INIT((name).wait_list), \
__RWSEM_DEBUG_INIT(name) \
__RWSEM_DEP_MAP_INIT(name) }
count原子变量初始化为0
#define RWSEM_UNLOCKED_VALUE 0L
#define __RWSEM_COUNT_INIT(name) .count = ATOMIC_LONG_INIT(RWSEM_UNLOCKED_VALUE)
osq lock初始化为未加锁
#define __RWSEM_OPT_INIT(lockname) .osq = OSQ_LOCK_UNLOCKED,
magic初始化为锁地址
# define __RWSEM_DEBUG_INIT(lockname) .magic = &lockname,
还可以动态初始化,动态初始化的结果和静态一致。
#define init_rwsem(sem) \
do { \
static struct lock_class_key __key; \
\
__init_rwsem((sem), #sem, &__key); \
} while (0)
void __init_rwsem(struct rw_semaphore *sem, const char *name,
struct lock_class_key *key)
{
#ifdef CONFIG_DEBUG_RWSEMS
sem->magic = sem;
#endif
atomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE);
raw_spin_lock_init(&sem->wait_lock);
INIT_LIST_HEAD(&sem->wait_list);
atomic_long_set(&sem->owner, 0L);
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
osq_lock_init(&sem->osq);
#endif
trace_android_vh_rwsem_init(sem);
}
down_read 读者持锁
读者持锁的判定条件较为复杂,即使有写者持锁时,也可能向count增加RWSEM_READER_BIAS,增加总是能成功的,但是不代表持锁成功,增加之后一般会检测是否有writer以及handoff,失败需要回退,增加只是为了防止写者抢占。读者持锁的条件如下:
- 没有读者写者以及等待者,即count == 0,此时向count增加RWSEM_READER_BIAS并且设置owner
- 有其他读者持锁,向count增加RWSEM_READER_BIAS即算成功
- 有等待者但是没有写者,偷锁时向count增加RWSEM_READER_BIAS并且设置owner
- 被其他人唤醒时waiter.task被清除,说明别人已经替当前进程持锁了,当前进程只需要返回即可
note:
- 读者不管持锁是否成功都可能会增加RWSEM_READER_BIAS一段时间,因此不能用此标志位判断是否是读者持锁
- 持锁顺序不是先来后到,乐观自旋不管是读者还是写者都能通过rwsem_try_xxx_lock_unqueued无视等待队列偷锁
- 队首进程有特殊处理,如果队首是写者任何不会唤醒写者(! RWSEM_WAKE_ANY)的动作都会失败,如果队首是读者,唤醒读者的时候锁可能会被另一个写者被偷取,此时队首读者如果等待超时会设置handoff不再进行竞争持锁。
- 读者有两种持锁方式:在无人持锁以及乐观自旋的情况下是读者自己获取锁,陷入睡眠被唤醒的情况下别人已经设置好了count和owner,自身不需要持锁了,醒来检测自己的waiter.task被清空了即可退出。
- 读者持锁的owner:在无人持锁以及乐观自旋的情况下owner为第一个持锁的读者,如果是处于等待队列中被其他人协助唤醒的读者,是不会设置owner的。所以存在两种情况:1.多个读者,但是只有最先持锁的读者是owner。2.任意数目的读者,最先持锁的task已经释放锁并且清除owner,导致owner中的task为空。
void __sched down_read(struct rw_semaphore *sem)
{
//检测是否有其他高优先级任务需要抢占,如果有让出cpu
might_sleep();
//死锁检测
rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
//持锁流程
LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
}
//如果穿入了trt_lock的接口,那么先尝试不需要等待的try_lock,成功就直接持锁退出
//否则调用会阻塞的lock接口
#define LOCK_CONTENDED(_lock, try, lock) \
do { \
if (!try(_lock)) { \
lock_contended(&(_lock)->dep_map, _RET_IP_); \
lock(_lock); \
} \
lock_acquired(&(_lock)->dep_map, _RET_IP_); \
} while (0)
简化一下,流程为:
down_read
{
//尝试非阻塞持锁
if(!__down_read_trylock)
//尝试失败,进入阻塞流程
__down_read
}
__down_read_trylock
读者尝试非阻塞的持锁,仅当count == 0,也就是没有reader也没有writer也没有waiter的情况下才能持锁成功。成功之后设置自己的task | RWSEM_READER_OWNED 到owner。
返回1表示成功,0表示失败。
__down_read_trylock
{
if(sem->count == 0)
sem->count += RWSEM_READER_BIAS(1 << 8)
sem->owner &= current | RWSEM_READER_OWNED | RWSEM_RD_NONSPINNABLE
return 1
return 0
}
static inline int __down_read_trylock(struct rw_semaphore *sem)
{
long tmp;
//检测magic是否被改写
DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);
//设置tmp为锁没有持有者也没有等待者的状态的值,为0
tmp = RWSEM_UNLOCKED_VALUE;
do {
//判断sem->count == tmp
//是,当前无人持锁无人等待,设置sem->count = tmp + RWSEM_READER_BIAS,返回1表示更新成功
//否,锁不是空闲状态,更新count失败,返回0
if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
tmp + RWSEM_READER_BIAS)) {
//sem->owner &= current | RWSEM_READER_OWNED | RWSEM_RD_NONSPINNABLE
rwsem_set_reader_owned(sem);
//持锁成功返回1
return 1;
}
} while (!(tmp & RWSEM_READ_FAILED_MASK));
//尝试拿锁失败,返回0
return 0;
}
__down_read
阻塞性的等锁直到持锁成功。
__down_read
{
rwsem_read_trylock
{
sem->count += RWSEM_READER_BIAS
return !(cnt & (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL))
}
if (!rwsem_read_trylock) {
rwsem_down_read_slowpath
} else {
rwsem_set_reader_owned
}
}
static inline void __down_read(struct rw_semaphore *sem)
{
//读者尝试持锁,0表示失败
//不管失败成功与否,都会在count中增加读者的计数RWSEM_READER_BIAS。
if (!rwsem_read_trylock(sem)) {
//读者持锁失败,进入慢速路径
rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE);
DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
} else {
//读者持锁成功,设置owner = current | RWSEM_READER_OWNED,并且继承之前设置的RWSEM_RD_NONSPINNABLE
rwsem_set_reader_owned(sem);
}
}
rwsem_read_trylock
读者尝试持锁,返回0表示失败,1表示成功。不管失败成功与否,都会在count中增加读者的计数RWSEM_READER_BIAS。
失败的原因有:
- 存在写者持锁
- 等待队列不为空
- 锁处于handoff交接状态
- 读者数目溢出
static inline bool rwsem_read_trylock(struct rw_semaphore *sem)
{
//将count增加RWSEM_READER_BIAS,返回增加后的值
//即读者做一次预抢占,先设置抢占值再检查是否能够抢占成功
long cnt = atomic_long_add_return_acquire(RWSEM_READER_BIAS, &sem->count);
//数值小于0表示bit 63被设置,即读者数目溢出了,禁止读写者在锁上乐观自旋等待
if (WARN_ON_ONCE(cnt < 0))
rwsem_set_nonspinnable(sem);
//返回此时读者是否能够持有这个锁,如果存在写者或者存在等待者或者锁处于特殊的handoff状态,都会返回0,也就是失败
return !(cnt & RWSEM_READ_FAILED_MASK);
}
rwsem_down_read_slowpath
读者直接持锁失败,进入慢速路径。
注意,在此函数之前读者已经将自己的标志位RWSEM_READER_BIAS加入了count。
rwsem_down_read_slowpath
{
if(rwsem_can_spin_on_owner) {
//判断自身是否需要调度,持锁者是否在运行,以及flag中有没有明确禁止当前身份的自旋
减去之前rwsem_read_trylock中加上的RWSEM_READER_BIAS (2)
atomic_long_add(-RWSEM_READER_BIAS, &sem->count); //减去之前rwsem_read_trylock中加上的RWSEM_READER_BIAS
if(rwsem_optimistic_spin){ //乐观自旋,自旋等待成功会设置当前task为owner,设置count对应bit
等待队列不为空?
rwsem_mark_wake 唤醒等待队列其他读者
return 持锁成功,退出
} else if (rwsem_reader_phase_trylock()) //乐观自旋失败,再尝试一次持锁
return 持锁成功,退出
}
乐观自旋彻底失败,加入等待队列
初始化waiter结构,设置超时时间
如果进程没有经历过乐观自旋(1)处设置的count一直在生效,会阻止写者持锁,当前不是写者持锁和handoff状态,那读者持锁成功了
持锁成功,退出
否则加上RWSEM_FLAG_WAITERS,将自身加入等待队列
加入等待队列之后锁被释放,或者另一个读者持锁我们是队首,那么rwsem_mark_wake唤醒所有读者持锁包括我们自身
持锁成功,退出
while(1)
等待waiter.task被清除
}
读者将自身加入等待队列时,会创建唤醒队列和等待队列结构
struct rwsem_waiter {
struct list_head list; //链表头
struct task_struct *task; //等待持锁是task,被唤醒时会清除这个成员
enum rwsem_waiter_type type; //表明自身是等待读锁还是写锁
unsigned long timeout; //保存超时时间,只对队首进程生效,超时之后并不会退出,而是设置handoff标志
unsigned long last_rowner; //保存当前的持锁者
}
struct wake_q_head {
struct wake_q_node *first; //初始化为WAKE_Q_TAIL
struct wake_q_node **lastp; //初始化为自身first成员的地址
}
static struct rw_semaphore __sched *
rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
{
//adjustment用来存储将要count增加或者减少的值
//初始化为非0,在乐观自旋逻辑中清零,这样可以通过该值判断进程有没有经历过乐观自旋的逻辑
long count, adjustment = -RWSEM_READER_BIAS;
struct rwsem_waiter waiter;
//初始化唤醒队列
DEFINE_WAKE_Q(wake_q);
bool wake = false;
//保存当前owner信息
waiter.last_rowner = atomic_long_read(&sem->owner);
//如果当前不是读者持锁
if (!(waiter.last_rowner & RWSEM_READER_OWNED))
//在保存的owner中加上禁止读者自旋等待的flag
waiter.last_rowner &= RWSEM_RD_NONSPINNABLE;
//当前是否允许自旋等待,不允许则加入等待队列
if (!rwsem_can_spin_on_owner(sem, RWSEM_RD_NONSPINNABLE))
goto queue;
//乐观自旋等待的部分
---------------------------------------------------------------------- (1)
//减去之前rwsem_read_trylock中加上的RWSEM_READER_BIAS
atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
adjustment = 0;
if (rwsem_optimistic_spin(sem, false)) {
//自旋等待成功,成功设置owner为当前task,持有count
//读者持锁成功,就可以将其他等待的读者唤醒了
if ((atomic_long_read(&sem->count) & RWSEM_FLAG_WAITERS)) {
raw_spin_lock_irq(&sem->wait_lock);
if (!list_empty(&sem->wait_list))
//唤醒队列中所有能唤醒的读者
rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED,
&wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
}
return sem;
//乐观自旋失败,进行最后一次尝试
} else if (rwsem_reader_phase_trylock(sem, waiter.last_rowner)) {
return sem;
}
//乐观自旋彻底失败,加入等待队列
---------------------------------------------------------------------- (2)
queue:
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_READ;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
raw_spin_lock_irq(&sem->wait_lock);
//等待队列为空,当前进程为队首
if (list_empty(&sem->wait_list)) {
//adjustment!=0意味着没有经历过代码(1)部分的乐观自旋等待,即之前设置的count中的RWSEM_READER_BIAS没有被减去
//当前不是写者持锁和handoff状态,那读者持锁成功了
if (adjustment && !(atomic_long_read(&sem->count) &
(RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
smp_acquire__after_ctrl_dep();
raw_spin_unlock_irq(&sem->wait_lock);
//count值前面已经增加过RWSEM_READER_BIAS了,现在不需要调整,设置owner之后就退出
rwsem_set_reader_owned(sem);
lockevent_inc(rwsem_rlock_fast);
return sem;
}
//否则添加RWSEM_FLAG_WAITERS,表明等待队列非空,当前进程不是队首
adjustment += RWSEM_FLAG_WAITERS;
}
//将自身加入等待队列
list_add_tail(&waiter.list, &sem->wait_list);
if (adjustment)
count = atomic_long_add_return(adjustment, &sem->count);
else
count = atomic_long_read(&sem->count);
//加入等待队列完毕
----------------------------------------------------
//加入等待队列之后情况如果发生改变,锁被释放了
if (!(count & RWSEM_LOCK_MASK)) {
clear_wr_nonspinnable(sem);
wake = true;
}
//或者没有写者持锁并且我们是队首
//那么唤醒并且协助队列中所有的读者持锁,包括我们自身
//rwsem_mark_wake会清除所有被唤醒者的waiter.task
if (wake || (!(count & RWSEM_WRITER_MASK) &&
(adjustment & RWSEM_FLAG_WAITERS))) //adjustment被设置了RWSEM_FLAG_WAITERS意味着我们是队首
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
//在队列中等待waiter.task被清除
----------------------------------------------------------
for (;;) {
set_current_state(state);
//当waiter.task被清除时,说明我们持锁成功
if (!smp_load_acquire(&waiter.task)) {
/* Matches rwsem_mark_wake()'s smp_store_release(). */
break;
}
//处理信号
...
schedule();
lockevent_inc(rwsem_sleep_reader);
}
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock);
return sem;
out_nolock:
list_del(&waiter.list);
if (list_empty(&sem->wait_list)) {
atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
&sem->count);
}
raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
return ERR_PTR(-EINTR);
}
rwsem_can_spin_on_owner
检测是否能够乐观自旋。
static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem,
unsigned long nonspinnable)
{
struct task_struct *owner;
unsigned long flags;
bool ret = true;
//当前任务被调度器标记为需要调度,那么不能自旋等待
if (need_resched()) {
return false;
}
...
//获取当前owner以及flag
owner = rwsem_owner_flags(sem, &flags);
//如果flag中禁止了当前身份的自旋或者当前是写者持锁并且写者放弃了cpu,自旋失败
if ((flags & nonspinnable) ||
(owner && !(flags & RWSEM_READER_OWNED) && !owner_on_cpu(owner)))
ret = false;
...
return ret;
}
rwsem_optimistic_spin
在owner上乐观自旋,第一步通过乐观自旋获取sem->osq,只有持有这个锁才能在sem->owner上自旋等待。返回自旋等待是否成功,如果是被迫退出自旋返回0.
自旋等待成功会设置当前task为owner。
rwsem_optimistic_spin
{
osq_lock 获取乐观自旋所osq
失败,退出
while(1) {
owner_state = rwsem_spin_on_owner 更新当前owner状态
OWNER_SPINNABLE? -> break
rwsem_try_xxx_lock_unqueued
调用不加入队列尝试获取锁的函数,此函数能够抢占等待队列的持锁机会
reader只检测是否存在writer持锁和handoff
writer只检测锁是否被人持有和handoff,两者均不关心waiter
成功 -> break
写者等待读者持锁的情况,读者太多不能确定每个读者是否都在线
每16次循环检测超时时间
超时 -> break
RT任务不能被阻塞,读者持锁情况向读者太多不能确定每个读者是否都在线
两次机会获取锁或者带写者持锁开始spin
}
}
@sem 指向的指针锁
@wlock 决定是否是写者
函数中使用owner_state表示当前owner的状态
enum owner_state {
OWNER_NULL = 1 owner为NULL
OWNER_WRITER = 2 owner为写者
OWNER_READER = 4 owner为读者
OWNER_NONSPINNABLE = 8 当前不允许在owner上spin
}
#define OWNER_SPINNABLE (OWNER_NULL | OWNER_WRITER | OWNER_READER)
static bool rwsem_optimistic_spin(struct rw_semaphore *sem, bool wlock)
{
bool taken = false;
int prev_owner_state = OWNER_NULL;
int loop = 0;
u64 rspin_threshold = 0;
unsigned long nonspinnable = wlock ? RWSEM_WR_NONSPINNABLE
: RWSEM_RD_NONSPINNABLE;
preempt_disable();
/* 先获取osqlock,osqlock也是一种乐观自旋机制,在soqlock上自旋失败,则不尝试在owner上自旋等待,直接退出 */
if (!osq_lock(&sem->osq))
goto done;
//由于到达此步之前需要持有osq,因此即使有多个读写者抢锁也只能有一个自旋等待owner
//乐观自旋等待owner发生变化时持锁
for (;;) {
enum owner_state owner_state;
//自旋等待owner发生改变,返回改变之后的state
owner_state = rwsem_spin_on_owner(sem, nonspinnable);
//除OWNER_NONSPINNABLE之外都是允许继续spin的
if (!(owner_state & OWNER_SPINNABLE))
break;
//根据身份调用不加入队列尝试获取锁的函数,不加入等待队列意味着有可能抢占等待队列进程的持锁机会
//对于写者而言,等待count中既没有读者持锁也没有写者持锁,也不是handoff状态
//对于读者而言,等待count中既没有没有写者持锁,也不是handoff状态
//成功返回1,失败返回0
taken = wlock ? rwsem_try_write_lock_unqueued(sem)
: rwsem_try_read_lock_unqueued(sem);
//持锁成功,退出循环
if (taken)
break;
//其他的读者可以直接持锁,写者需要等待所有读者释放锁才能持锁,写者无法弄清楚是否所有读者都在线
//所以如果当前是写者等锁,并且读者持有锁,给自旋等待的时间加个限制
if (wlock && (owner_state == OWNER_READER)) {
//prev_owner_state初始化为OWNER_NULL,第一次此条件必定成立
if (prev_owner_state != OWNER_READER) {
//不允许写者在owner上spin,那么退出
if (rwsem_test_oflags(sem, nonspinnable))
break;
//计算写者的最大等待时间
rspin_threshold = rwsem_rspin_threshold(sem);
//循环次数清零
loop = 0;并且已经超过了该读者的预定等
}
//每16次循环计算当前等待是否超时,如果超时设置RWSEM_NONSPINNABLE禁止所有的在owner上的spin
else if (!(++loop & 0xf) && (sched_clock() > rspin_threshold)) {
rwsem_set_nonspinnable(sem);
lockevent_inc(rwsem_opt_nospin);
break;
}
}
//读者可能有很多,RT任务同样不能弄清楚每个读者的状态
//上次不是写者持锁,则RT任务不再等待,读者一直持锁RT任务将有两次等待机会
if (owner_state != OWNER_WRITER) {
//调度器认为我们需要让出cpu,那么退出等待
if (need_resched())
break;
//读者持锁的情况向RT任务有两次机会spin
if (rt_task(current) &&
(prev_owner_state != OWNER_WRITER))
break;
}
//记录这次循环的持锁者,下次循环时会用到
prev_owner_state = owner_state;
cpu_relax();
}
osq_unlock(&sem->osq);
done:
preempt_enable();
lockevent_cond_inc(rwsem_opt_fail, !taken);
return taken;
}
rwsem_spin_on_owner
自旋等待owner或者flag发生改变,返回新的state
static noinline enum owner_state
rwsem_spin_on_owner(struct rw_semaphore *sem, unsigned long nonspinnable)
{
struct task_struct *new, *owner;
unsigned long flags, new_flags;
enum owner_state state;
//获取owner
owner = rwsem_owner_flags(sem, &flags);
/* 获取onwer的状态,这里有4种情况
* 1.owner中存在禁止当前身份spin的flag,比如当前是读者owner中存在RWSEM_RD_NONSPINNABLE,返回 OWNER_NONSPINNABLE
* 2.当前是读者持锁,返回 OWNER_READER
* 3.在等待的过程中所已经被释放了,返回 OWNER_NULL
* 4.写者持锁,OWNER_WRITER */
state = rwsem_owner_state(owner, flags, nonspinnable);
//当前不是写者持锁,停止等待owner
//因为读者持锁时间可能很长,OWNER_NONSPINNABLE意味着禁止自旋等待
if (state != OWNER_WRITER)
return state;
rcu_read_lock();
for (;;) {
//读取新的flag和owner
new = rwsem_owner_flags(sem, &new_flags);
//如果owner或者flag发生了改变
if ((new != owner) || (new_flags != flags)) {
//重新获取stat,终止循环
state = rwsem_owner_state(new, new_flags, nonspinnable);
break;
}
...
//如果自选条件不满足
if (need_resched() || !owner_on_cpu(owner)) {
state = OWNER_NONSPINNABLE;
break;
}
cpu_relax();
}
rcu_read_unlock();
return state;
}
rwsem_try_xxx_lock_unqueued
不加入等待队列的尝试持锁,这此持锁没有判断等待队列是否有等待者,可能会抢占等待队列中进程的持锁机会,所以需要进行handoff的判断,避免等待着被强占次数过多饿死。
static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
{
long count = atomic_long_read(&sem->count);
//写者等待无人持锁以及非handoff状态,之后尝试持锁
//使用循环是为了给写者更多的持锁机会,读者持锁就只有一次判断
while (!(count & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))) {
if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
count | RWSEM_WRITER_LOCKED)) {
rwsem_set_owner(sem);
lockevent_inc(rwsem_opt_wlock);
return true;
}
}
return false;
}
前面读者实际上是有非阻塞持锁的函数的,rwsem_read_trylock,与此函数区别在于是否判断waiter。
static inline bool rwsem_try_read_lock_unqueued(struct rw_semaphore *sem)
{
long count = atomic_long_read(&sem->count);
//判断是否有写者持锁以及是否在handoff状态,读者持锁不影响另一个读者
if (count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))
return false;
//不在的话尝试持锁,将count + RWSEM_READER_BIAS
count = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
//成功则开始设置owner
if (!(count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
rwsem_set_reader_owned(sem);
lockevent_inc(rwsem_opt_rlock);
return true;
}
//失败则回退自己设置的RWSEM_READER_BIAS
atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
return false;
}
rwsem_mark_wake
唤醒等待队列的进程。
如果队首是写者但是唤醒类型不是RWSEM_WAKE_ANY,那么唤醒不了任何进程。
队首是读者,当前不是读者持锁唤醒其他读者,那么帮助这个读者提前占据锁。抢锁失败并且超时,就设置handoff位。
@wake_type 需要唤醒对象的类型
enum rwsem_wake_type {
RWSEM_WAKE_ANY, /* 唤醒队列的第一个等待者 */
RWSEM_WAKE_READERS, /* 只唤醒读者 */
RWSEM_WAKE_READ_OWNED /* 读者持锁唤醒其他读者 */ };
@wake_q 唤醒队列
static void rwsem_mark_wake(struct rw_semaphore *sem,
enum rwsem_wake_type wake_type,
struct wake_q_head *wake_q)
{
struct rwsem_waiter *waiter, *tmp;
long oldcount, woken = 0, adjustment = 0;
struct list_head wlist;
lockdep_assert_held(&sem->wait_lock);
//如果第一个等待者是写者,并且唤醒对象是任意队首等待者,那么不唤醒所有进程进行竞争,只唤醒这个写者
//如果第一个等待者是写者,但是唤醒对象是其他的,比如读者持锁唤醒其他读者或者仅限于唤醒读者的情况
//直接返回不进行唤醒动作,因为第一个等待的写者应该最先被唤醒
waiter = rwsem_first_waiter(sem);
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
if (wake_type == RWSEM_WAKE_ANY) {
wake_q_add(wake_q, waiter->task);
lockevent_inc(rwsem_wake_writer);
}
return;
}
//此处之后处理队首是读者的情况
-------------------------------------------------------------------------
//读者计数溢出处理
if (unlikely(atomic_long_read(&sem->count) < 0))
return;
//不是读者持锁唤醒其他读者的情况,说明当前锁是未被人持有的状态,需要抢锁
if (wake_type != RWSEM_WAKE_READ_OWNED) {
struct task_struct *owner;
//先构造一个虚假的读者持锁,帮助这个队首的读者提前获取锁
adjustment = RWSEM_READER_BIAS;
oldcount = atomic_long_fetch_add(adjustment, &sem->count);
//抢锁失败了,写者偷到了锁,队首的读者错过了一次持锁机会,并且已经超过了该读者的预定等待时间
//现在设置handoff,屏蔽所有竞争锁以及窃取锁的动作,让下个持锁者直接把锁交给队首的读者,退出
if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
time_after(jiffies, waiter->timeout)) {
adjustment -= RWSEM_FLAG_HANDOFF;
lockevent_inc(rwsem_rlock_handoff);
}
atomic_long_add(-adjustment, &sem->count);
return;
}
//帮助队首的读者抢锁成功了,设置队首进程为新的owner
owner = waiter->task;
if (waiter->last_rowner & RWSEM_RD_NONSPINNABLE) {
owner = (void *)((unsigned long)owner | RWSEM_RD_NONSPINNABLE);
lockevent_inc(rwsem_opt_norspin);
}
__rwsem_set_reader_owned(sem, owner);
}
/*
这里之后,队首是读者,持锁者也是读者,需要做的是唤醒队列中的其他读者
1.读者持锁唤醒其他读者,
2.帮助队首的读者获取了锁,现在需要继续唤醒剩余的读者
*/
-------------------------------------------------------------------
//现在,持锁者是读者了,我们可以唤醒队列中至多MAX_READERS_WAKEUP个其他的读者,一般是1600个
INIT_LIST_HEAD(&wlist);
list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) {
if (waiter->type == RWSEM_WAITING_FOR_WRITE)
continue;
woken++;
list_move_tail(&waiter->list, &wlist);
if (woken >= MAX_READERS_WAKEUP)
break;
}
//增加需要唤醒的读者的计数,如果等待队列为空需要移除掉waiter标志
adjustment = woken * RWSEM_READER_BIAS - adjustment;
lockevent_cond_inc(rwsem_wake_reader, woken);
if (list_empty(&sem->wait_list)) {
adjustment -= RWSEM_FLAG_WAITERS;
}
//handoff只作用于队首进程,现在我们已经协助队首拿到锁了,清除它
if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
adjustment -= RWSEM_FLAG_HANDOFF;
//将增加的读者数目写入count
if (adjustment)
atomic_long_add(adjustment, &sem->count);
/* 将需要唤醒的进程放入wake_q */
list_for_each_entry_safe(waiter, tmp, &wlist, list) {
struct task_struct *tsk;
tsk = waiter->task;
get_task_struct(tsk);
//唤醒之前清除waiter->task
smp_store_release(&waiter->task, NULL);
wake_q_add_safe(wake_q, tsk);
}
}
up_read 读者释放锁
如果owner是当前task,清除掉owner中task_struct部分,保留flag部分。
将count减去RWSEM_READER_BIAS,即减少一名读者计数。
若无人持锁并且等待队列不为空,调用rwsem_wake唤醒等待队列进程,最终盗用的是rwsem_mark_wake,唤醒类型为RWSEM_WAKE_ANY。
void up_read(struct rw_semaphore *sem)
{
rwsem_release(&sem->dep_map, _RET_IP_);
__up_read(sem);
}
static inline void __up_read(struct rw_semaphore *sem)
{
long tmp;
DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);
DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
//onwer是当前进程,将flag中的task_struct清除,保留flag
rwsem_clear_reader_owned(sem);
//减少读者数目
tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);
DEBUG_RWSEMS_WARN_ON(tmp < 0, sem);
//当前无人持锁
if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) ==
RWSEM_FLAG_WAITERS)) {
clear_wr_nonspinnable(sem);
//唤醒等待队列
rwsem_wake(sem, tmp);
}
}
down_write 写者持锁
写者和其他写着以及读者互斥,因此需要没有任何人持锁的情况下才能获取锁。
写者持锁的条件比读者简单,只需要设置count的RWSEM_WRITER_LOCKED bit就算持锁成功。
读者会有其他人协助帮忙持锁,自身被唤醒只用检查waiter.task是否被清空,写者无人协助,被唤醒之后会自己去检查锁的状态,然后拿锁。
挟制持锁,owner必定是自身。
down_write 和读者持锁流程一样,都是先尝试__down_write_trylock失败则调用__down_write。
伪代码为:
if !__down_write_trylock
__down_write
void __sched down_write(struct rw_semaphore *sem)
{
might_sleep();
rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
}
__down_write_trylock
写者尝试非阻塞持锁。仅当count == 0时,将其设置为RWSEM_WRITER_LOCKED,并且设置owner为当前task。
static inline int __down_write_trylock(struct rw_semaphore *sem)
{
long tmp;
DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);
tmp = RWSEM_UNLOCKED_VALUE;
//如果count == 0,那么设置count = RWSEM_WRITER_LOCKED,即表明写者持锁
if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
RWSEM_WRITER_LOCKED)) { //设置owner = current
rwsem_set_owner(sem);
return true;
}
return false;
}
__down_write
static inline void __down_write(struct rw_semaphore *sem)
{
long tmp = RWSEM_UNLOCKED_VALUE;
//再尝试一次__down_write_trylock的流程
if (unlikely(!atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, RWSEM_WRITER_LOCKED)))
//失败则进入慢速路径
rwsem_down_write_slowpath(sem, TASK_UNINTERRUPTIBLE);
else
rwsem_set_owner(sem);
}
rwsem_down_write_slowpath
乐观自旋持锁以及加入等待队列死等的慢速路径。
rwsem_down_write_slowpath
{
和读者类似的乐观自旋尝试
加入等待队列
不是队首,写者持锁什么都不干,读者持锁唤醒队列其他读者,无人持锁唤醒队列队首
while(1) {
rwsem_try_write_lock 尝试持锁,当前队首进程等待超时设置handoff
当前进程设置了handoff,那么尝试在owner上乐观自旋
}
}
wstate 用来表明当前写着的状态
WRITER_FIRST 当前是等待队列的队首
WRITER_NOT_FIRST 当前不是队首
WRITER_HANDOFF 当前进程已经等待超时或者当前时RT进程,允许在count中设置handoff
static struct rw_semaphore *
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{
long count;
bool disable_rspin;
enum writer_wait_state wstate;
struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q);
//尝试乐观自旋偷锁
//和前面读者流程一致,不过是以写者身份spin,等待count中没有写者持锁,读者持锁,handoff标记
if (rwsem_can_spin_on_owner(sem, RWSEM_WR_NONSPINNABLE) &&
rwsem_optimistic_spin(sem, true)) {
/* rwsem_optimistic_spin() implies ACQUIRE on success */
return sem;
}
//获取禁止spin的flag
disable_rspin = atomic_long_read(&sem->owner) & RWSEM_NONSPINNABLE;
//初始化waiter,准备加入等待队列
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
raw_spin_lock_irq(&sem->wait_lock);
//当前进程是等待队列的队首吗?
wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
list_add_tail(&waiter.list, &sem->wait_list);
//如果我们不是队首,需要处理下等待队列
if (wstate == WRITER_NOT_FIRST) {
count = atomic_long_read(&sem->count);
//当前是写者持锁,没什么好做的慢慢等待即可
if (count & RWSEM_WRITER_MASK)
goto wait;
//当前是读者持锁,尝试使用RWSEM_WAKE_READERS唤醒队列内的其他读者持锁
//当前无人持锁,RWSEM_WAKE_READERS唤醒队列的队首
rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)
? RWSEM_WAKE_READERS
: RWSEM_WAKE_ANY, &wake_q);
//唤醒进程
if (!wake_q_empty(&wake_q)) { raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
wake_q_init(&wake_q); /* Used again, reinit */
raw_spin_lock_irq(&sem->wait_lock);
}
} else {
//我们是队首,增加RWSEM_FLAG_WAITERS标志
atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
}
wait:
/* 死循环等锁 */
set_current_state(state);
for (;;) {
//以写者的身份尝试获取锁,成功返回1
if (rwsem_try_write_lock(sem, wstate)) {
break;
}
raw_spin_unlock_irq(&sem->wait_lock);
//如果当前进程允许设置handoff还是持锁失败,那么尝试在owner上自旋等待owner为NULL,避免睡眠
//因为设置了handoff之后应该不会再有竞争者
if (wstate == WRITER_HANDOFF &&
rwsem_spin_on_owner(sem, RWSEM_NONSPINNABLE) == OWNER_NULL)
goto trylock_again;
/* Block until there are no active lockers. */
for (;;) {
...
//让出cpu,等待唤醒
schedule();
//唤醒之后的节点,更新自身以及锁的状态,发生改变就进入外层循环再次尝试持锁,否则回来继续睡眠
------------------------------------------------------
//当前进程能够设置handoff状态,马上退出内存循环到外层循环rwsem_try_write_lock给count设置handoff
if (wstate == WRITER_HANDOFF)
break;
//当前不是队首进程,更新下自身状态看下是否已经成为新的队首
if ((wstate == WRITER_NOT_FIRST) &&
(rwsem_first_waiter(sem) == &waiter))
wstate = WRITER_FIRST;
//更新锁的状态,如果无人持锁马上去外层循环尝试持锁
count = atomic_long_read(&sem->count);
if (!(count & RWSEM_LOCK_MASK))
break;
//当前是队首并且是RT进程或者等待超时,那么允许设置handoff状态
if ((wstate == WRITER_FIRST) && (rt_task(current) ||
time_after(jiffies, waiter.timeout))) {
wstate = WRITER_HANDOFF;
lockevent_inc(rwsem_wlock_handoff);
break;
}
}
trylock_again:
raw_spin_lock_irq(&sem->wait_lock);
}
//持锁成功,恢复进程状态,将自身从等待队列删除
__set_current_state(TASK_RUNNING);
list_del(&waiter.list);
rwsem_disable_reader_optspin(sem, disable_rspin);
raw_spin_unlock_irq(&sem->wait_lock);
lockevent_inc(rwsem_wlock);
return ret;
out_nolock:
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
list_del(&waiter.list);
if (unlikely(wstate == WRITER_HANDOFF))
atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
if (list_empty(&sem->wait_list))
atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
else
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
lockevent_inc(rwsem_wlock_fail);
return ERR_PTR(-EINTR);
}
rwsem_try_write_lock
写者尝试获取锁,主要用来处理写者的handoff的标记。
- 存在count handoff标记当前进程不是队首的情况下会直接失败。
- 有人持锁,根据wstate选择是否设置handoff,然后返回
- 无人持锁,清除handoff,然后持锁
@wstate 用来表明当前写着的状态
WRITER_FIRST 当前是等待队列的队首
WRITER_NOT_FIRST 当前不是队首
WRITER_HANDOFF 当前进程已经等待超时或者当前时RT进程,允许在count中设置handoff
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
enum writer_wait_state wstate)
{
long count, new;
lockdep_assert_held(&sem->wait_lock);
//读取新的count
count = atomic_long_read(&sem->count);
do {
//count中是否被设置了handoff bit?
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
//如果存在handoff那意味着锁会被交接给队首,当前如果不是队首,不可能获取到锁,因此直接返回失败
if (has_handoff && wstate == WRITER_NOT_FIRST)
return false;
new = count;
//当前仍然有人持锁
if (count & RWSEM_LOCK_MASK) {
//存在handoff或者当前进程暂时不能设置handoff,返回失败
if (has_handoff || (wstate != WRITER_HANDOFF))
return false;
//我们可以设置handoff,加上handoff标记
new |= RWSEM_FLAG_HANDOFF;
} else {
//无人持锁了,尝试持锁
new |= RWSEM_WRITER_LOCKED;
new &= ~RWSEM_FLAG_HANDOFF;
//持锁成功如果等待队列为空,需要删除RWSEM_FLAG_WAITERS
if (list_is_singular(&sem->wait_list))
new &= ~RWSEM_FLAG_WAITERS;
}
//如果这期间count没有发生变化,那么将sem->count设置为new,返回更新是否成功,count被更新为sem->count
} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
if (new & RWSEM_FLAG_HANDOFF)
return false;
//设置owner == current
rwsem_set_owner(sem);
return true;
}