天天看点

rw_semaphore 原理与代码分析

代码基于内核5.10版本

信号量的进阶形式,对读者写者进行区分,写者时互斥的,写者持锁时其余的写者和读者都只能等待。读者是允许并发的,读者持锁时允许其他的读者持锁,但是写者必须等待。

后续的分析不涉及死锁检测,也就是lockdep部分。

常用的API如下:

DECLARE_RWSEM(name) 声明名为name的读写信号量,并初始化它。
void init_rwsem(struct rw_semaphore *sem); 对读写信号量sem进行初始化。
void down_read(struct rw_semaphore *sem); 读者用来获取sem,若没获得时,则调用者睡眠等待。
void up_read(struct rw_semaphore *sem); 读者释放sem。
int down_read_trylock(struct rw_semaphore *sem); 读者尝试获取sem,如果获得返回1,如果没有获得返回0。可在中断上下文使用。
void down_write(struct rw_semaphore *sem); 写者用来获取sem,若没获得时,则调用者睡眠等待。
int down_write_trylock(struct rw_semaphore *sem); 写者尝试获取sem,如果获得返回1,如果没有获得返回0。可在中断上下文使用
void up_write(struct rw_semaphore *sem); 写者释放sem。
void downgrade_write(struct rw_semaphore *sem); 把写者降级为读者。

急用请看这

说实话,这部分机制还是比较难的,并且如果不是需要去深入研究其机制,完全没必要了解这么多。只需要简单debug,我这个给出一个快速的总结

struct rw_semaphore {
    atomic_long_t count; //用于记录锁的状态
    atomic_long_t owner; //持锁的写者或者其中一个读者
    struct optimistic_spin_queue osq; //用于乐观自旋
    raw_spinlock_t wait_lock; //保护wait_list结构的spinlock
    struct list_head wait_list; //等锁者链表
    void *magic; //魔术数,记录锁的地址,当其被改变时可以认为锁的数据结构被破坏
    struct lockdep_map dep_map; //死锁检测,暂不关注
}
count bit
 * Bit  0    - writer locked bit
 * Bit  1    - waiters present bit
 * Bit  2    - lock handoff bit
 * Bits 3-7  - reserved
 * Bits 8-62 - 55-bit reader count
 * Bit  63   - read fail bit
           

遇到读写锁相关问题,那大概率是发生在进程卡在rw_sem上了,这时候需要快速找到持锁者,可以从栈中拿到sem的地址以及数据,然后寻找真正的持锁者。注意owner最后几个bit需要清零才是task结构。

  1. magic不是锁的首地址,那不用看了,结构体被破坏,属于内存踩踏问题
  2. writer locked bit被置位,那肯定是写者持锁,找owner就可以找到持锁者。不要理会reader count是否存在,读者会先斩后奏,这部分不能代表读者持锁。
  3. writer locked bit为0,reader count等于1,这时候是读者持锁,可能没有完全持锁成功,即只写入了count还没来得及走完后面的流程。此时owner不一定准确,可能为NULL,也可能是读者中的一个。如果读者计数为1,并且owner是一个task地址,那么大概率就只有这个读者持锁。
  4. writer locked bit为0,读者计数大于1,此时owner为NULL是正常的,为读者中的一个也是正常的。这种情况是没有办法通过锁结构找完所有读者的。可以尝试开启相关debug功能。如果你正在使用使用crash tool分析一个内核尸体的话, search -t 锁地址 来寻找所有线程栈上可能残留的蛛丝马迹。没有收获,也没有bug功能,那就只能自己使用kprobe之类的工具来debug了。
  5. 一些奇奇怪怪的中间态:读者计数为0,写者没有持锁,存在等待者。这种情况要么锁当前没有问题,只是panic的时候恰巧碰到了这种状态,要么等待队列的队首进程状态异常,可能是自身非法调度引起,也可能是其所在cpu异常。

handoff机制

handoff这里应该翻译为“接力”,它的出现是为了弥补乐观自旋带来的漏洞,既并非先到先得。锁大多被设计为先到先得,是为了实现公平,防止某些进程一直得不到资源被饿死。而乐观自旋是为了减少进程切换的开销,降低锁带来的延迟,但是乐观自旋会打破先到先得的规则。

semaphore的handoff是为了防止偷锁的情况出现。等待队列的进程都是在睡眠的,尝试持锁的进程可以进入乐观自旋阶段,这个阶段无视等待队列一旦有机会就会获取锁,造成本该最先拿到锁的等待队列队首被跳过。队首被唤醒持锁,发现锁已经被偷取,这时候就会设置handoff bit,后面所有的持锁者看到这个bit就不能再偷锁了。因此队首进程可以顺利获取到锁。

设置时机:

对于读者,当读者在队首并且被唤醒尝试持锁失败之后,如果超时设置handoff。

对于写者,同样在队首,当自身是RT进程或者超时时设置handoff。

生效时机:

所有试图持锁者都需要检测是否有handoff标志位,一旦存在均会持锁失败。设置者会在持锁成功之后才清除这个标志位。

乐观自旋

乐观自旋是性能和速度的均衡考虑,让进程在满足一定的条件下循环等待某些数据,即期待持有锁的进程尽快释放锁所以占用cpu一段时间来死等不退出,避免自身陷入睡眠然后很快被唤醒。一段时间是指调度器没有标记当前进程需要被调度走。 伪代码如下:

while(1) {
	检测数据是否发生变变化
		是 退出,返回成功
	当前cpu时间片是否用尽?
		是 退出,返回失败
	当前需要等待的进程是否还在cpu上运行
		否 退出,返回失败
}
           

数据结构

实际生效的定义如下:
struct rw_semaphore {
    atomic_long_t count; //用于记录锁的状态
    atomic_long_t owner; //持锁的写者或者其中一个读者
    struct optimistic_spin_queue osq; //用于乐观自旋
    raw_spinlock_t wait_lock; //保护wait_list结构的spinlock
    struct list_head wait_list; //等锁者链表
    void *magic; //魔术数,记录锁的地址,当其被改变时可以认为锁的数据结构被破坏
    struct lockdep_map dep_map; //死锁检测,暂不关注
}
           

count

count用来表示当前锁的状态较为复杂,其定义如下:

* Bit  0    - writer locked bit
 * Bit  1    - waiters present bit
 * Bit  2    - lock handoff bit
 * Bits 3-7  - reserved
 * Bits 8-62 - 55-bit reader count
 * Bit  63   - read fail bit
           

