python基于mysql实现的简单队列以及跨进程锁实例详解
通常在我们进行多进程应用开发的过程中,不可避免的会遇到多个进程访问同一个资源(临界资源)的状况,这时候必须通过加一个全局性的锁,来实现资源的同步访问(即:同一时间里只能有一个进程访问资源)。
举个例子如下:
假设我们用mysql来实现一个任务队列,实现的过程如下:
1.在Mysql中创建Job表,用于储存队列任务,如下:
createtablejobs( idauto_incrementnotnullprimarykey, messagetextnotnull, job_statusnotnulldefault0 );
message用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中,1:已出队列
2.有一个生产者进程,往job表中放新的数据,进行排队:
insertintojobs(message)values('msg1');
3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:
select*fromjobswherejob_status=0orderbyidasclimit1; updatejobssetjob_status=1whereid=?;--id为刚刚取得的记录id
4.如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。
=========================分割线=======================================
说到跨进程的锁实现,我们主要有几种实现方式:
(1)信号量
(2)文件锁fcntl
(3)socket(端口号绑定)
(4)signal
这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。
查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈.
对此用python实现了一个demo,如下:
文件名:glock.py
#!/usr/bin/envpython2.7 # #-*-coding:utf-8-*- # #Desc: # importlogging,time importMySQLdb classGlock: def__init__(self,db): self.db=db def_execute(self,sql): cursor=self.db.cursor() try: ret=None cursor.execute(sql) ifcursor.rowcount!=1: logging.error("Multiplerowsreturnedinmysqllockfunction.") ret=None else: ret=cursor.fetchone() cursor.close() returnret exceptException,ex: logging.error("Executesql\"%s\"failed!Exception:%s",sql,str(ex)) cursor.close() returnNone deflock(self,lockstr,timeout): sql="SELECTGET_LOCK('%s',%s)"%(lockstr,timeout) ret=self._execute(sql) ifret[0]==0: logging.debug("Anotherclienthaspreviouslylocked'%s'.",lockstr) returnFalse elifret[0]==1: logging.debug("Thelock'%s'wasobtainedsuccessfully.",lockstr) returnTrue else: logging.error("Erroroccurred!") returnNone defunlock(self,lockstr): sql="SELECTRELEASE_LOCK('%s')"%(lockstr) ret=self._execute(sql) ifret[0]==0: logging.debug("Thelock'%s'thelockisnotreleased(thelockwasnotestablishedbythisthread).",lockstr) returnFalse elifret[0]==1: logging.debug("Thelock'%s'thelockwasreleased.",lockstr) returnTrue else: logging.error("Thelock'%s'didnotexist.",lockstr) returnNone #Initlogging definit_logging(): sh=logging.StreamHandler() logger=logging.getLogger() logger.setLevel(logging.DEBUG) formatter=logging.Formatter('%(asctime)s-%(module)s:%(filename)s-L%(lineno)d-%(levelname)s:%(message)s') sh.setFormatter(formatter) logger.addHandler(sh) logging.info("Currentloglevelis:%s",logging.getLevelName(logger.getEffectiveLevel())) defmain(): init_logging() db=MySQLdb.connect(host='localhost',user='root',passwd='') lock_name='queue' l=Glock(db) ret=l.lock(lock_name,10) ifret!=True: logging.error("Can'tgetlock!exit!") quit() time.sleep(10) logging.info("Youcandosomesynchronizationworkacrossprocesses!") ##TODO ##youcandosomethinginhere## l.unlock(lock_name) if__name__=="__main__": main()
在main函数里:
l.lock(lock_name,10)中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。
在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的.
2.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:
select*fromjobswherejob_status=0orderbyidasclimit1; updatejobssetjob_status=1whereid=?;--id为刚刚取得的记录id
这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。
测试的时候,启动两个glock.py,结果如下:
[@tj-10-47test]#./glock.py 2014-03-1417:08:40,277-glock:glock.py-L70-INFO:Currentloglevelis:DEBUG 2014-03-1417:08:40,299-glock:glock.py-L43-DEBUG:Thelock'queue'wasobtainedsuccessfully. 2014-03-1417:08:50,299-glock:glock.py-L81-INFO:Youcandosomesynchronizationworkacrossprocesses! 2014-03-1417:08:50,299-glock:glock.py-L56-DEBUG:Thelock'queue'thelockwasreleased.
可以看到第一个glock.py是17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。
[@tj-10-47test]#./glock.py 2014-03-1417:08:46,873-glock:glock.py-L70-INFO:Currentloglevelis:DEBUG 2014-03-1417:08:50,299-glock:glock.py-L43-DEBUG:Thelock'queue'wasobtainedsuccessfully. 2014-03-1417:09:00,299-glock:glock.py-L81-INFO:Youcandosomesynchronizationworkacrossprocesses! 2014-03-1417:09:00,300-glock:glock.py-L56-DEBUG:Thelock'queue'thelockwasreleased. [@tj-10-47test]#