天天看点

第二章——读书笔记线程同步精要

文章目录

  • 线程同步精要
    • 2.1 互斥器(mutex)
      • 2.1.1 只使用非递归的mutex
      • 2.1.2 死锁
    • 2.2 条件变量
    • 2.3不要用读写锁和信号量
    • 2.4封装MutexLock,MutexLockGuard,Condition
    • 2.5线程安全的Singleton实现
    • 2.6 sleep()不是同步原语
    • 2.8 借shared_ptr实现copy_on_write

线程同步精要

线程同步的四项原则,按重要性排列:

  • 首要原则是尽量最低限度地共享对象,减少需要同步的场合。一个对象能不暴露给别的线程就不要暴露;如果要暴露,优先考虑

    immutable

    对象;实在不行才暴露可修改的对象,并用同步措施来充分保护。
  • 其次是使用高级的并发编程构件。
  • 最后不得已必须使用底层同步原语时,只使用非递归的互斥器和条件变量,慎用读写锁,不要用信号量。
  • 除了使用

    atomic

    整数之外,不自己编写

    lock-free

    代码,不要用内核级同步原语。

2.1 互斥器(mutex)

  • RAII

    手法封装

    mutex

    的创建,销毁,加锁,解锁这四个操作。
  • 只使用非递归的

    mutex

    (即不可重入的

    mutex

    )。
  • 不手工调用

    lock()

    unlock()

    函数,一切交给栈上的

    Gurad

    对象的构造和析构函数负责。

    Gurad

    对象的生命期正好等于临界区。
  • 在每次构造

    Guard

    对象的时候,思考一路上已经持有的锁,防止因加锁顺序不同而导致死锁。

2.1.1 只使用非递归的mutex

mutex

分为递归和非递归两种,这是

POSIX

的叫法,另外的名字是可重入与非可重入。它们唯一的区别在于:同一个线程可以重复对

recursive mutex

加锁,但是不能重复对

non-recursive mutex

加锁。

recursive mutex

可能会隐藏代码里的一些问题。典型情况是你以为拿到一个锁就能修改对象,没想到外层代码已经拿到了锁,正在修改同一个对象。

MutexLock mutex;
std::vector<Foo> foos;

void post(const Foo& f)
{
    MutexLockGuard lock(mutex);
    foos.push_back(f);
}

void traverse()
{
    MutexLockGuard lock(mutex);
    for(std::vector<Foo>::const_iterator it = foos.begin();
       it != foos.end(); ++it)
    {
        it->doit();
    }
}
           

post()

加锁,然后修改

foos

对象;

traverse()

加锁,然后遍历

foos

向量。

如果

Foo::doit()

间接调用了

post()

,那么会出现戏剧性的结果。

  1. mutex

    是非递归的,于是死锁。
  2. mutex

    是递归的,由于

    push_back()

    可能导致

    vector

    迭代器失效,程序偶尔会崩溃。

如果一个函数既可能在已加锁的情况下调用,有可能在未加锁的情况下调用,那么就拆成两个函数:

  1. 跟原来的函数同名,函数加锁,转而调用第2个函数。
  2. 给函数名加上后缀

    WithLockHold

    ,不加锁,把原来的函数体搬过来。

就像这样:

fvoid post(cost Foo& f)
{
    MutexLockGurad lock(mutex);
    postWithLockHold(f);
}

void postWithLockHold(const Foo& f)
{
    foos.push_back(f);
}
           

2.1.2 死锁

class Request
{
public:
	void process()
    {
        muduo::MutexLockGuard lock(mutex_);
        print();
    }
  	
    void print() const
    {
        muduo::MutexLockGuard lock(mutex_);
        //...
    }
    
private:
	mutable muduo::MutexLock mutex_;
};
int main()
{
    Request req;
    req.process();
}
           

上面这段代码,在第8行出现了死锁。是因为在调用

Request::process()

Request::print()

先后对同一个

mutex

上锁,引发了死锁。

要修复这个错误也很容易,从

Request::print()

抽取出

Request::printWithLockHold()

,并让

Request::print()

Request::process()

都调用它即可。

有一个

Inventory

的类,记录当前的

Request

对象。容易看出,下面这个

Inventory class

add()

remove()

成员都是线程安全的。他使用了

mutex

来保护共享数据。

class Inventory
{
public:
    void add(Request* req)
    {
		muduo::MutexLockGuard lock(mutex_);
        requests_.insert(req);
    }
    
    void remove(Request* req)
    {
        muduo::MutexLockGurad lock(mutex_);
        requests_.erase(req);
    }
    
    void printAll() const;
    
private:
    mutable muduo::MutexLock mutex_;
    std::set<Request*> requests_;
}
           

Request class

Inventory class

的交互逻辑很简单,在处理请求的时候,往

g_inventory

中添加自己,在析构的时候,从

g_inventory

中移除自己。