bit 0用来表示是否有写者持锁,写者持锁成功的标志就是这个位被设置

bit 1表示等待队列中是否存在等待者,有等待者时读者持锁成功需要唤醒等待队列的其他读者,读写者释放锁时需要唤醒等待队列的队首进程

bit 2表示hand off bit,这个是为了防止饿死设计的。handoff bit会阻止所有的偷锁/持锁行为保证锁的交接。

bit 8-62 表示读者的数目,这个数目并不能真正的反应持锁成功的读者个数。某些情况下读者会采取先斩后奏的方式,先加上这个计数,然后再判断自己能否持锁,不行的话就减去恢复原来的计数。一般而言,当读者持锁时这个计数是真实的,但是写者持锁时也可能有读者计数,这部分就是先斩后奏的读者。

最高位63表示读者个数溢出位,几乎不会被设置,但是在某些路径还是可能会被检查

下面是用于判断count的一些掩码。

rw_semaphore 原理与代码分析
#define RWSEM_WRITER_LOCKED (1UL << 0)                            1
#define RWSEM_FLAG_WAITERS  (1UL << 1)                            2
#define RWSEM_FLAG_HANDOFF  (1UL << 2)                            4
#define RWSEM_FLAG_READFAIL (1UL << (BITS_PER_LONG - 1))          1<<63

#define RWSEM_READER_SHIFT  8
#define RWSEM_READER_BIAS   (1UL << RWSEM_READER_SHIFT)            1<<8                           
#define RWSEM_READER_MASK   (~(RWSEM_READER_BIAS - 1))             0xffff ffff ffff ff00
#define RWSEM_WRITER_MASK   RWSEM_WRITER_LOCKED                    1
#define RWSEM_LOCK_MASK     (RWSEM_WRITER_MASK|RWSEM_READER_MASK)  0xffff ffff ffff ff01
#define RWSEM_READ_FAILED_MASK  (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
                 RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL) 0x8000 0000 0000 0007
           

osq

struct optimistic_spin_queue osq 成员用于乐观自旋,暂不做详细分析,其本质为mcslock的一种变体,当持锁者还在cpu上运行时,等锁者可以期待持锁者很快会释放这个锁,所以不会陷入睡眠而是自旋等待。反之持锁者已经放弃了cpu,那么预计很长一段时间内持锁者都不会释放锁,此时等锁者陷入睡眠不再忙等。

owner

owner用来记录当前持锁的task,由于task必定是cacheline对齐的目前最小的cacheline也有16bit,所以owner的低位可以用来表示一些flag。同时可以有多个reader持锁,因此绣着持锁时owner就是持锁者,但是读者持锁时owner有特殊表示。

#define RWSEM_READER_OWNED  (1UL << 0)                 bit 0 用来表示当前是reader持锁                                          
#define RWSEM_RD_NONSPINNABLE   (1UL << 1)             bit 1 表示是否允许读者自旋等待
#define RWSEM_WR_NONSPINNABLE   (1UL << 2)             bit 2 表示是否允许写者等待
#define RWSEM_NONSPINNABLE  (RWSEM_RD_NONSPINNABLE | RWSEM_WR_NONSPINNABLE)
#define RWSEM_OWNER_FLAGS_MASK  (RWSEM_READER_OWNED | RWSEM_NONSPINNABLE)
           

读者的owner一般由rwsem_set_reader_owned设置,为 current | RWSEM_READER_OWNED | (之前owner & RWSEM_RD_NONSPINNABLE)

写者的owner由rwsem_set_owner设置,一般为 current

wait_list

等待队列用来放置等待锁的继承,但是由于包含乐观自旋逻辑,等待队列不是全部的等待者,部分等待着可能在乐观自旋等待owner。

作为链表头,连接所有的struct rwsem_waiter.list

初始化

存在两种方式,一种是临时定义一个栈上的变量并且初始化。

  1. count初始化为0
  2. owner初始化为0
  3. wait_lock初始化为未加锁状态
  4. wait_list初始化为空链表头
  5. magic初始化为锁的起始地址
  6. osq lock初始化为未加锁
#define DECLARE_RWSEM(name) \                                                                 
    struct rw_semaphore name = __RWSEM_INITIALIZER(name)

#define __RWSEM_INITIALIZER(name)               \
    { __RWSEM_COUNT_INIT(name),             \
      .owner = ATOMIC_LONG_INIT(0),             \
      __RWSEM_OPT_INIT(name)                \
      .wait_lock = __RAW_SPIN_LOCK_UNLOCKED(name.wait_lock),\
      .wait_list = LIST_HEAD_INIT((name).wait_list),    \
      __RWSEM_DEBUG_INIT(name)              \
      __RWSEM_DEP_MAP_INIT(name) }

count原子变量初始化为0
#define RWSEM_UNLOCKED_VALUE        0L
#define __RWSEM_COUNT_INIT(name)    .count = ATOMIC_LONG_INIT(RWSEM_UNLOCKED_VALUE)

osq lock初始化为未加锁
#define __RWSEM_OPT_INIT(lockname) .osq = OSQ_LOCK_UNLOCKED,

magic初始化为锁地址
# define __RWSEM_DEBUG_INIT(lockname) .magic = &lockname,
           

还可以动态初始化,动态初始化的结果和静态一致。

#define init_rwsem(sem)                     \
do {                                \
    static struct lock_class_key __key;         \
                                \
    __init_rwsem((sem), #sem, &__key);          \
} while (0)

void __init_rwsem(struct rw_semaphore *sem, const char *name,
          struct lock_class_key *key)
{
#ifdef CONFIG_DEBUG_RWSEMS
    sem->magic = sem;
#endif
    atomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE);
    raw_spin_lock_init(&sem->wait_lock);
    INIT_LIST_HEAD(&sem->wait_list);
    atomic_long_set(&sem->owner, 0L);
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
    osq_lock_init(&sem->osq);
#endif
    trace_android_vh_rwsem_init(sem);
}
           

down_read 读者持锁

