从底层简析Python程序的执行过程
最近我在学习Python的运行模型。我对Python的一些内部机制很是好奇,比如Python是怎么实现类似YIELDVALUE、YIELDFROM这样的操作码的;对于递推式构造列表(ListComprehensions)、生成器表达式(generatorexpressions)以及其他一些有趣的Python特性是怎么编译的;从字节码的层面来看,当异常抛出的时候都发生了什么事情。翻阅CPython的代码对于解答这些问题当然是很有帮助的,但我仍然觉得以这样的方式来做的话对于理解字节码的执行和堆栈的变化还是缺少点什么。GDB是个好选择,但是我懒,而且只想使用一些比较高阶的接口写点Python代码来完成这件事。
所以呢,我的目标就是创建一个字节码级别的追踪API,类似sys.setrace所提供的那样,但相对而言会有更好的粒度。这充分锻炼了我编写Python实现的C代码的编码能力。我们所需要的有如下几项,在这篇文章中所用的Python版本为3.5。
- 一个新的Cpython解释器操作码
- 一种将操作码注入到Python字节码的方法
- 一些用于处理操作码的Python代码
一个新的Cpython操作码
新操作码:DEBUG_OP
这个新的操作码DEBUG_OP是我第一次尝试写CPython实现的C代码,我将尽可能的让它保持简单。我们想要达成的目的是,当我们的操作码被执行的时候我能有一种方式来调用一些Python代码。同时,我们也想能够追踪一些与执行上下文有关的数据。我们的操作码会把这些信息当作参数传递给我们的回调函数。通过操作码能辨识出的有用信息如下:
- 堆栈的内容
- 执行DEBUG_OP的帧对象信息
所以呢,我们的操作码需要做的事情是:
- 找到回调函数
- 创建一个包含堆栈内容的列表
- 调用回调函数,并将包含堆栈内容的列表和当前帧作为参数传递给它
听起来挺简单的,现在开始动手吧!声明:下面所有的解释说明和代码是经过了大量段错误调试之后总结得到的结论。首先要做的是给操作码定义一个名字和相应的值,因此我们需要在Include/opcode.h中添加代码。
/**Myowncommentsbeginby'**'**/ /**From:Includes/opcode.h**/ /*Instructionopcodesforcompiledcode*/ /**Wejusthavetodefineouropcodewithafreevalue 0wasthefirstoneIfound**/ #defineDEBUG_OP0 #definePOP_TOP1 #defineROT_TWO2 #defineROT_THREE3
这部分工作就完成了,现在我们去编写操作码真正干活的代码。
实现DEBUG_OP
在考虑如何实现DEBUG_OP之前我们需要了解的是DEBUG_OP提供的接口将长什么样。拥有一个可以调用其他代码的新操作码是相当酷眩的,但是究竟它将调用哪些代码捏?这个操作码如何找到回调函数的捏?我选择了一种最简单的方法:在帧的全局区域写死函数名。那么问题就变成了,我该怎么从字典中找到一个固定的C字符串?为了回答这个问题我们来看看在Python的mainloop中使用到的和上下文管理相关的标识符enter和exit。
我们可以看到这两标识符被使用在操作码SETUP_WITH中:
/**From:Python/ceval.c**/ TARGET(SETUP_WITH){ _Py_IDENTIFIER(__exit__); _Py_IDENTIFIER(__enter__); PyObject*mgr=TOP(); PyObject*exit=special_lookup(mgr,&PyId___exit__),*enter; PyObject*res;
现在,看一眼宏_Py_IDENTIFIER定义
/**From:Include/object.h**/ /*********************StringLiterals****************************************/ /*Thisstructurehelpsmanagingstaticstrings.Thebasicusagegoeslikethis: Insteadofdoing r=PyObject_CallMethod(o,"foo","args",...); do _Py_IDENTIFIER(foo); ... r=_PyObject_CallMethodId(o,&PyId_foo,"args",...); PyId_fooisastaticvariable,eitheronblocklevelorfilelevel.Onfirst usage,thestring"foo"isinterned,andthestructuresarelinked.Oninterpreter shutdown,allstringsarereleased(through_PyUnicode_ClearStaticStrings). Alternatively,_Py_static_stringallowstochoosethevariablename. _PyUnicode_FromIdreturnsaborrowedreferencetotheinternedstring. _PyObject_{Get,Set,Has}AttrIdare__getattr__versionsusing_Py_Identifier*. */ typedefstruct_Py_Identifier{ struct_Py_Identifier*next; constchar*string; PyObject*object; }_Py_Identifier; #define_Py_static_string_init(value){0,value,0} #define_Py_static_string(varname,value)static_Py_Identifiervarname=_Py_static_string_init(value) #define_Py_IDENTIFIER(varname)_Py_static_string(PyId_##varname,#varname)
嗯,注释部分已经说明得很清楚了。通过一番查找,我们发现了可以用来从字典找固定字符串的函数_PyDict_GetItemId,所以我们操作码的查找部分的代码就是长这样滴。
/**Ourcallbackfunctionwillbenamedop_target**/ PyObject*target=NULL; _Py_IDENTIFIER(op_target); target=_PyDict_GetItemId(f->f_globals,&PyId_op_target); if(target==NULL&&_PyErr_OCCURRED()){ if(!PyErr_ExceptionMatches(PyExc_KeyError)) gotoerror; PyErr_Clear(); DISPATCH(); }
为了方便理解,对这一段代码做一些说明:
- f是当前的帧,f->f_globals是它的全局区域
- 如果我们没有找到op_target,我们将会检查这个异常是不是KeyError
- gotoerror;是一种在mainloop中抛出异常的方法
- PyErr_Clear()抑制了当前异常的抛出,而DISPATCH()触发了下一个操作码的执行
下一步就是收集我们想要的堆栈信息。
/**Thiscodecreatealistwithallthevaluesonthecurrentstack**/ PyObject*value=PyList_New(0); for(i=1;i<=STACK_LEVEL();i++){ tmp=PEEK(i); if(tmp==NULL){ tmp=Py_None; } PyList_Append(value,tmp); }
最后一步就是调用我们的回调函数!我们用call_function来搞定这件事,我们通过研究操作码CALL_FUNCTION的实现来学习怎么使用call_function。
/**From:Python/ceval.c**/ TARGET(CALL_FUNCTION){ PyObject**sp,*res; /**stack_pointerisalocalofthemainloop. It'sthepointertothestacktopofourframe**/ sp=stack_pointer; res=call_function(&sp,oparg); /**call_functionhandlestheargsitconsummedonthestackforus**/ stack_pointer=sp; PUSH(res); /**Standardexceptionhandling**/ if(res==NULL) gotoerror; DISPATCH(); }
有了上面这些信息,我们终于可以捣鼓出一个操作码DEBUG_OP的草稿了:
TARGET(DEBUG_OP){ PyObject*value=NULL; PyObject*target=NULL; PyObject*res=NULL; PyObject**sp=NULL; PyObject*tmp; inti; _Py_IDENTIFIER(op_target); target=_PyDict_GetItemId(f->f_globals,&PyId_op_target); if(target==NULL&&_PyErr_OCCURRED()){ if(!PyErr_ExceptionMatches(PyExc_KeyError)) gotoerror; PyErr_Clear(); DISPATCH(); } value=PyList_New(0); Py_INCREF(target); for(i=1;i<=STACK_LEVEL();i++){ tmp=PEEK(i); if(tmp==NULL) tmp=Py_None; PyList_Append(value,tmp); } PUSH(target); PUSH(value); Py_INCREF(f); PUSH(f); sp=stack_pointer; res=call_function(&sp,2); stack_pointer=sp; if(res==NULL) gotoerror; Py_DECREF(res); DISPATCH(); }
在编写CPython实现的C代码方面我确实没有什么经验,有可能我漏掉了些细节。如果您有什么建议还请您纠正,我期待您的反馈。
编译它,成了!
一切看起来很顺利,但是当我们尝试去使用我们定义的操作码DEBUG_OP的时候却失败了。自从2008年之后,Python使用预先写好的goto(你也可以从这里获取更多的讯息)。故,我们需要更新下gotojumptable,我们在Python/opcode_targets.h中做如下修改。
/**From:Python/opcode_targets.h**/ /**EasychangesinceDEBUG_OPistheopcodenumber1**/ staticvoid*opcode_targets[256]={ //&&_unknown_opcode, &&TARGET_DEBUG_OP, &&TARGET_POP_TOP, /**...**/
这就完事了,我们现在就有了一个可以工作的新操作码。唯一的问题就是这货虽然存在,但是没有被人调用过。接下来,我们将DEBUG_OP注入到函数的字节码中。
在Python字节码中注入操作码DEBUG_OP
有很多方式可以在Python字节码中注入新的操作码:
- 使用peepholeoptimizer,Quarkslab就是这么干的
- 在生成字节码的代码中动些手脚
- 在运行时直接修改函数的字节码(这就是我们将要干的事儿)
为了创造出一个新操作码,有了上面的那一堆C代码就够了。现在让我们回到原点,开始理解奇怪甚至神奇的Python!
我们将要做的事儿有:
- 得到我们想要追踪函数的codeobject
- 重写字节码来注入DEBUG_OP
- 将新生成的codeobject替换回去
和codeobject有关的小贴士
如果你从没听说过codeobject,这里有一个简单的介绍网路上也有一些相关的文档可供查阅,可以直接Ctrl+F查找codeobject
还有一件事情需要注意的是在这篇文章所指的环境中codeobject是不可变的:
Python3.4.2(default,Oct82014,10:45:20) [GCC4.9.1]onlinux Type"help","copyright","credits"or"license"formoreinformation. >>>x=lambday:2 >>>x.__code__ <codeobject<lambda>at0x7f481fd88390,file"<stdin>",line1> >>>x.__code__.co_name '<lambda>' >>>x.__code__.co_name='truc' Traceback(mostrecentcalllast): File"<stdin>",line1,in<module> AttributeError:readonlyattribute >>>x.__code__.co_consts=('truc',) Traceback(mostrecentcalllast): File"<stdin>",line1,in<module> AttributeError:readonlyattribute
但是不用担心,我们将会找到方法绕过这个问题的。
使用的工具
为了修改字节码我们需要一些工具:
- dis模块用来反编译和分析字节码
- dis.BytecodePython3.4新增的一个特性,对于反编译和分析字节码特别有用
- 一个能够简单修改codeobject的方法
用dis.Bytecode反编译codeobject能告诉我们一些有关操作码、参数和上下文的信息。
#Python3.4 >>>importdis >>>f=lambdax:x+3 >>>foriindis.Bytecode(f.__code__):print(i) ... Instruction(opname='LOAD_FAST',opcode=124,arg=0,argval='x',argrepr='x',offset=0,starts_line=1,is_jump_target=False) Instruction(opname='LOAD_CONST',opcode=100,arg=1,argval=3,argrepr='3',offset=3,starts_line=None,is_jump_target=False) Instruction(opname='BINARY_ADD',opcode=23,arg=None,argval=None,argrepr='',offset=6,starts_line=None,is_jump_target=False) Instruction(opname='RETURN_VALUE',opcode=83,arg=None,argval=None,argrepr='',offset=7,starts_line=None,is_jump_target=False)
为了能够修改codeobject,我定义了一个很小的类用来复制codeobject,同时能够按我们的需求修改相应的值,然后重新生成一个新的codeobject。
classMutableCodeObject(object): args_name=("co_argcount","co_kwonlyargcount","co_nlocals","co_stacksize","co_flags","co_code", "co_consts","co_names","co_varnames","co_filename","co_name","co_firstlineno", "co_lnotab","co_freevars","co_cellvars") def__init__(self,initial_code): self.initial_code=initial_code forattr_nameinself.args_name: attr=getattr(self.initial_code,attr_name) ifisinstance(attr,tuple): attr=list(attr) setattr(self,attr_name,attr) defget_code(self): args=[] forattr_nameinself.args_name: attr=getattr(self,attr_name) ifisinstance(attr,list): attr=tuple(attr) args.append(attr) returnself.initial_code.__class__(*args)
这个类用起来很方便,解决了上面提到的codeobject不可变的问题。
>>>x=lambday:2 >>>m=MutableCodeObject(x.__code__) >>>m <new_code.MutableCodeObjectobjectat0x7f3f0ea546a0> >>>m.co_consts [None,2] >>>m.co_consts[1]='3' >>>m.co_name='truc' >>>m.get_code() <codeobjecttrucat0x7f3f0ea2bc90,file"<stdin>",line1>
测试我们的新操作码
我们现在拥有了注入DEBUG_OP的所有工具,让我们来验证下我们的实现是否可用。我们将我们的操作码注入到一个最简单的函数中:
fromnew_codeimportMutableCodeObject defop_target(*args): print("WOOT") print("op_targetcalledwithargs<{0}>".format(args)) defnop(): pass new_nop_code=MutableCodeObject(nop.__code__) new_nop_code.co_code=b"\x00"+new_nop_code.co_code[0:3]+b"\x00"+new_nop_code.co_code[-1:] new_nop_code.co_stacksize+=3 nop.__code__=new_nop_code.get_code() importdis dis.dis(nop) nop() #Don'tforgetthat./pythonisourcustomPythonimplementingDEBUG_OP hakril@computer~/python/CPython3.5%./pythonproof.py 80<0> 1LOAD_CONST0(None) 4<0> 5RETURN_VALUE WOOT op_targetcalledwithargs<([],<frameobjectat0x7fde9eaebdb0>)> WOOT op_targetcalledwithargs<([None],<frameobjectat0x7fde9eaebdb0>)>
看起来它成功了!有一行代码需要说明一下new_nop_code.co_stacksize+=3
- co_stacksize表示codeobject所需要的堆栈的大小
- 操作码DEBUG_OP往堆栈中增加了三项,所以我们需要为这些增加的项预留些空间
现在我们可以将我们的操作码注入到每一个Python函数中了!
重写字节码
正如我们在上面的例子中所看到的那样,重写Pyhton的字节码似乎soeasy。为了在每一个操作码之间注入我们的操作码,我们需要获取每一个操作码的偏移量,然后将我们的操作码注入到这些位置上(把我们操作码注入到参数上是有坏处大大滴)。这些偏移量也很容易获取,使用dis.Bytecode,就像这样。
defadd_debug_op_everywhere(code_obj): #Wegeteveryinstructionoffsetinthecodeobject offsets=[instr.offsetforinstrindis.Bytecode(code_obj)] #AndinsertaDEBUG_OPateveryoffset returninsert_op_debug_list(code_obj,offsets) definsert_op_debug_list(code,offsets): #WeinserttheDEBUG_OPonebyone fornb,offinenumerate(sorted(offsets)): #Needtoajusttheoffsetsbythenumberofopcodesalreadyinsertedbefore #That'swhywesortouroffsets! code=insert_op_debug(code,off+nb) returncode #Lastproblem:whatdoesinsert_op_debuglookslike?
基于上面的例子,有人可能会想我们的insert_op_debug会在指定的偏移量增加一个"\x00",这尼玛是个坑啊!我们第一个DEBUG_OP注入的例子中被注入的函数是没有任何的分支的,为了能够实现完美一个函数注入函数insert_op_debug我们需要考虑到存在分支操作码的情况。
Python的分支一共有两种:
(1)绝对分支:看起来是类似这样子的Instruction_Pointer=argument(instruction)
(2)相对分支:看起来是类似这样子的Instruction_Pointer+=argument(instruction)
相对分支总是向前的
我们希望这些分支在我们插入操作码之后仍然能够正常工作,为此我们需要修改一些指令参数。以下是其逻辑流程:
(1)对于每一个在插入偏移量之前的相对分支而言
如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加1
如果相等,则不需要增加1就能够在跳转操作和目标地址之间执行我们的操作码DEBUG_OP
如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离
(2)对于codeobject中的每一个绝对分支而言
如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加1
如果相等,那么不需要任何修改,理由和相对分支部分是一样的
如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离
下面是实现:
#Helper defbytecode_to_string(bytecode): ifbytecode.argisnotNone: returnstruct.pack("<Bh",bytecode.opcode,bytecode.arg) returnstruct.pack("<B",bytecode.opcode) #Dummyclassforbytecode_to_string classDummyInstr: def__init__(self,opcode,arg): self.opcode=opcode self.arg=arg definsert_op_debug(code,offset): opcode_jump_rel=['FOR_ITER','JUMP_FORWARD','SETUP_LOOP','SETUP_WITH','SETUP_EXCEPT','SETUP_FINALLY'] opcode_jump_abs=['POP_JUMP_IF_TRUE','POP_JUMP_IF_FALSE','JUMP_ABSOLUTE'] res_codestring=b"" inserted=False forinstrindis.Bytecode(code): ifinstr.offset==offset: res_codestring+=b"\x00" inserted=True ifinstr.opnameinopcode_jump_relandnotinserted:#relativejumparealwaysforward ifoffset<instr.offset+3+instr.arg:#insertedbeetwenjumpanddest:add1todest(3forsize) #Ifequal:jumponDEBUG_OPtogetinfobeforeexecinstr res_codestring+=bytecode_to_string(DummyInstr(instr.opcode,instr.arg+1)) continue ifinstr.opnameinopcode_jump_abs: ifinstr.arg>offset: res_codestring+=bytecode_to_string(DummyInstr(instr.opcode,instr.arg+1)) continue res_codestring+=bytecode_to_string(instr) #replace_bytecodejustreplacestheoriginalcodeco_code returnreplace_bytecode(code,res_codestring)
让我们看一下效果如何:
>>>deflol(x): ...foriinrange(10): ...ifx==i: ...break >>>dis.dis(lol) 1010SETUP_LOOP36(to39) 3LOAD_GLOBAL0(range) 6LOAD_CONST1(10) 9CALL_FUNCTION1(1positional,0keywordpair) 12GET_ITER >>13FOR_ITER22(to38) 16STORE_FAST1(i) 10219LOAD_FAST0(x) 22LOAD_FAST1(i) 25COMPARE_OP2(==) 28POP_JUMP_IF_FALSE13 10331BREAK_LOOP 32JUMP_ABSOLUTE13 35JUMP_ABSOLUTE13 >>38POP_BLOCK >>39LOAD_CONST0(None) 42RETURN_VALUE >>>lol.__code__=transform_code(lol.__code__,add_debug_op_everywhere,add_stacksize=3) >>>dis.dis(lol) 1010<0> 1SETUP_LOOP50(to54) 4<0> 5LOAD_GLOBAL0(range) 8<0> 9LOAD_CONST1(10) 12<0> 13CALL_FUNCTION1(1positional,0keywordpair) 16<0> 17GET_ITER >>18<0> 10219FOR_ITER30(to52) 22<0> 23STORE_FAST1(i) 26<0> 27LOAD_FAST0(x) 30<0> 10331LOAD_FAST1(i) 34<0> 35COMPARE_OP2(==) 38<0> 39POP_JUMP_IF_FALSE18 42<0> 43BREAK_LOOP 44<0> 45JUMP_ABSOLUTE18 48<0> 49JUMP_ABSOLUTE18 >>52<0> 53POP_BLOCK >>54<0> 55LOAD_CONST0(None) 58<0> 59RETURN_VALUE #SetupthesimplesthandlerEVER >>>defop_target(stack,frame): ...print(stack) #GO >>>lol(2) [] [] [<class'range'>] [10,<class'range'>] [range(0,10)] [<range_iteratorobjectat0x7f1349afab80>] [0,<range_iteratorobjectat0x7f1349afab80>] [<range_iteratorobjectat0x7f1349afab80>] [2,<range_iteratorobjectat0x7f1349afab80>] [0,2,<range_iteratorobjectat0x7f1349afab80>] [False,<range_iteratorobjectat0x7f1349afab80>] [<range_iteratorobjectat0x7f1349afab80>] [1,<range_iteratorobjectat0x7f1349afab80>] [<range_iteratorobjectat0x7f1349afab80>] [2,<range_iteratorobjectat0x7f1349afab80>] [1,2,<range_iteratorobjectat0x7f1349afab80>] [False,<range_iteratorobjectat0x7f1349afab80>] [<range_iteratorobjectat0x7f1349afab80>] [2,<range_iteratorobjectat0x7f1349afab80>] [<range_iteratorobjectat0x7f1349afab80>] [2,<range_iteratorobjectat0x7f1349afab80>] [2,2,<range_iteratorobjectat0x7f1349afab80>] [True,<range_iteratorobjectat0x7f1349afab80>] [<range_iteratorobjectat0x7f1349afab80>] [] [None]
甚好!现在我们知道了如何获取堆栈信息和Python中每一个操作对应的帧信息。上面结果所展示的结果目前而言并不是很实用。在最后一部分中让我们对注入做进一步的封装。
增加Python封装
正如您所见到的,所有的底层接口都是好用的。我们最后要做的一件事是让op_target更加方便使用(这部分相对而言比较空泛一些,毕竟在我看来这不是整个项目中最有趣的部分)。
首先我们来看一下帧的参数所能提供的信息,如下所示:
- f_code当前帧将执行的codeobject
- f_lasti当前的操作(codeobject中的字节码字符串的索引)
经过我们的处理我们可以得知DEBUG_OP之后要被执行的操作码,这对我们聚合数据并展示是相当有用的。
新建一个用于追踪函数内部机制的类:
- 改变函数自身的co_code
- 设置回调函数作为op_debug的目标函数
一旦我们知道下一个操作,我们就可以分析它并修改它的参数。举例来说我们可以增加一个auto-follow-called-functions的特性。
defop_target(l,f,exc=None): ifop_target.callbackisnotNone: op_target.callback(l,f,exc) classTrace: def__init__(self,func): self.func=func defcall(self,*args,**kwargs): self.add_func_to_trace(self.func) #ActivateTracecallbackforthefunccall op_target.callback=self.callback try: res=self.func(*args,**kwargs) exceptExceptionase: res=e op_target.callback=None returnres defadd_func_to_trace(self,f): #Isitcode?isitalreadytransformed? ifnothasattr(f,"op_debug")andhasattr(f,"__code__"): f.__code__=transform_code(f.__code__,transform=add_everywhere,add_stacksize=ADD_STACK) f.__globals__['op_target']=op_target f.op_debug=True defdo_auto_follow(self,stack,frame): #Nothingfancy:FrameAnalyserisjustthewrapperthatgivesthenextexecutedinstruction next_instr=FrameAnalyser(frame).next_instr() if"CALL"innext_instr.opname: arg=next_instr.arg f_index=(arg&0xff)+(2*(arg>>8)) called_func=stack[f_index] #Ifcalltargetisnottracedyet:doit ifnothasattr(called_func,"op_debug"): self.add_func_to_trace(called_func)
现在我们实现一个Trace的子类,在这个子类中增加callback和doreport这两个方法。callback方法将在每一个操作之后被调用。doreport方法将我们收集到的信息打印出来。
这是一个伪函数追踪器实现:
classDummyTrace(Trace): def__init__(self,func): self.func=func self.data=collections.OrderedDict() self.last_frame=None self.known_frame=[] self.report=[] defcallback(self,stack,frame,exc): ifframenotinself.known_frame: self.known_frame.append(frame) self.report.append("===EnteringNewFrame{0}({1})===".format(frame.f_code.co_name,id(frame))) self.last_frame=frame ifframe!=self.last_frame: self.report.append("===ReturningtoFrame{0}{1}===".format(frame.f_code.co_name,id(frame))) self.last_frame=frame self.report.append(str(stack)) instr=FrameAnalyser(frame).next_instr() offset=str(instr.offset).rjust(8) opname=str(instr.opname).ljust(20) arg=str(instr.arg).ljust(10) self.report.append("{0}{1}{2}{3}".format(offset,opname,arg,instr.argval)) self.do_auto_follow(stack,frame) defdo_report(self): print("\n".join(self.report))
这里有一些实现的例子和使用方法。格式有些不方便观看,毕竟我并不擅长于搞这种对用户友好的报告的事儿。
- 例1自动追踪堆栈信息和已经执行的指令
- 例2上下文管理
递推式构造列表(ListComprehensions)的追踪示例。
- 例3伪追踪器的输出
- 例4输出收集的堆栈信息
总结
这个小项目是一个了解Python底层的良好途径,包括解释器的mainloop,Python实现的C代码编程、Python字节码。通过这个小工具我们可以看到Python一些有趣构造函数的字节码行为,例如生成器、上下文管理和递推式构造列表。
这里是这个小项目的完整代码。更进一步的,我们还可以做的是修改我们所追踪的函数的堆栈。我虽然不确定这个是否有用,但是可以肯定是这一过程是相当有趣的。