探究在C++程序并发时保护共享数据的问题
我们先通过一个简单的代码来了解该问题。
同步问题
我们使用一个简单的结构体Counter,该结构体包含一个值以及一个方法用来改变这个值:
structCounter{ intvalue; voidincrement(){ ++value; } };
然后启动多个线程来修改结构体的值:
intmain(){ Countercounter; std::vector<std::thread>threads; for(inti=0;i<5;++i){ threads.push_back(std::thread([&counter](){ for(inti=0;i<100;++i){ counter.increment(); } })); } for(auto&thread:threads){ thread.join(); } std::cout<<counter.value<<std::endl; return0; }
我们启动了5个线程来增加计数器的值,每个线程增加了100次,然后在线程结束时打印计数器的值。
但我们运行这个程序的时候,我们是希望它会答应500,但事实不是如此,没人能确切知道程序将打印什么结果,下面是在我机器上运行后打印的数据,而且每次都不同:
442 500 477 400 422 487
问题的原因在于改变计数器值并不是一个原子操作,需要经过下面三个操作才能完成一次计数器的增加:
- 首先读取value的值
- 然后将value值加1
- 将新的值赋值给value
但你使用单线程来运行这个程序的时候当然没有任何问题,因此程序是顺序执行的,但在多线程环境中就有麻烦了,想象下下面这个执行顺序:
- Thread1:读取value,得到0,加1,因此value=1
- Thread2:读取value,得到0,加1,因此value=1
- Thread1:将1赋值给value,然后返回1
- Thread2:将1赋值给value,然后返回1
这种情况我们称之为多线程的交错执行,也就是说多线程可能在同一个时间点执行相同的语句,尽管只有两个线程,交错的现象也很明显。如果你有更多的线程、更多的操作需要执行,那么这个交错是必然发生的。
有很多方法来解决线程交错的问题:
- 信号量Semaphores
- 原子引用Atomicreferences
- Monitors
- Conditioncodes
- Compareandswap
在这篇文章中我们将学习如何使用信号量来解决这个问题。信号量也有很多人称之为互斥量(Mutex),同一个时间只允许一个线程获取一个互斥对象的锁,通过Mutex的简单属性就可以用来解决交错的问题。
使用Mutex让计数器程序是线程安全的
在C++11线程库中,互斥量包含在mutex头文件中,对应的类是std::mutex,有两个重要的方法mutex:lock()和unlock(),从名字上可得知是用来锁对象以及释放锁对象。一旦某个互斥量被锁,那么再次调用lock()返回堵塞值得该对象被释放。
为了让我们刚才的计数器结构体是线程安全的,我们添加一个set:mutext成员,并在每个方法中通过lock()/unlock()方法来进行保护:
structCounter{ std::mutexmutex; intvalue; Counter():value(0){} voidincrement(){ mutex.lock(); ++value; mutex.unlock(); } };
然后我们再次测试这个程序,打印的结果就是500了,而且每次都一样。
异常和锁
现在让我们来看另外一种情况,想象我们的的计数器有一个减操作,并在值为0的时候抛出异常:
structCounter{ intvalue; Counter():value(0){} voidincrement(){ ++value; } voiddecrement(){ if(value==0){ throw"Valuecannotbelessthan0"; } --value; } };
然后我们不需要修改类来访问这个结构体,我们创建一个封装器:
structConcurrentCounter{ std::mutexmutex; Countercounter; voidincrement(){ mutex.lock(); counter.increment(); mutex.unlock(); } voiddecrement(){ mutex.lock(); counter.decrement(); mutex.unlock(); } };
大部分时候该封装器运行挺好,但是使用decrement方法的时候就会有异常发生。这是一个大问题,一旦异常发生后,unlock方法就没被调用,导致互斥量一直被占用,然后整个程序就一直处于堵塞状态(死锁),为了解决这个问题我们需要用try/catch结构来处理异常情况:
voiddecrement(){ mutex.lock(); try{ counter.decrement(); }catch(std::stringe){ mutex.unlock(); throwe; } mutex.unlock(); }
这个代码并不难,但看起来很丑,如果你一个函数有10个退出点,你就必须为每个退出点调用一次unlock方法,或许你可能在某个地方忘掉了unlock,那么各种悲剧即将发生,悲剧发生将直接导致程序死锁。
接下来我们看如何解决这个问题。
自动锁管理
当你需要包含整段的代码(在我们这里是一个方法,也可能是一个循环体或者其他的控制结构),有这么一种好的解决方法可以避免忘记释放锁,那就是std::lock_guard.
这个类是一个简单的智能锁管理器,但创建std::lock_guard时,会自动调用互斥量对象的lock()方法,当lock_guard析构时会自动释放锁,请看下面代码:
structConcurrentSafeCounter{ std::mutexmutex; Countercounter; voidincrement(){ std::lock_guard<std::mutex>guard(mutex); counter.increment(); } voiddecrement(){ std::lock_guard<std::mutex>guar(mutex); mutex.unlock(); } };
是不是看起来爽多了?
使用lock_guard,你不再需要考虑什么时候要释放锁,这个工作已经由std::lock_guard实例帮你完成。
结论
在这篇文章中我们学习了如何通过信号量/互斥量来保护共享数据。需要记住的是,使用锁会降低程序性能。在一些高并发的应用环境中有其他更好的解决办法,不过这不在本文的讨论范畴之内。
你可以在Github上获取本文的源码.