读者持锁的判定条件较为复杂,即使有写者持锁时,也可能向count增加RWSEM_READER_BIAS,增加总是能成功的,但是不代表持锁成功,增加之后一般会检测是否有writer以及handoff,失败需要回退,增加只是为了防止写者抢占。读者持锁的条件如下:

  1. 没有读者写者以及等待者,即count == 0,此时向count增加RWSEM_READER_BIAS并且设置owner
  2. 有其他读者持锁,向count增加RWSEM_READER_BIAS即算成功
  3. 有等待者但是没有写者,偷锁时向count增加RWSEM_READER_BIAS并且设置owner
  4. 被其他人唤醒时waiter.task被清除,说明别人已经替当前进程持锁了,当前进程只需要返回即可

note:

  1. 读者不管持锁是否成功都可能会增加RWSEM_READER_BIAS一段时间,因此不能用此标志位判断是否是读者持锁
  2. 持锁顺序不是先来后到,乐观自旋不管是读者还是写者都能通过rwsem_try_xxx_lock_unqueued无视等待队列偷锁
  3. 队首进程有特殊处理,如果队首是写者任何不会唤醒写者(! RWSEM_WAKE_ANY)的动作都会失败,如果队首是读者,唤醒读者的时候锁可能会被另一个写者被偷取,此时队首读者如果等待超时会设置handoff不再进行竞争持锁。
  4. 读者有两种持锁方式:在无人持锁以及乐观自旋的情况下是读者自己获取锁,陷入睡眠被唤醒的情况下别人已经设置好了count和owner,自身不需要持锁了,醒来检测自己的waiter.task被清空了即可退出。
  5. 读者持锁的owner:在无人持锁以及乐观自旋的情况下owner为第一个持锁的读者,如果是处于等待队列中被其他人协助唤醒的读者,是不会设置owner的。所以存在两种情况:1.多个读者,但是只有最先持锁的读者是owner。2.任意数目的读者,最先持锁的task已经释放锁并且清除owner,导致owner中的task为空。
void __sched down_read(struct rw_semaphore *sem)                                              
{
    //检测是否有其他高优先级任务需要抢占,如果有让出cpu
    might_sleep();
    //死锁检测
    rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
    //持锁流程
    LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
} 

//如果穿入了trt_lock的接口,那么先尝试不需要等待的try_lock,成功就直接持锁退出
//否则调用会阻塞的lock接口
#define LOCK_CONTENDED(_lock, try, lock)            \                                         
do {                                \
    if (!try(_lock)) {                  \
        lock_contended(&(_lock)->dep_map, _RET_IP_);    \
        lock(_lock);                    \
    }                           \
    lock_acquired(&(_lock)->dep_map, _RET_IP_);         \
} while (0)

简化一下,流程为:
down_read 
{
    //尝试非阻塞持锁
    if(!__down_read_trylock)
        //尝试失败,进入阻塞流程
        __down_read
}

           

__down_read_trylock

读者尝试非阻塞的持锁,仅当count == 0,也就是没有reader也没有writer也没有waiter的情况下才能持锁成功。成功之后设置自己的task | RWSEM_READER_OWNED 到owner。

返回1表示成功,0表示失败。

__down_read_trylock
{
    if(sem->count == 0)
        sem->count += RWSEM_READER_BIAS(1 << 8)
        sem->owner &= current | RWSEM_READER_OWNED | RWSEM_RD_NONSPINNABLE
        return 1
    return 0
}
           
static inline int __down_read_trylock(struct rw_semaphore *sem)                               
{
    long tmp;
    //检测magic是否被改写
    DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);

    //设置tmp为锁没有持有者也没有等待者的状态的值,为0
    tmp = RWSEM_UNLOCKED_VALUE;
    do {
        //判断sem->count == tmp
        //是,当前无人持锁无人等待,设置sem->count = tmp + RWSEM_READER_BIAS,返回1表示更新成功
        //否,锁不是空闲状态,更新count失败,返回0
        if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
                    tmp + RWSEM_READER_BIAS)) {
            //sem->owner &= current | RWSEM_READER_OWNED | RWSEM_RD_NONSPINNABLE
            rwsem_set_reader_owned(sem);
            //持锁成功返回1
            return 1;
        }
    } while (!(tmp & RWSEM_READ_FAILED_MASK));

    //尝试拿锁失败,返回0
    return 0;
}

           

__down_read

阻塞性的等锁直到持锁成功。

__down_read
{
    rwsem_read_trylock
    {
        sem->count += RWSEM_READER_BIAS
        return !(cnt & (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL))
    }

    if (!rwsem_read_trylock) {
        rwsem_down_read_slowpath
    } else {
        rwsem_set_reader_owned
    }

}
           
static inline void __down_read(struct rw_semaphore *sem)                                               
{
    //读者尝试持锁,0表示失败
    //不管失败成功与否,都会在count中增加读者的计数RWSEM_READER_BIAS。
    if (!rwsem_read_trylock(sem)) {
        //读者持锁失败,进入慢速路径
        rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE);
        DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
    } else {
        //读者持锁成功,设置owner = current | RWSEM_READER_OWNED,并且继承之前设置的RWSEM_RD_NONSPINNABLE
        rwsem_set_reader_owned(sem);
    }
}

           

rwsem_read_trylock

读者尝试持锁,返回0表示失败,1表示成功。不管失败成功与否,都会在count中增加读者的计数RWSEM_READER_BIAS。

失败的原因有:

  1. 存在写者持锁
  2. 等待队列不为空
  3. 锁处于handoff交接状态
  4. 读者数目溢出
static inline bool rwsem_read_trylock(struct rw_semaphore *sem)
{
    //将count增加RWSEM_READER_BIAS,返回增加后的值
    //即读者做一次预抢占,先设置抢占值再检查是否能够抢占成功
    long cnt = atomic_long_add_return_acquire(RWSEM_READER_BIAS, &sem->count);
    //数值小于0表示bit 63被设置,即读者数目溢出了,禁止读写者在锁上乐观自旋等待
    if (WARN_ON_ONCE(cnt < 0))
        rwsem_set_nonspinnable(sem);
    //返回此时读者是否能够持有这个锁,如果存在写者或者存在等待者或者锁处于特殊的handoff状态,都会返回0,也就是失败
    return !(cnt & RWSEM_READ_FAILED_MASK);
}

           

rwsem_down_read_slowpath

读者直接持锁失败,进入慢速路径。

注意,在此函数之前读者已经将自己的标志位RWSEM_READER_BIAS加入了count。