class Request
{
public:
    void process()
    {
        muduo::MutexLockGuard lock(mutex_);
        g_inventory.add(this);
    }
    ~Request()
    {
		muduo::MutexLockGuard lock(mutex_);
        sleep(1);
        g_inventory.remove(this);
    }
    void print() const
    {
        muduo::MutexLockGuard lock(mutex_);
        ...
    }
    
private:
    mutable muduo::MutexLock mutex_;
};
           

Inventory class

还有一个函数是打印全部已知的

Request

对象。

Inventory::printAll()

里的逻辑单独看没什么问题,但是它有可能引发死锁。

void Inventory::printAll() const
{
    muduo::MutexLockGuard lock(mutex_);
    sleep(1);
    for(std::set<Request*>::const_iterator it=requests_.begin();
       it != requests_.end();++it)
    {
        (*it)->print();
    }
    printf("Inventory::printAll() unlocked\n");
}
           

下面这个程序运行起来发生了死锁。

void threadFunc()
{
    Request* req = new Request;
    req->process();
    delete req;
}
int main()
{
	muduo::Thread thread(threadFunc);
    thread.start();
    usleep(500*1000);
    g_inventory.printAll();
    thread.join();
}
           

注意到,

main()

线程先调用

Inventory::printAll()

再调用

Request::print()

,而

threadFunc()

线程是先调用

Request::~Reques()

再调用

Inventory::remove

。这两个调用序列对两个

mutex

的加锁顺序正好相反,于是造成死锁。

解决死锁的方法很简单,要么把

print()

移出

printAll()

的临界区,要么把

remove()

移出

~Request()

的临界区。

这里也出现了对象析构的

race condition

,即一个线程正在析构对象,另一个线程却在调用它的成员函数。

2.2 条件变量

需要需要等待某个条件成立,应该使用条件变量。条件变量就是一个或多个线程等待某个布尔表达式为真,即等待别的线程“唤醒”它。

条件变量只有一种正确使用的方式,几乎不可能用错。对于

wait

端。

  1. 必须与

    mutex

    一起使用,该布尔表达式的读写需受

    mutex

    保护。
  2. mutex

    已上锁的时候才能调用

    wait()

  3. 把判断布尔条件和

    wait()

    放到

    while

    循环中。

写成代码是:

muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::deque<int> queue;

int dequeue()
{
    MutexLockGuard lock(mutex);
    while(queue.empty())
    {
        cond.wait();	//这一步会源自地unlock mutex并进入等待,不会与enqueue死锁
        //wait()执行完毕时会自动重新加锁
    }
    assert(!queue.empty());
    int top = queue.front();
    queue.pop_front();
    return top;
}
           

上面的代码中必须用

while

循环来等待条件变量,而不能使用

if

语句。原因是

spurious wakeup

对于

signal/broadcast

  1. 不一定要在

    mutex

    已上锁的情况下调用

    signal

  2. signal

    之前一般要修改布尔表达式。
  3. 修改布尔表达式通常要用

    mutex

    保护。
  4. 注意区分

    signal

    broadcast

写成代码是:

void enqueue(int x)
{
	MutexLockGurad lock(mutex);
    queue.push_back(x);
    cond.notify();
}
           

条件变量是非常底层的同步原语,很少直接使用,一般都是用它来实现高层的同步措施。

倒计时

CountDownLatch

是一种同步手段。它主要有两种用途。

  • 主线程发起多个子线程,等这些子线程各自都完成一定的任务之后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
  • 主线程发起多个子线程,子线程都在等待主线程,主线程完成其他一些任务之后通知所有子线程开始执行。
class CountDownLatch : boost::noncopyable
{
public:
    explicit CountDownLatch(int count);		//倒数几次
    void wait();							//等待计数值变为0
    void countDown();						//计数减1
        
private:
    mutable MutexLock mutex_;
    Condition condition_;
    int count_;
};

void CountDownLatch::wait()
{
    MutexLockGuard lock(mutex_);
    while(count_ > 0)
        condition_.wait();
}
void CountDownLatch::countDown()
{
    MutexLockGuard lock(mutex_);
    --count_;
    if(count_ == 0)
        condition_.notifyAll();
}
           

2.3不要用读写锁和信号量

2.4封装MutexLock,MutexLockGuard,Condition

class MutexLock : boost::noncopyable
{
public:
    MutexLock():holder_(0)
    { pthread_mutex_init(&mutex_, NULL); }
    
    ~MutexLock()
    {
        assert(holder_ == 0);
        pthread_mutex_destroy(&mutex_);
    }
    
    bool isLockedBythisThread()
    { return holder_ == CurrentThread::tid(); }
    
    void assertLocked()
    { assert(isLockedByThisThread()); }
    
    void lock()
    {
        pthread_mutex_lock(&mutex);
        holder_ = CurrentThread::tid();
    }
    
