python实用代码片段收集贴
获取一个类的所有子类
defitersubclasses(cls,_seen=None): """Generatoroverallsubclassesofagivenclassindepthfirstorder.""" ifnotisinstance(cls,type): raiseTypeError(_('itersubclassesmustbecalledwith' 'new-styleclasses,not%.100r')%cls) _seen=_seenorset() try: subs=cls.__subclasses__() exceptTypeError: #failsonlywhenclsistype subs=cls.__subclasses__(cls) forsubinsubs: ifsubnotin_seen: _seen.add(sub) yieldsub forsubinitersubclasses(sub,_seen): yieldsub
简单的线程配合
importthreading is_done=threading.Event() consumer=threading.Thread( target=self.consume_results, args=(key,self.task,runner.result_queue,is_done)) consumer.start() self.duration=runner.run( name,kw.get("context",{}),kw.get("args",{})) is_done.set() consumer.join()#主线程堵塞,直到consumer运行结束
多说一点,threading.Event()也可以被替换为threading.Condition(),condition有notify(),wait(),notifyAll()。解释如下:
Thewait()methodreleasesthelock,andthenblocksuntilitisawakenedbyanotify()ornotifyAll()callforthesameconditionvariableinanotherthread.Onceawakened,itre-acquiresthelockandreturns.Itisalsopossibletospecifyatimeout. Thenotify()methodwakesuponeofthethreadswaitingfortheconditionvariable,ifanyarewaiting.ThenotifyAll()methodwakesupallthreadswaitingfortheconditionvariable. Note:thenotify()andnotifyAll()methodsdon'treleasethelock;thismeansthatthethreadorthreadsawakenedwillnotreturnfromtheirwait()callimmediately,butonlywhenthethreadthatcallednotify()ornotifyAll()finallyrelinquishesownershipofthelock.
#Consumeoneitem cv.acquire() whilenotan_item_is_available(): cv.wait() get_an_available_item() cv.release() #Produceoneitem cv.acquire() make_an_item_available() cv.notify() cv.release()
计算运行时间
classTimer(object): def__enter__(self): self.error=None self.start=time.time() returnself def__exit__(self,type,value,tb): self.finish=time.time() iftype: self.error=(type,value,tb) defduration(self): returnself.finish-self.start withTimer()astimer: func() returntimer.duration()
元类
__new__()方法接收到的参数依次是:
当前准备创建的类的对象;
类的名字;
类继承的父类集合;
类的方法集合;
classModelMetaclass(type): def__new__(cls,name,bases,attrs): ifname=='Model': returntype.__new__(cls,name,bases,attrs) mappings=dict() fork,vinattrs.iteritems(): ifisinstance(v,Field): print('Foundmapping:%s==>%s'%(k,v)) mappings[k]=v forkinmappings.iterkeys(): attrs.pop(k) attrs['__table__']=name#假设表名和类名一致 attrs['__mappings__']=mappings#保存属性和列的映射关系 returntype.__new__(cls,name,bases,attrs) classModel(dict): __metaclass__=ModelMetaclass def__init__(self,**kw): super(Model,self).__init__(**kw) def__getattr__(self,key): try: returnself[key] exceptKeyError: raiseAttributeError(r"'Model'objecthasnoattribute'%s'"%key) def__setattr__(self,key,value): self[key]=value defsave(self): fields=[] params=[] args=[] fork,vinself.__mappings__.iteritems(): fields.append(v.name) params.append('?') args.append(getattr(self,k,None)) sql='insertinto%s(%s)values(%s)'%(self.__table__,','.join(fields),','.join(params)) print('SQL:%s'%sql) print('ARGS:%s'%str(args)) classField(object): def__init__(self,name,column_type): self.name=name self.column_type=column_type def__str__(self): return'<%s:%s>'%(self.__class__.__name__,self.name) classStringField(Field): def__init__(self,name): super(StringField,self).__init__(name,'varchar(100)') classIntegerField(Field): def__init__(self,name): super(IntegerField,self).__init__(name,'bigint') classUser(Model): #定义类的属性到列的映射: id=IntegerField('id') name=StringField('username') email=StringField('email') password=StringField('password') #创建一个实例: u=User(id=12345,name='Michael',email='test@orm.org',password='my-pwd') #保存到数据库: u.save()
输出如下:
Foundmodel:User Foundmapping:email==><StringField:email> Foundmapping:password==><StringField:password> Foundmapping:id==><IntegerField:uid> Foundmapping:name==><StringField:username> SQL:insertintoUser(password,email,username,uid)values(?,?,?,?) ARGS:['my-pwd','test@orm.org','Michael',12345]
SQLAlchemy简单使用
#导入: fromsqlalchemyimportColumn,String,create_engine fromsqlalchemy.ormimportsessionmaker fromsqlalchemy.ext.declarativeimportdeclarative_base #创建对象的基类: Base=declarative_base() #定义User对象: classUser(Base): #表的名字: __tablename__='user' #表的结构: id=Column(String(20),primary_key=True) name=Column(String(20)) #初始化数据库连接: engine=create_engine('mysql+mysqlconnector://root:password@localhost:3306/test')#'数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名' #创建DBSession类型: DBSession=sessionmaker(bind=engine) #创建新User对象: new_user=User(id='5',name='Bob') #添加到session: session.add(new_user) #提交即保存到数据库: session.commit() #创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行: user=session.query(User).filter(User.id=='5').one() #关闭session: session.close()
WSGI简单使用和Web框架Flask的简单使用
fromwsgiref.simple_serverimportmake_server defapplication(environ,start_response): start_response('200OK',[('Content-Type','text/html')]) return'<h1>Hello,web!</h1>' #创建一个服务器,IP地址为空,端口是8000,处理函数是application: httpd=make_server('',8000,application) print"ServingHTTPonport8000..." #开始监听HTTP请求: httpd.serve_forever()
了解了WSGI框架,我们发现:其实一个WebApp,就是写一个WSGI的处理函数,针对每个HTTP请求进行响应。
但是如何处理HTTP请求不是问题,问题是如何处理100个不同的URL。
一个最简单和最土的想法是从environ变量里取出HTTP请求的信息,然后逐个判断。
fromflaskimportFlask fromflaskimportrequest app=Flask(__name__) @app.route('/',methods=['GET','POST']) defhome(): return'<h1>Home</h1>' @app.route('/signin',methods=['GET']) defsignin_form(): return'''<formaction="/signin"method="post"> <p><inputname="username"></p> <p><inputname="password"type="password"></p> <p><buttontype="submit">SignIn</button></p> </form>''' @app.route('/signin',methods=['POST']) defsignin(): #需要从request对象读取表单内容: ifrequest.form['username']=='admin'andrequest.form['password']=='password': return'<h3>Hello,admin!</h3>' return'<h3>Badusernameorpassword.</h3>' if__name__=='__main__': app.run()
格式化显示json
print(json.dumps(data,indent=4)) #或者 importpprint pprint.pprint(data)
实现类似Java或C中的枚举
#!/usr/bin/envpython #-*-coding:utf-8-*- importitertools importsys classImmutableMixin(object): _inited=False def__init__(self): self._inited=True def__setattr__(self,key,value): ifself._inited: raiseException("unsupportedaction") super(ImmutableMixin,self).__setattr__(key,value) classEnumMixin(object): def__iter__(self): fork,vinitertools.imap(lambdax:(x,getattr(self,x)),dir(self)): ifnotk.startswith('_'): yieldv class_RunnerType(ImmutableMixin,EnumMixin): SERIAL="serial" CONSTANT="constant" CONSTANT_FOR_DURATION="constant_for_duration" RPS="rps" if__name__=="__main__": print_RunnerType.CONSTANT
创建文件时指定权限
importos defwrite_to_file(path,contents,umask=None): """Writethegivencontentstoafile :parampath:Destinationfile :paramcontents:Desiredcontentsofthefile :paramumask:Umasktosetwhencreatingthisfile(willbereset) """ ifumask: saved_umask=os.umask(umask) try: withopen(path,'w')asf: f.write(contents) finally: ifumask: os.umask(saved_umask) if__name__=='__main__': write_to_file('/home/kong/tmp','test',31) #Thenyouwillseeafileiscreatedwithpermission640. #Warning:Ifthefilealreadyexists,itspermissionwillnotbechanged. #Note:Forfile,defaultallpermissionis666,and777fordirectory.
多进程并发执行
importmultiprocessing importtime importos defrun(flag): print"flag:%s,sleep2sinrun"%flag time.sleep(2) print"%sexist"%flag returnflag if__name__=='__main__': pool=multiprocessing.Pool(3) iter_result=pool.imap(run,xrange(6)) print"sleep5s\n\n" time.sleep(5) foriinrange(6): try: result=iter_result.next(600) exceptmultiprocessing.TimeoutErrorase: raise printresult pool.close() pool.join()
运行时自动填充函数参数
importdecorator defdefault_from_global(arg_name,env_name): defdefault_from_global(f,*args,**kwargs): id_arg_index=f.func_code.co_varnames.index(arg_name) args=list(args) ifargs[id_arg_index]isNone: args[id_arg_index]=get_global(env_name) ifnotargs[id_arg_index]: print("Missingargument:--%(arg_name)s"%{"arg_name":arg_name}) return(1) returnf(*args,**kwargs) returndecorator.decorator(default_from_global) #如下是一个装饰器,可以用在需要自动填充参数的函数上。功能是: #如果没有传递函数的deploy_id参数,那么就从环境变量中获取(调用自定义的get_global函数) with_default_deploy_id=default_from_global('deploy_id',ENV_DEPLOYMENT)
嵌套装饰器
validator函数装饰func1,func1使用时接收参数(*arg,**kwargs),而func1又装饰func2(其实就是Rally中的scenario函数),给func2增加validators属性,是一个函数的列表,函数的接收参数config,clients,task。这些函数最终调用func1,传入参数(config,clients,task,*args,**kwargs),所以func1定义时参数是(config,clients,task,*arg,**kwargs)
最终实现的效果是,func2有很多装饰器,每个都会接收自己的参数,做一些校验工作。
defvalidator(fn): """Decoratorthatconstructsascenariovalidatorfromgivenfunction. DecoratedfunctionshouldreturnValidationResultonerror. :paramfn:functionthatperformsvalidation :returns:rallyscenariovalidator """ defwrap_given(*args,**kwargs): """Dynamicvalidationdecoratorforscenario. :paramargs:theargumentsofthedecoratorofthebenchmarkscenario ex.@my_decorator("arg1"),thenargs=('arg1',) :paramkwargs:thekeywordargumentsofthedecoratorofthescenario ex.@my_decorator(kwarg1="kwarg1"),thenkwargs={"kwarg1":"kwarg1"} """ defwrap_validator(config,clients,task): return(fn(config,clients,task,*args,**kwargs)or ValidationResult()) defwrap_scenario(scenario): wrap_validator.permission=getattr(fn,"permission", consts.EndpointPermission.USER) ifnothasattr(scenario,"validators"): scenario.validators=[] scenario.validators.append(wrap_validator) returnscenario returnwrap_scenario returnwrap_given
inspect库的一些常见用法
inspect.getargspec(func)获取函数参数的名称和默认值,返回一个四元组(args,varargs,keywords,defaults),其中:
args是参数名称的列表;
varargs和keywords是*号和**号的变量名称;
defaults是参数默认值的列表;
inspect.getcallargs(func[,*args][,**kwds])绑定函数参数。返回绑定后函数的入参字典。
python中的私有属性和函数
Python把以两个或以上下划线字符开头且没有以两个或以上下划线结尾的变量当作私有变量。私有变量会在代码生成之前被转换为长格式(变为公有),这个过程叫"Privatenamemangling",如类A里的__private标识符将被转换为_A__private,但当类名全部以下划线命名的时候,Python就不再执行轧压。而且,虽然叫私有变量,仍然有可能被访问或修改(使用_classname__membername),所以,总结如下:
无论是单下划线还是双下划线开头的成员,都是希望外部程序开发者不要直接使用这些成员变量和这些成员函数,只是双下划线从语法上能够更直接的避免错误的使用,但是如果按照_类名__成员名则依然可以访问到。单下划线的在动态调试时可能会方便一些,只要项目组的人都遵守下划线开头的成员不直接使用,那使用单下划线或许会更好。