rwsem_down_read_slowpath
{

    if(rwsem_can_spin_on_owner) {
    //判断自身是否需要调度,持锁者是否在运行,以及flag中有没有明确禁止当前身份的自旋
          减去之前rwsem_read_trylock中加上的RWSEM_READER_BIAS (2)
        atomic_long_add(-RWSEM_READER_BIAS, &sem->count); //减去之前rwsem_read_trylock中加上的RWSEM_READER_BIAS

        if(rwsem_optimistic_spin){ //乐观自旋,自旋等待成功会设置当前task为owner,设置count对应bit
               等待队列不为空?
                rwsem_mark_wake 唤醒等待队列其他读者
            return 持锁成功,退出
        } else if (rwsem_reader_phase_trylock()) //乐观自旋失败,再尝试一次持锁
            return 持锁成功,退出
    }

    乐观自旋彻底失败,加入等待队列
    初始化waiter结构,设置超时时间
    如果进程没有经历过乐观自旋(1)处设置的count一直在生效,会阻止写者持锁,当前不是写者持锁和handoff状态,那读者持锁成功了
        持锁成功,退出
   否则加上RWSEM_FLAG_WAITERS,将自身加入等待队列

   加入等待队列之后锁被释放,或者另一个读者持锁我们是队首,那么rwsem_mark_wake唤醒所有读者持锁包括我们自身
       持锁成功,退出

   while(1)
       等待waiter.task被清除

}

           
读者将自身加入等待队列时,会创建唤醒队列和等待队列结构
struct rwsem_waiter {
    struct list_head list; //链表头
    struct task_struct *task; //等待持锁是task,被唤醒时会清除这个成员
    enum rwsem_waiter_type type; //表明自身是等待读锁还是写锁
    unsigned long timeout; //保存超时时间,只对队首进程生效,超时之后并不会退出,而是设置handoff标志
    unsigned long last_rowner; //保存当前的持锁者
}


struct wake_q_head {
    struct wake_q_node *first; //初始化为WAKE_Q_TAIL
    struct wake_q_node **lastp; //初始化为自身first成员的地址
}
           
static struct rw_semaphore __sched *
rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
{
    //adjustment用来存储将要count增加或者减少的值
    //初始化为非0,在乐观自旋逻辑中清零,这样可以通过该值判断进程有没有经历过乐观自旋的逻辑
    long count, adjustment = -RWSEM_READER_BIAS;
    struct rwsem_waiter waiter;
    //初始化唤醒队列
    DEFINE_WAKE_Q(wake_q);
    bool wake = false;

    //保存当前owner信息
    waiter.last_rowner = atomic_long_read(&sem->owner);
    //如果当前不是读者持锁
    if (!(waiter.last_rowner & RWSEM_READER_OWNED))
        //在保存的owner中加上禁止读者自旋等待的flag
        waiter.last_rowner &= RWSEM_RD_NONSPINNABLE;

    //当前是否允许自旋等待,不允许则加入等待队列
    if (!rwsem_can_spin_on_owner(sem, RWSEM_RD_NONSPINNABLE))
        goto queue;

//乐观自旋等待的部分
---------------------------------------------------------------------- (1)
    //减去之前rwsem_read_trylock中加上的RWSEM_READER_BIAS
    atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
    adjustment = 0;
    if (rwsem_optimistic_spin(sem, false)) {
    //自旋等待成功,成功设置owner为当前task,持有count
        //读者持锁成功,就可以将其他等待的读者唤醒了
        if ((atomic_long_read(&sem->count) & RWSEM_FLAG_WAITERS)) {
            raw_spin_lock_irq(&sem->wait_lock);
            if (!list_empty(&sem->wait_list))
                //唤醒队列中所有能唤醒的读者
                rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED,                                            
                        &wake_q);
            raw_spin_unlock_irq(&sem->wait_lock);
            wake_up_q(&wake_q);
        }
        return sem;
    //乐观自旋失败,进行最后一次尝试
    } else if (rwsem_reader_phase_trylock(sem, waiter.last_rowner)) {
        return sem;
    }


//乐观自旋彻底失败,加入等待队列
---------------------------------------------------------------------- (2)
queue:
    waiter.task = current;
    waiter.type = RWSEM_WAITING_FOR_READ;
    waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;

    raw_spin_lock_irq(&sem->wait_lock);
    //等待队列为空,当前进程为队首
    if (list_empty(&sem->wait_list)) {
        //adjustment!=0意味着没有经历过代码(1)部分的乐观自旋等待,即之前设置的count中的RWSEM_READER_BIAS没有被减去
        //当前不是写者持锁和handoff状态,那读者持锁成功了
        if (adjustment && !(atomic_long_read(&sem->count) &
             (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
            smp_acquire__after_ctrl_dep();
            raw_spin_unlock_irq(&sem->wait_lock);
            //count值前面已经增加过RWSEM_READER_BIAS了,现在不需要调整,设置owner之后就退出
            rwsem_set_reader_owned(sem);
            lockevent_inc(rwsem_rlock_fast);
            return sem;
        }

        //否则添加RWSEM_FLAG_WAITERS,表明等待队列非空,当前进程不是队首
        adjustment += RWSEM_FLAG_WAITERS;
    }
    //将自身加入等待队列
    list_add_tail(&waiter.list, &sem->wait_list);

    if (adjustment)
        count = atomic_long_add_return(adjustment, &sem->count);
    else                                                                                               
        count = atomic_long_read(&sem->count);

//加入等待队列完毕
----------------------------------------------------
    //加入等待队列之后情况如果发生改变,锁被释放了
    if (!(count & RWSEM_LOCK_MASK)) {
        clear_wr_nonspinnable(sem);
        wake = true;
    }

    //或者没有写者持锁并且我们是队首
    //那么唤醒并且协助队列中所有的读者持锁,包括我们自身
    //rwsem_mark_wake会清除所有被唤醒者的waiter.task
    if (wake || (!(count & RWSEM_WRITER_MASK) &&
            (adjustment & RWSEM_FLAG_WAITERS))) //adjustment被设置了RWSEM_FLAG_WAITERS意味着我们是队首
        rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

    raw_spin_unlock_irq(&sem->wait_lock);
    wake_up_q(&wake_q);


//在队列中等待waiter.task被清除
----------------------------------------------------------
    for (;;) {
        set_current_state(state);
        //当waiter.task被清除时,说明我们持锁成功
        if (!smp_load_acquire(&waiter.task)) {
            /* Matches rwsem_mark_wake()'s smp_store_release(). */
            break;
        }
        //处理信号
...
        schedule();
        lockevent_inc(rwsem_sleep_reader);                                                             
    }

    __set_current_state(TASK_RUNNING);
    lockevent_inc(rwsem_rlock);
    return sem;

out_nolock:
    list_del(&waiter.list);
    if (list_empty(&sem->wait_list)) {
        atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
                   &sem->count);
    }
    raw_spin_unlock_irq(&sem->wait_lock);
    __set_current_state(TASK_RUNNING);
    lockevent_inc(rwsem_rlock_fail);
    return ERR_PTR(-EINTR);
}

           
rwsem_can_spin_on_owner