    void unlock()
    {
		holder_ = 0;
        pthread_mutex_unlock(&mutex_);
    }
    
    pthread_mutex_t* getPthreadMutex()		//仅供Condition调用,严禁用户代码调用
    { return &mutex_; }
    
private:
    pthread_mutex_t mutex_;
    pid_t holder_;
};

class MutexLockGuard : boost::noncopyable
{
public:
    explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex)
    { mutex_:lock(); }
    
    ~MutexLockGuard()
    { mutex_.unlock(); }
    
 private:
    MutexLock& mutex_;
};
#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")
           

上面最后一行定义了一个宏,这个宏的作用是为了防止程序里出现下面的错误。

void doit()
{
    MutexLockGuard(mutex);	//遗漏变量名,产生一个临时对象又马上销毁了
    						//结果没有锁住临界区
    //正确的写法是 MutexLockGuard lock(mutex);
    //临界区
}
           

下面这个

muduo::Condition class

简单地封装了

Pthreads condition variable

,用起来也方便。

class Condition : boost::noncopyable
{
public:
    explicit Condition(MutexLock& mutex) : mutex_(mutex)
    { pthread_cond_init(&pcond_, NULL); }
    
    ~Condition(){ pthread_cond_destroy(&pcond_); }
    
    void wait() { pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); }
    void notify() { pthread_cond_signal(&pcond_); }
    void notifyAll() { pthread_cond_broadcast(&pcond_); }
    
private:
    MutexLock& mutex_;
    pthread_cond_t pcond_;
};
           

如果一个

class

要包含

MutexLock

Condition

,请注意它们的声明顺序和初始化顺序,

mutex_

应先于

condition_

构造,并作为后者的构造参数。

class CountDownLatch
{
public:
    CountDownLatch(int count):mutex_(),condition_(mutex_),count_(count){}
 
private:
    mutable MutexLock mutex_;	//顺序很重要,先mutex后condition
    Condition condition_;
    int count_;
};
           

2.5线程安全的Singleton实现

template<typename T>
class Singleton : boost::noncopyable
{
public:
    static T& instance()
    {
        pthread_once(&ponce_, &Singleton::init);
        return *value_;
    }

private:
	Singleton();
    ~Singleton();
    
    static void init()
    {
        value_ = new T();
    }
    
private:
    static pthread_once_t ponce_;
    static T* value_;
};

//必须在头文件中定义static变量
template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;

template<typename T>
T* Singleton<T>::value_ = NULL;
           

上面这个

Singleton

没有任何花哨的技巧,使用

pthread_once_t

来保证

lazy-initialization

的线程安全。线程安全性由

Pthreads

库保证。

使用方法很简单。

Foo& foo = Singleton<Foo>::instance();
           

2.6 sleep()不是同步原语

在程序的正常执行中,如果需要等待一段已知的时间,应该往

event loop

里注册一个

timer

,然后在

timer

,然后在

timer

的回调函数里接着干活,因为线程是个珍贵的共享资源,不能轻易浪费。如果等待某个事件发生,那么应该采用条件变量或IO事件回调,不能用

sleep()

来轮询。

如果多线程的安全性和效率要靠代码主动调用

sleep

来保证,这显然是设计除了问题。等待某个事件发生,正确的做法是用

select()

等价物或

Condition

,或者高层同步工具。

2.8 借shared_ptr实现copy_on_write

解决2.1.1中的

post()

traverse()

死锁。

数据结构改成:

typedef std::vector<Foo> FooList;
typedef boost::shared_ptr<FooList> FooListPtr;
MutexLock mutex;
FooListPtr g_foos;
           

read

端,用一个栈上局部

FooListPtr

变量当做“观察者”,它使得

g_foos

的引用计数增加。

traverse()

函数的临界区是4到8行,临界区内只读了一次共享变量

g_foos

,比原来的写法大为缩短。而且多个线程同时调用

traverse()

也不会相互阻塞。

void traverse()
{
	FooListPtr foos;
    {
        MutexLockGuard lock(mutex);
        foo = g_foos;
        assert(!g_foos.unique());
    }
    for(std::vector<Foo>::const_iterator it = foos->begin();
       it != foo->end(); ++it)
    {
		it->doit();
    }
}
           

关键看

write

post()

如何写。按照前面的描述,如果

g_foos.unique()

true

,我们可以放心的在原地修改

FooList

,如果

g_foos.unique()

false

,这说明别的线程正在读取

FooList

,我们不能原地修改,而是复制一份,在副本上修改。这样就避免了死锁。

void post(const Foo& f)
{
    printf("post\n");
    MutexLockGuard lock(mutex);
    if(!g_foos.unique())
    {
        goo_foos.reset(new FooList(*g_foos));
        printf("copy the whole list\n");
    }
    assert(g_foos.unique());
    g_foos->push_back(f);
}
           

继续阅读