用Python实现读写锁的示例代码
起步
Python提供的多线程模型中并没有提供读写锁,读写锁相对于单纯的互斥锁,适用性更高,可以多个线程同时占用读模式的读写锁,但是只能一个线程占用写模式的读写锁。
通俗点说就是当没有写锁时,就可以加读锁且任意线程可以同时加;而写锁只能有一个线程,且必须在没有读锁时才能加上。
简单的实现
importthreading classRWlock(object): def__init__(self): self._lock=threading.Lock() self._extra=threading.Lock() self.read_num=0 defread_acquire(self): withself._extra: self.read_num+=1 ifself.read_num==1: self._lock.acquire() defread_release(self): withself._extra: self.read_num-=1 ifself.read_num==0: self._lock.release() defwrite_acquire(self): self._lock.acquire() defwrite_release(self): self._lock.release()
这是读写锁的一个简单的实现,self.read_num用来保存获得读锁的线程数,这个属性属于临界区,对其操作也要加锁,所以这里需要一个保护内部数据的额外的锁self._extra。
但是这个锁是不公平的。理想情况下,线程获得所的机会应该是一样的,不管线程是读操作还是写操作。而从上述代码可以看到,读请求都会立即设置self.read_num+=1,不管有没有获得锁,而写请求想要获得锁还得等待read_num为0。
所以这个就造成了只有锁没有被占用或者没有读请求时,可以获得写权限。我们应该想办法避免读模式锁长期占用。
读写锁的优先级
读写锁也有分读优先和写优先。上面的代码就属于读优先。
如果要改成写优先,那就换成去记录写线程的引用计数,读和写在同时竞争时,可以让写线程增加写的计数,这样可使读线程的读锁一直获取不到,因为读线程要先判断写的引用计数,若不为0,则等待其为0,然后进行读。这部分代码不罗列了。
但这样显然不够灵活。我们不需要两个相似的读写锁类。我们希望重构我们代码,使它更强大。
改进
为了能够满足自定义优先级的读写锁,要记录等待的读写线程数,并且需要两个条件threading.Condition用来处理哪方优先的通知。计数引用可以扩大语义:正数:表示正在读操作的线程数,负数:表示正在写操作的线程数(最多-1)
在获取读操作时,先然后判断时候有等待的写线程,没有,进行读操作,有,则等待读的计数加1后等待Condition通知;等待读的计数减1,计数引用加1,继续读操作,若条件不成立,循环等待;
在获取写操作时,若锁没有被占用,引用计数减1,若被占用,等待写线程数加1,等待写条件Condition的通知。
读模式和写模式的释放都是一样,需要根据判断去通知对应的Condition:
classRWLock(object): def__init__(self): self.lock=threading.Lock() self.rcond=threading.Condition(self.lock) self.wcond=threading.Condition(self.lock) self.read_waiter=0#等待获取读锁的线程数 self.write_waiter=0#等待获取写锁的线程数 self.state=0#正数:表示正在读操作的线程数负数:表示正在写操作的线程数(最多-1) self.owners=[]#正在操作的线程id集合 self.write_first=True#默认写优先,False表示读优先 defwrite_acquire(self,blocking=True): #获取写锁只有当 me=threading.get_ident() withself.lock: whilenotself._write_acquire(me): ifnotblocking: returnFalse self.write_waiter+=1 self.wcond.wait() self.write_waiter-=1 returnTrue def_write_acquire(self,me): #获取写锁只有当锁没人占用,或者当前线程已经占用 ifself.state==0or(self.state<0andmeinself.owners): self.state-=1 self.owners.append(me) returnTrue ifself.state>0andmeinself.owners: raiseRuntimeError('cannotrecursivelywrlockardlockedlock') returnFalse defread_acquire(self,blocking=True): me=threading.get_ident() withself.lock: whilenotself._read_acquire(me): ifnotblocking: returnFalse self.read_waiter+=1 self.rcond.wait() self.read_waiter-=1 returnTrue def_read_acquire(self,me): ifself.state<0: #如果锁被写锁占用 returnFalse ifnotself.write_waiter: ok=True else: ok=meinself.owners ifokornotself.write_first: self.state+=1 self.owners.append(me) returnTrue returnFalse defunlock(self): me=threading.get_ident() withself.lock: try: self.owners.remove(me) exceptValueError: raiseRuntimeError('cannotreleaseun-acquiredlock') ifself.state>0: self.state-=1 else: self.state+=1 ifnotself.state: ifself.write_waiterandself.write_first:#如果有写操作在等待(默认写优先) self.wcond.notify() elifself.read_waiter: self.rcond.notify_all() elifself.write_waiter: self.wcond.notify() read_release=unlock write_release=unlock
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。