检测是否能够乐观自旋。

static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem,
                       unsigned long nonspinnable)
{
    struct task_struct *owner;
    unsigned long flags;
    bool ret = true;

    //当前任务被调度器标记为需要调度,那么不能自旋等待
    if (need_resched()) {
        return false;
    }
...
    //获取当前owner以及flag
    owner = rwsem_owner_flags(sem, &flags);
    //如果flag中禁止了当前身份的自旋或者当前是写者持锁并且写者放弃了cpu,自旋失败
    if ((flags & nonspinnable) ||
        (owner && !(flags & RWSEM_READER_OWNED) && !owner_on_cpu(owner)))
        ret = false;
...
    return ret;                                                                                        
}

           
rwsem_optimistic_spin

在owner上乐观自旋,第一步通过乐观自旋获取sem->osq,只有持有这个锁才能在sem->owner上自旋等待。返回自旋等待是否成功,如果是被迫退出自旋返回0.

自旋等待成功会设置当前task为owner。

rwsem_optimistic_spin
{
    osq_lock 获取乐观自旋所osq
        失败,退出
    while(1) {
        owner_state = rwsem_spin_on_owner 更新当前owner状态
        OWNER_SPINNABLE? -> break
        rwsem_try_xxx_lock_unqueued 
            调用不加入队列尝试获取锁的函数,此函数能够抢占等待队列的持锁机会
            reader只检测是否存在writer持锁和handoff
            writer只检测锁是否被人持有和handoff,两者均不关心waiter
              成功 -> break
        写者等待读者持锁的情况,读者太多不能确定每个读者是否都在线
            每16次循环检测超时时间
               超时 -> break
      RT任务不能被阻塞,读者持锁情况向读者太多不能确定每个读者是否都在线
           两次机会获取锁或者带写者持锁开始spin
    }

}
           
@sem   指向的指针锁
@wlock 决定是否是写者

函数中使用owner_state表示当前owner的状态
enum owner_state {
  OWNER_NULL = 1          owner为NULL
  OWNER_WRITER = 2        owner为写者
  OWNER_READER = 4        owner为读者
  OWNER_NONSPINNABLE = 8  当前不允许在owner上spin
}
#define OWNER_SPINNABLE     (OWNER_NULL | OWNER_WRITER | OWNER_READER)


static bool rwsem_optimistic_spin(struct rw_semaphore *sem, bool wlock)
{
    bool taken = false;
    int prev_owner_state = OWNER_NULL;
    int loop = 0;
    u64 rspin_threshold = 0;                                                                           
    unsigned long nonspinnable = wlock ? RWSEM_WR_NONSPINNABLE
                       : RWSEM_RD_NONSPINNABLE;

    preempt_disable();

    /* 先获取osqlock,osqlock也是一种乐观自旋机制,在soqlock上自旋失败,则不尝试在owner上自旋等待,直接退出 */
    if (!osq_lock(&sem->osq))
        goto done;

    //由于到达此步之前需要持有osq,因此即使有多个读写者抢锁也只能有一个自旋等待owner
    //乐观自旋等待owner发生变化时持锁
    for (;;) {
        enum owner_state owner_state;

        //自旋等待owner发生改变,返回改变之后的state
        owner_state = rwsem_spin_on_owner(sem, nonspinnable);

        //除OWNER_NONSPINNABLE之外都是允许继续spin的
        if (!(owner_state & OWNER_SPINNABLE))
            break;

        //根据身份调用不加入队列尝试获取锁的函数,不加入等待队列意味着有可能抢占等待队列进程的持锁机会
        //对于写者而言,等待count中既没有读者持锁也没有写者持锁,也不是handoff状态
        //对于读者而言,等待count中既没有没有写者持锁,也不是handoff状态
        //成功返回1,失败返回0
        taken = wlock ? rwsem_try_write_lock_unqueued(sem)
                  : rwsem_try_read_lock_unqueued(sem);

        //持锁成功,退出循环
        if (taken)
            break;


        //其他的读者可以直接持锁,写者需要等待所有读者释放锁才能持锁,写者无法弄清楚是否所有读者都在线
        //所以如果当前是写者等锁,并且读者持有锁,给自旋等待的时间加个限制
        if (wlock && (owner_state == OWNER_READER)) {
            //prev_owner_state初始化为OWNER_NULL,第一次此条件必定成立
            if (prev_owner_state != OWNER_READER) {
                //不允许写者在owner上spin,那么退出
                if (rwsem_test_oflags(sem, nonspinnable))
                    break;
                //计算写者的最大等待时间
                rspin_threshold = rwsem_rspin_threshold(sem);
                //循环次数清零
                loop = 0;并且已经超过了该读者的预定等
            }

            //每16次循环计算当前等待是否超时,如果超时设置RWSEM_NONSPINNABLE禁止所有的在owner上的spin
            else if (!(++loop & 0xf) && (sched_clock() > rspin_threshold)) {
                rwsem_set_nonspinnable(sem);
                lockevent_inc(rwsem_opt_nospin);
                break;
            }
        }

        //读者可能有很多,RT任务同样不能弄清楚每个读者的状态
        //上次不是写者持锁,则RT任务不再等待,读者一直持锁RT任务将有两次等待机会
        if (owner_state != OWNER_WRITER) {
            //调度器认为我们需要让出cpu,那么退出等待
            if (need_resched())
                break;
            //读者持锁的情况向RT任务有两次机会spin
            if (rt_task(current) &&
               (prev_owner_state != OWNER_WRITER))
                break;
        }
        //记录这次循环的持锁者,下次循环时会用到
        prev_owner_state = owner_state;
        cpu_relax();
    }
    osq_unlock(&sem->osq);
done:
    preempt_enable();
    lockevent_cond_inc(rwsem_opt_fail, !taken);
    return taken;
}

           

rwsem_spin_on_owner

​ 自旋等待owner或者flag发生改变,返回新的state

static noinline enum owner_state
rwsem_spin_on_owner(struct rw_semaphore *sem, unsigned long nonspinnable)
{
    struct task_struct *new, *owner;
    unsigned long flags, new_flags;
    enum owner_state state;                                                                                                                                                                   
    //获取owner
    owner = rwsem_owner_flags(sem, &flags);
    /* 获取onwer的状态,这里有4种情况
     * 1.owner中存在禁止当前身份spin的flag,比如当前是读者owner中存在RWSEM_RD_NONSPINNABLE,返回 OWNER_NONSPINNABLE
     * 2.当前是读者持锁,返回 OWNER_READER
     * 3.在等待的过程中所已经被释放了,返回 OWNER_NULL
     * 4.写者持锁,OWNER_WRITER */
    state = rwsem_owner_state(owner, flags, nonspinnable);

    //当前不是写者持锁,停止等待owner
    //因为读者持锁时间可能很长,OWNER_NONSPINNABLE意味着禁止自旋等待
    if (state != OWNER_WRITER)
        return state;

    rcu_read_lock();
    for (;;) {
        //读取新的flag和owner
        new = rwsem_owner_flags(sem, &new_flags);
        //如果owner或者flag发生了改变
        if ((new != owner) || (new_flags != flags)) {
            //重新获取stat,终止循环
            state = rwsem_owner_state(new, new_flags, nonspinnable);
            break;
        }
...
        //如果自选条件不满足
        if (need_resched() || !owner_on_cpu(owner)) {
            state = OWNER_NONSPINNABLE;
            break;
        }

        cpu_relax();
    }
    rcu_read_unlock();

    return state;
}

           
rwsem_try_xxx_lock_unqueued

不加入等待队列的尝试持锁,这此持锁没有判断等待队列是否有等待者,可能会抢占等待队列中进程的持锁机会,所以需要进行handoff的判断,避免等待着被强占次数过多饿死。

static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)                             
{
    long count = atomic_long_read(&sem->count);
    //写者等待无人持锁以及非handoff状态,之后尝试持锁
    //使用循环是为了给写者更多的持锁机会,读者持锁就只有一次判断
    while (!(count & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))) {
        if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
                    count | RWSEM_WRITER_LOCKED)) {
            rwsem_set_owner(sem);
            lockevent_inc(rwsem_opt_wlock);
            return true;
        }
    }
    return false;
}

           

前面读者实际上是有非阻塞持锁的函数的,rwsem_read_trylock,与此函数区别在于是否判断waiter。

static inline bool rwsem_try_read_lock_unqueued(struct rw_semaphore *sem)                              
{
    long count = atomic_long_read(&sem->count);
    //判断是否有写者持锁以及是否在handoff状态,读者持锁不影响另一个读者
    if (count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))
        return false;

    //不在的话尝试持锁,将count + RWSEM_READER_BIAS
    count = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
    //成功则开始设置owner
    if (!(count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
        rwsem_set_reader_owned(sem);
        lockevent_inc(rwsem_opt_rlock);
        return true;
    }

    //失败则回退自己设置的RWSEM_READER_BIAS
    atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
    return false;
}

           
rwsem_mark_wake

唤醒等待队列的进程。

如果队首是写者但是唤醒类型不是RWSEM_WAKE_ANY,那么唤醒不了任何进程。

队首是读者,当前不是读者持锁唤醒其他读者,那么帮助这个读者提前占据锁。抢锁失败并且超时,就设置handoff位。

@wake_type 需要唤醒对象的类型
enum rwsem_wake_type {
    RWSEM_WAKE_ANY,     /* 唤醒队列的第一个等待者 */
    RWSEM_WAKE_READERS, /* 只唤醒读者 */
    RWSEM_WAKE_READ_OWNED   /* 读者持锁唤醒其他读者 */                                                             };
@wake_q 唤醒队列

static void rwsem_mark_wake(struct rw_semaphore *sem,                   
                enum rwsem_wake_type wake_type,
                struct wake_q_head *wake_q)
{
    struct rwsem_waiter *waiter, *tmp;
    long oldcount, woken = 0, adjustment = 0;
    struct list_head wlist;

    lockdep_assert_held(&sem->wait_lock);

    //如果第一个等待者是写者,并且唤醒对象是任意队首等待者,那么不唤醒所有进程进行竞争,只唤醒这个写者
    //如果第一个等待者是写者,但是唤醒对象是其他的,比如读者持锁唤醒其他读者或者仅限于唤醒读者的情况
    //直接返回不进行唤醒动作,因为第一个等待的写者应该最先被唤醒
    waiter = rwsem_first_waiter(sem);
    if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
        if (wake_type == RWSEM_WAKE_ANY) {
            wake_q_add(wake_q, waiter->task);
            lockevent_inc(rwsem_wake_writer);
        }
        return;
    }

//此处之后处理队首是读者的情况
-------------------------------------------------------------------------
    //读者计数溢出处理
    if (unlikely(atomic_long_read(&sem->count) < 0))
        return;

    //不是读者持锁唤醒其他读者的情况,说明当前锁是未被人持有的状态,需要抢锁
    if (wake_type != RWSEM_WAKE_READ_OWNED) {
        struct task_struct *owner;

        //先构造一个虚假的读者持锁,帮助这个队首的读者提前获取锁
        adjustment = RWSEM_READER_BIAS;
        oldcount = atomic_long_fetch_add(adjustment, &sem->count);
        //抢锁失败了,写者偷到了锁,队首的读者错过了一次持锁机会,并且已经超过了该读者的预定等待时间
        //现在设置handoff,屏蔽所有竞争锁以及窃取锁的动作,让下个持锁者直接把锁交给队首的读者,退出
        if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
            if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
                time_after(jiffies, waiter->timeout)) {
                adjustment -= RWSEM_FLAG_HANDOFF;
                lockevent_inc(rwsem_rlock_handoff);
            }

            atomic_long_add(-adjustment, &sem->count);                                                                                                                                        
            return;
        }

        //帮助队首的读者抢锁成功了,设置队首进程为新的owner
        owner = waiter->task;
        if (waiter->last_rowner & RWSEM_RD_NONSPINNABLE) {
            owner = (void *)((unsigned long)owner | RWSEM_RD_NONSPINNABLE);
            lockevent_inc(rwsem_opt_norspin);
        }
        __rwsem_set_reader_owned(sem, owner);
    }

/*    
这里之后,队首是读者,持锁者也是读者,需要做的是唤醒队列中的其他读者
1.读者持锁唤醒其他读者,
2.帮助队首的读者获取了锁,现在需要继续唤醒剩余的读者
 */
-------------------------------------------------------------------
    //现在,持锁者是读者了,我们可以唤醒队列中至多MAX_READERS_WAKEUP个其他的读者,一般是1600个
    INIT_LIST_HEAD(&wlist);
    list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) {
        if (waiter->type == RWSEM_WAITING_FOR_WRITE)
            continue;
        woken++;
        list_move_tail(&waiter->list, &wlist);
        if (woken >= MAX_READERS_WAKEUP)
            break;
    }

    //增加需要唤醒的读者的计数,如果等待队列为空需要移除掉waiter标志
    adjustment = woken * RWSEM_READER_BIAS - adjustment;
    lockevent_cond_inc(rwsem_wake_reader, woken);
    if (list_empty(&sem->wait_list)) {
        adjustment -= RWSEM_FLAG_WAITERS;
    }

    //handoff只作用于队首进程,现在我们已经协助队首拿到锁了,清除它
    if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
        adjustment -= RWSEM_FLAG_HANDOFF;

    //将增加的读者数目写入count
    if (adjustment)
        atomic_long_add(adjustment, &sem->count);

    /* 将需要唤醒的进程放入wake_q */
    list_for_each_entry_safe(waiter, tmp, &wlist, list) {
        struct task_struct *tsk;
        tsk = waiter->task;
        get_task_struct(tsk);
        //唤醒之前清除waiter->task
        smp_store_release(&waiter->task, NULL);
        wake_q_add_safe(wake_q, tsk);
    }
}

           

up_read 读者释放锁

如果owner是当前task,清除掉owner中task_struct部分,保留flag部分。

将count减去RWSEM_READER_BIAS,即减少一名读者计数。

若无人持锁并且等待队列不为空,调用rwsem_wake唤醒等待队列进程,最终盗用的是rwsem_mark_wake,唤醒类型为RWSEM_WAKE_ANY。

void up_read(struct rw_semaphore *sem)                                                                                                                                                        
{
    rwsem_release(&sem->dep_map, _RET_IP_);
    __up_read(sem);
}


static inline void __up_read(struct rw_semaphore *sem)                                                                                                                                        
{
    long tmp;

    DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);
    DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
    
    //onwer是当前进程,将flag中的task_struct清除,保留flag
    rwsem_clear_reader_owned(sem);
    //减少读者数目
    tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);
    DEBUG_RWSEMS_WARN_ON(tmp < 0, sem);
    //当前无人持锁
    if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) ==
              RWSEM_FLAG_WAITERS)) {
        clear_wr_nonspinnable(sem);
        //唤醒等待队列
        rwsem_wake(sem, tmp);
    }
}

           

down_write 写者持锁

写者和其他写着以及读者互斥,因此需要没有任何人持锁的情况下才能获取锁。

写者持锁的条件比读者简单,只需要设置count的RWSEM_WRITER_LOCKED bit就算持锁成功。

读者会有其他人协助帮忙持锁,自身被唤醒只用检查waiter.task是否被清空,写者无人协助,被唤醒之后会自己去检查锁的状态,然后拿锁。

挟制持锁,owner必定是自身。

down_write 和读者持锁流程一样,都是先尝试__down_write_trylock失败则调用__down_write。

伪代码为:
if !__down_write_trylock
    __down_write

void __sched down_write(struct rw_semaphore *sem)
{
    might_sleep();
    rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
}


           

__down_write_trylock

写者尝试非阻塞持锁。仅当count == 0时,将其设置为RWSEM_WRITER_LOCKED,并且设置owner为当前task。

static inline int __down_write_trylock(struct rw_semaphore *sem)
{
    long tmp;

    DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);

    tmp  = RWSEM_UNLOCKED_VALUE;
    //如果count == 0,那么设置count = RWSEM_WRITER_LOCKED,即表明写者持锁
    if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
                        RWSEM_WRITER_LOCKED)) {                                                                                                              //设置owner = current                                 
        rwsem_set_owner(sem);
        return true;
    }
    return false;
}

           

__down_write

static inline void __down_write(struct rw_semaphore *sem)                                                                                                                                     
{
    long tmp = RWSEM_UNLOCKED_VALUE;
    //再尝试一次__down_write_trylock的流程
    if (unlikely(!atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, RWSEM_WRITER_LOCKED)))
        //失败则进入慢速路径
        rwsem_down_write_slowpath(sem, TASK_UNINTERRUPTIBLE);
    else
        rwsem_set_owner(sem);
}

           

rwsem_down_write_slowpath

乐观自旋持锁以及加入等待队列死等的慢速路径。

rwsem_down_write_slowpath
{
    和读者类似的乐观自旋尝试
    加入等待队列
    不是队首,写者持锁什么都不干,读者持锁唤醒队列其他读者,无人持锁唤醒队列队首
    while(1) {
        rwsem_try_write_lock 尝试持锁,当前队首进程等待超时设置handoff
        当前进程设置了handoff,那么尝试在owner上乐观自旋
    }
}
           
wstate 用来表明当前写着的状态
WRITER_FIRST 当前是等待队列的队首
WRITER_NOT_FIRST 当前不是队首
WRITER_HANDOFF 当前进程已经等待超时或者当前时RT进程,允许在count中设置handoff

static struct rw_semaphore *
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{
    long count;
    bool disable_rspin;
    enum writer_wait_state wstate;
    struct rwsem_waiter waiter;
    struct rw_semaphore *ret = sem;
    DEFINE_WAKE_Q(wake_q);
    
    //尝试乐观自旋偷锁
    //和前面读者流程一致,不过是以写者身份spin,等待count中没有写者持锁,读者持锁,handoff标记
    if (rwsem_can_spin_on_owner(sem, RWSEM_WR_NONSPINNABLE) &&
        rwsem_optimistic_spin(sem, true)) {
        /* rwsem_optimistic_spin() implies ACQUIRE on success */
        return sem;
    }

    //获取禁止spin的flag
    disable_rspin = atomic_long_read(&sem->owner) & RWSEM_NONSPINNABLE;

    //初始化waiter,准备加入等待队列
    waiter.task = current;
    waiter.type = RWSEM_WAITING_FOR_WRITE;
    waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;

    raw_spin_lock_irq(&sem->wait_lock);                            
    
    //当前进程是等待队列的队首吗?
    wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;

    list_add_tail(&waiter.list, &sem->wait_list);

    //如果我们不是队首,需要处理下等待队列
    if (wstate == WRITER_NOT_FIRST) {
        count = atomic_long_read(&sem->count);
        //当前是写者持锁,没什么好做的慢慢等待即可
        if (count & RWSEM_WRITER_MASK)
            goto wait;
        
        //当前是读者持锁,尝试使用RWSEM_WAKE_READERS唤醒队列内的其他读者持锁
        //当前无人持锁,RWSEM_WAKE_READERS唤醒队列的队首
        rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)
                    ? RWSEM_WAKE_READERS
                    : RWSEM_WAKE_ANY, &wake_q);
        
        //唤醒进程
        if (!wake_q_empty(&wake_q)) {                                                                                                                            raw_spin_unlock_irq(&sem->wait_lock);
            wake_up_q(&wake_q);
            wake_q_init(&wake_q);   /* Used again, reinit */
            raw_spin_lock_irq(&sem->wait_lock);
        }
    } else {
        //我们是队首,增加RWSEM_FLAG_WAITERS标志
        atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
    }

wait:
    /* 死循环等锁 */
    set_current_state(state);
    for (;;) {
        //以写者的身份尝试获取锁,成功返回1
        if (rwsem_try_write_lock(sem, wstate)) {
            break;
        }

        raw_spin_unlock_irq(&sem->wait_lock);

        //如果当前进程允许设置handoff还是持锁失败,那么尝试在owner上自旋等待owner为NULL,避免睡眠
        //因为设置了handoff之后应该不会再有竞争者
        if (wstate == WRITER_HANDOFF &&
            rwsem_spin_on_owner(sem, RWSEM_NONSPINNABLE) == OWNER_NULL)
            goto trylock_again;

        /* Block until there are no active lockers. */
        for (;;) {
            ...
            //让出cpu,等待唤醒
            schedule();

//唤醒之后的节点,更新自身以及锁的状态,发生改变就进入外层循环再次尝试持锁,否则回来继续睡眠
------------------------------------------------------
            //当前进程能够设置handoff状态,马上退出内存循环到外层循环rwsem_try_write_lock给count设置handoff
            if (wstate == WRITER_HANDOFF)
                break;
            
            //当前不是队首进程,更新下自身状态看下是否已经成为新的队首
            if ((wstate == WRITER_NOT_FIRST) &&
                (rwsem_first_waiter(sem) == &waiter))
                wstate = WRITER_FIRST;
            
            //更新锁的状态,如果无人持锁马上去外层循环尝试持锁
            count = atomic_long_read(&sem->count);
            if (!(count & RWSEM_LOCK_MASK))
                break;

            //当前是队首并且是RT进程或者等待超时,那么允许设置handoff状态
            if ((wstate == WRITER_FIRST) && (rt_task(current) ||
                time_after(jiffies, waiter.timeout))) {
                wstate = WRITER_HANDOFF;
                lockevent_inc(rwsem_wlock_handoff);
                break;
            }
        }
trylock_again:
        raw_spin_lock_irq(&sem->wait_lock);
    }
    
    //持锁成功,恢复进程状态,将自身从等待队列删除
    __set_current_state(TASK_RUNNING);
    list_del(&waiter.list);
    rwsem_disable_reader_optspin(sem, disable_rspin);
    raw_spin_unlock_irq(&sem->wait_lock);
    lockevent_inc(rwsem_wlock);

    return ret;

out_nolock:
    __set_current_state(TASK_RUNNING);
    raw_spin_lock_irq(&sem->wait_lock);                                                                                                                                                       
    list_del(&waiter.list);

    if (unlikely(wstate == WRITER_HANDOFF))
        atomic_long_add(-RWSEM_FLAG_HANDOFF,  &sem->count);
    if (list_empty(&sem->wait_list))
        atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
    else
        rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
    raw_spin_unlock_irq(&sem->wait_lock);
    wake_up_q(&wake_q);
    lockevent_inc(rwsem_wlock_fail);

    return ERR_PTR(-EINTR);
}

           

rwsem_try_write_lock

写者尝试获取锁,主要用来处理写者的handoff的标记。

  1. 存在count handoff标记当前进程不是队首的情况下会直接失败。
  2. 有人持锁,根据wstate选择是否设置handoff,然后返回
  3. 无人持锁,清除handoff,然后持锁
@wstate 用来表明当前写着的状态
WRITER_FIRST 当前是等待队列的队首
WRITER_NOT_FIRST 当前不是队首
WRITER_HANDOFF 当前进程已经等待超时或者当前时RT进程,允许在count中设置handoff
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
                    enum writer_wait_state wstate)                                                                                                                                            
{
    long count, new;

    lockdep_assert_held(&sem->wait_lock);
    
    //读取新的count
    count = atomic_long_read(&sem->count);
    do {
        //count中是否被设置了handoff bit?
        bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
        
        //如果存在handoff那意味着锁会被交接给队首,当前如果不是队首,不可能获取到锁,因此直接返回失败
        if (has_handoff && wstate == WRITER_NOT_FIRST)
            return false;
        
        new = count;
        //当前仍然有人持锁
        if (count & RWSEM_LOCK_MASK) {
            //存在handoff或者当前进程暂时不能设置handoff,返回失败
            if (has_handoff || (wstate != WRITER_HANDOFF))
                return false;
            //我们可以设置handoff,加上handoff标记
            new |= RWSEM_FLAG_HANDOFF;
        } else {
            //无人持锁了,尝试持锁
            new |= RWSEM_WRITER_LOCKED;
            new &= ~RWSEM_FLAG_HANDOFF;
            //持锁成功如果等待队列为空,需要删除RWSEM_FLAG_WAITERS
            if (list_is_singular(&sem->wait_list))
                new &= ~RWSEM_FLAG_WAITERS;
        }
    //如果这期间count没有发生变化,那么将sem->count设置为new,返回更新是否成功,count被更新为sem->count
    } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));

    if (new & RWSEM_FLAG_HANDOFF)
        return false;
    //设置owner == current
    rwsem_set_owner(sem);
    return true;
}


           

继续阅读