python+requests接口自动化框架的实现
为什么要做接口自动化框架
1、业务与配置的分离
2、数据与程序的分离;数据的变更不影响程序
3、有日志功能,实现无人值守
4、自动发送测试报告
5、不懂编程的测试人员也可以进行测试
正常接口测试的流程是什么?
确定接口测试使用的工具----->配置需要的接口参数----->进行测试----->检查测试结果----->生成测试报告
测试的工具:python+requests
接口测试用例:excel
一、接口框架如下:
1、action包:用来存放关键字函数
2、config包:用来存放配置文件
3、TestData:用来存放测试数据,excel表
4、Log包:用来存放日志文件
5、utils包:用来存放公共的类
6、运行主程序interface_auto_test.py
7、Readme.txt:告诉团队组员使用改框架需要注意的地方
二、接口的数据规范设计---Case设计
一个sheet对应数据库里面一张表
APIsheet存放
编号;从1开始
接口的名称(APIName);
请求的url(RequestUrl);
请求的方法(RequestMethod);
传参的方式(paramsType):post/get请求方法不一样
用例说明(APITestCase)
是否执行(Active)部分接口已测通,下次不用测试,直接把这里设置成N,跳过此接口
post与get的区别
查看post详情
post请求参数一般是json串,参数放在from表单里面;参数一般不可见,相对来说安全性高些
查看get详情
get请求参数一般直接放在url里面
2.1注册接口用例
RequestData:请求的数据
(开发制定的传参方式)
RelyData:数据依赖
ResponseCode:响应code
ResponseData:响应数据
DataStore:存储的依赖数据;如果存在数据库里面,在表里增加一个字段用来存依赖的数据
(存储的方式是编写接口自动化的人员来设定的存储方式)
CheckPoint:检查点
Active:是否执行
Status:执行用例的状态,方便查看用例是否执行成功
ErrorInfo:case运行失败,失败的错误信息;eg:是也本身的原因还是case设置失败,还是其他原因
2.2登录接口用例
RequestData:请求的数据
(开发制定的传参方式)
RelyData:数据依赖
(存储的方式是编写接口自动化的人员来设定的存储方式)
ResponseCode:响应code
ResponseData:响应数据
DataStore:存储的依赖数据;如果存在数据库里面,在表里增加一个字段用来存依赖的数据
(存储的方式是编写接口自动化的人员来设定的存储方式)
CheckPoint:检查点
Active:是否执行
Status:执行用例的状态,方便查看用例是否执行成功
ErrorInfo:case运行失败,失败的错误信息;eg:是也本身的原因还是case设置失败,还是其他原因
重点说明下RelyData:数据依赖
采取的是字典:key:value来存储数据格式;
{"request":{"username":"register->1","password":"register->1"},"response":{"code":"register->1"}}
格式化之后:
{ "request":{ "username":"register->1", "password":"register->1" }, "response":{ "code":"register->1" } }
三、创建utils包:用来存放公共的类
3.1ParseExcel.py操作封装excel的类(ParseExcel.py)
#encoding=utf-8 importopenpyxl fromopenpyxl.stylesimportBorder,Side,Font importtime classParseExcel(object): def__init__(self): self.workbook=None self.excelFile=None self.font=Font(color=None)#设置字体的颜色 #颜色对应的RGB值 self.RGBDict={'red':'FFFF3030','green':'FF008B00'} defloadWorkBook(self,excelPathAndName): #将excel文件加载到内存,并获取其workbook对象 try: self.workbook=openpyxl.load_workbook(excelPathAndName) exceptExceptionaserr: raiseerr self.excelFile=excelPathAndName returnself.workbook defgetSheetByName(self,sheetName): #根据sheet名获取该sheet对象 try: #sheet=self.workbook.get_sheet_by_name(sheetName) sheet=self.workbook[sheetName] returnsheet exceptExceptionaserr: raiseerr defgetSheetByIndex(self,sheetIndex): #根据sheet的索引号获取该sheet对象 try: #sheetname=self.workbook.get_sheet_names()[sheetIndex] sheetname=self.workbook.sheetnames[sheetIndex] exceptExceptionaserr: raiseerr #sheet=self.workbook.get_sheet_by_name(sheetname) sheet=self.workbook[sheetname] returnsheet defgetRowsNumber(self,sheet): #获取sheet中有数据区域的结束行号 returnsheet.max_row defgetColsNumber(self,sheet): #获取sheet中有数据区域的结束列号 returnsheet.max_column defgetStartRowNumber(self,sheet): #获取sheet中有数据区域的开始的行号 returnsheet.min_row defgetStartColNumber(self,sheet): #获取sheet中有数据区域的开始的列号 returnsheet.min_column defgetRow(self,sheet,rowNo): #获取sheet中某一行,返回的是这一行所有的数据内容组成的tuple, #下标从1开始,sheet.rows[1]表示第一行 try: rows=[] forrowinsheet.iter_rows(): rows.append(row) returnrows[rowNo-1] exceptExceptionaserr: raiseerr defgetColumn(self,sheet,colNo): #获取sheet中某一列,返回的是这一列所有的数据内容组成tuple, #下标从1开始,sheet.columns[1]表示第一列 try: cols=[] forcolinsheet.iter_cols(): cols.append(col) returncols[colNo-1] exceptExceptionaserr: raiseerr defgetCellOfValue(self,sheet,coordinate=None, rowNo=None,colsNo=None): #根据单元格所在的位置索引获取该单元格中的值,下标从1开始, #sheet.cell(row=1,column=1).value, #表示excel中第一行第一列的值 ifcoordinate!=None: try: returnsheet[coordinate] exceptExceptionaserr: raiseerr elifcoordinateisNoneandrowNoisnotNoneand\ colsNoisnotNone: try: returnsheet.cell(row=rowNo,column=colsNo).value exceptExceptionaserr: raiseerr else: raiseException("InsufficientCoordinatesofcell!") defgetCellOfObject(self,sheet,coordinate=None, rowNo=None,colsNo=None): #获取某个单元格的对象,可以根据单元格所在位置的数字索引, #也可以直接根据excel中单元格的编码及坐标 #如getCellObject(sheet,coordinate='A1')or #getCellObject(sheet,rowNo=1,colsNo=2) ifcoordinate!=None: try: #returnsheet.cell(coordinate=coordinate) returnsheet[coordinate] exceptExceptionaserr: raiseerr elifcoordinate==NoneandrowNoisnotNoneand\ colsNoisnotNone: try: returnsheet.cell(row=rowNo,column=colsNo) exceptExceptionaserr: raiseerr else: raiseException("InsufficientCoordinatesofcell!") defwriteCell(self,sheet,content,coordinate=None, rowNo=None,colsNo=None,style=None): #根据单元格在excel中的编码坐标或者数字索引坐标向单元格中写入数据, #下标从1开始,参style表示字体的颜色的名字,比如red,green ifcoordinateisnotNone: try: #sheet.cell(coordinate=coordinate).value=content sheet[coordinate]=content ifstyleisnotNone: sheet[coordinate].\ font=Font(color=self.RGBDict[style]) self.workbook.save(self.excelFile) exceptExceptionase: raisee elifcoordinate==NoneandrowNoisnotNoneand\ colsNoisnotNone: try: sheet.cell(row=rowNo,column=colsNo).value=content ifstyle: sheet.cell(row=rowNo,column=colsNo).\ font=Font(color=self.RGBDict[style]) self.workbook.save(self.excelFile) exceptExceptionase: raisee else: raiseException("InsufficientCoordinatesofcell!") defwriteCellCurrentTime(self,sheet,coordinate=None, rowNo=None,colsNo=None): #写入当前的时间,下标从1开始 now=int(time.time())#显示为时间戳 timeArray=time.localtime(now) currentTime=time.strftime("%Y-%m-%d%H:%M:%S",timeArray) ifcoordinateisnotNone: try: sheet.cell(coordinate=coordinate).value=currentTime self.workbook.save(self.excelFile) exceptExceptionase: raisee elifcoordinate==NoneandrowNoisnotNone\ andcolsNoisnotNone: try: sheet.cell(row=rowNo,column=colsNo ).value=currentTime self.workbook.save(self.excelFile) exceptExceptionase: raisee else: raiseException("InsufficientCoordinatesofcell!") if__name__=='__main__': #测试代码 pe=ParseExcel() pe.loadWorkBook(r'D:\ProgramSourceCode\PythonSourceCode\WorkSpace\InterfaceFrame2018\inter_test_data.xlsx') sheetObj=pe.getSheetByName(u"API") print("通过名称获取sheet对象的名字:",sheetObj.title) #printhelp(sheetObj.rows) print("通过index序号获取sheet对象的名字:",pe.getSheetByIndex(0).title) sheet=pe.getSheetByIndex(0) print(type(sheet)) print(pe.getRowsNumber(sheet))#获取最大行号 print(pe.getColsNumber(sheet))#获取最大列号 rows=pe.getRow(sheet,1)#获取第一行 foriinrows: print(i.value) ##获取第一行第一列单元格内容 #printpe.getCellOfValue(sheet,rowNo=1,colsNo=1) #pe.writeCell(sheet,u'我爱祖国',rowNo=10,colsNo=10) #pe.writeCellCurrentTime(sheet,rowNo=10,colsNo=11)
3.2封装get/post请求(HttpClient.py)
importrequests importjson classHttpClient(object): def__init__(self): pass defrequest(self,requestMethod,requestUrl,paramsType, requestData,headers=None,**kwargs): ifrequestMethod=="post": print("---",requestData,type(requestData)) ifparamsType=="form": response=self.__post(url=requestUrl,data=json.dumps(eval(requestData)), headers=headers,**kwargs) returnresponse elifparamsType=="json": response=self.__post(url=requestUrl,json=json.dumps(eval(requestData)), headers=headers,**kwargs) returnresponse elifrequestMethod=="get": request_url=requestUrl ifparamsType=="url": request_url="%s%s"%(requestUrl,requestData) response=self.__get(url=request_url,params=requestData,**kwargs) returnresponse def__post(self,url,data=None,json=None,headers=None,**kwargs): print("----") response=requests.post(url=url,data=data,json=json,headers=headers) returnresponse def__get(self,url,params=None,**kwargs): response=requests.get(url,params=params,**kwargs) returnresponse if__name__=="__main__": hc=HttpClient() res=hc.request("get","http://39.106.41.11:8080/getBlogContent/","url",'2') print(res.json())
3.3封装MD5(md5_encrypt)
importhashlib defmd5_encrypt(text): m5=hashlib.md5() m5.update(text.encode("utf-8")) value=m5.hexdigest() returnvalue if__name__=="__main__": print(md5_encrypt("sfwe"))
3.4封装Log
importlogging importlogging.config fromconfig.public_dataimportbaseDir #读取日志配置文件 logging.config.fileConfig(baseDir+"\config\Logger.conf") #选择一个日志格式 logger=logging.getLogger("example02")#或者example01 defdebug(message): #定义dubug级别日志打印方法 logger.debug(message) definfo(message): #定义info级别日志打印方法 logger.info(message) defwarning(message): #定义warning级别日志打印方法 logger.warning(message)
3.5封装发送Email类
importsmtplib fromemail.mime.textimportMIMEText fromemail.mime.multipartimportMIMEMultipart fromemail.headerimportHeader fromProjVar.varimport* importos importsmtplib fromemailimportencoders fromemail.mime.baseimportMIMEBase fromemail.mime.textimportMIMEText fromemail.mime.multipartimportMIMEMultipart fromemail.headerimportHeader fromemail.utilsimportformataddr defsend_mail(): mail_host="smtp.126.com"#设置服务器 mail_user="testman1980"#用户名 mail_pass="wulaoshi1980"#口令 sender='testman1980@126.com' receivers=['2055739@qq.com',"testman1980@126.com"]#接收邮件,可设置为你的QQ邮箱或者其他邮箱 #创建一个带附件的实例 message=MIMEMultipart() message['From']=formataddr(["光荣之路吴老师","testman1980@126.com"]) message['To']=','.join(receivers) subject='自动化测试执行报告' message['Subject']=Header(subject,'utf-8') message["Accept-Language"]="zh-CN" message["Accept-Charset"]="ISO-8859-1,utf-8,gbk" #邮件正文内容 message.attach(MIMEText('最新执行的自动化测试报告,请参阅附件内容!','plain','utf-8')) #构造附件1,传送测试结果的excel文件 att=MIMEBase('application','octet-stream') att.set_payload(open(ProjDirPath+"\\testdata\\testdata.xlsx",'rb').read()) att.add_header('Content-Disposition','attachment',filename=('gbk','',"自动化测试报告.xlsx")) encoders.encode_base64(att) message.attach(att) """ #构造附件2,传送当前目录下的runoob.txt文件 att2=MIMEText(open('e:\\a.py','rb').read(),'base64','utf-8') att2["Content-Type"]='application/octet-stream' att2["Content-Disposition"]='attachment;filename="a.py"' message.attach(att2) """ try: smtpObj=smtplib.SMTP(mail_host) smtpObj.login(mail_user,mail_pass) smtpObj.sendmail(sender,receivers,message.as_string()) print("邮件发送成功") exceptsmtplib.SMTPExceptionase: print("Error:无法发送邮件",e) if__name__=="__main__": send_mail()
四、创建config包用来存放公共的参数、配置文件、长时间不变的变量值
创建public_data.py
importos #整个项目的根目录绝对路劲 baseDir=os.path.dirname(os.path.dirname(__file__)) #获取测试数据文件的绝对路径 file_path=baseDir+"/TestData/inter_test_data.xlsx" API_apiName=2 API_requestUrl=3 API_requestMothod=4 API_paramsType=5 API_apiTestCaseFileName=6 API_active=7 CASE_requestData=1 CASE_relyData=2 CASE_responseCode=3 CASE_responseData=4 CASE_dataStore=5 CASE_checkPoint=6 CASE_active=7 CASE_status=8 CASE_errorInfo=9 #存储请求参数里面依赖的数据 REQUEST_DATA={} #存储响应对象中的依赖数据 RESPONSE_DATA={} if__name__=="__main__": print(file_path) print(baseDir)
五、创建TestData目录,用来存放测试文件
inter_test_data.xlsx
六、创建action包,用来存放关键字函数
6.1解决数据依赖(GetRely.py)
fromconfig.public_dataimportREQUEST_DATA,RESPONSE_DATA fromutils.md5_encryptimportmd5_encrypt REQUEST_DATA={"用户注册":{"1":{"username":"zhangsan","password":"dfsdf23"}, "headers":{"cookie":"asdfwerw"}}} RESPONSE_DATA={"用户注册":{"1":{"code":"00"},"headers":{"age":2342}}} classGetRely(object): def__init__(self): pass @classmethod defget(self,dataSource,relyData,headSource={}): print(type(dataSource)) print(dataSource) data=dataSource.copy() forkey,valueinrelyData.items(): ifkey=="request": #说明应该去REQUEST_DATA中获取 fork,vinvalue.items(): interfaceName,case_idx=v.split("->") val=REQUEST_DATA[interfaceName][case_idx][k] ifk=="password": data[k]=md5_encrypt(val) else: data[k]=val elifkey=="response": #应该去RESPONSE_DATA中获取 fork,vinvalue.items(): interfaceName,case_idx=v.split("->") data[k]=RESPONSE_DATA[interfaceName][case_idx][k] elifkey=="headers": ifheadSource: forkey,valueinvalue.items(): ifkey=="request": fork,vinvalue.items(): foriinv: headSource[i]=REQUEST_DATA[k]["headers"][i] elifkey=="response": fori,valinvalue.items(): forjinval: headSource[j]=RESPONSE_DATA[i]["headers"][j] return"%s"%data if__name__=="__main__": s={"username":"","password":"","code":""} h={"cookie":"123","age":332} rely={"request":{"username":"用户注册->1","password":"用户注册->1"}, "response":{"code":"用户注册->1"}, "headers":{"request":{"用户注册":["cookie"]},"response":{"用户注册":["age"]}} } print(GetRely.get(s,rely,h))
6.2解决数据存储(RelyDataStore.py)
fromconfig.public_dataimportRESPONSE_DATA,REQUEST_DATA classRelyDataStore(object): def__init__(self): pass @classmethod defdo(cls,storePoint,apiName,caseId,request_source={},response_source={},req_headers={},res_headers={}): forkey,valueinstorePoint.items(): ifkey=="request": #说明需要存储的依赖数据来自请求参数,应该将数据存储到REQUEST_DATA foriinvalue: ifiinrequest_source: val=request_source[i] ifapiNamenotinREQUEST_DATA: #说明存储数据的结构还未生成,需要指明数据存储结构 REQUEST_DATA[apiName]={str(caseId):{i:val}} else: #说明存储数据结构中最外层结构已存在 ifstr(caseId)inREQUEST_DATA[apiName]: REQUEST_DATA[apiName][str(caseId)][i]=val else: #说明内层结构不完整,需要指明完整的结构 REQUEST_DATA[apiName][str(caseId)]={i:val} else: print("请求参数中不存在字段"+i) elifkey=="response": #说明需要存储的依赖数据来自接口的响应body,应该将数据存储到RESPONSE_DATA forjinvalue: ifjinresponse_source: val=response_source[j] ifapiNamenotinRESPONSE_DATA: #说明存储数据的结构还未生成,需要指明数据存储结构 RESPONSE_DATA[apiName]={str(caseId):{j:val}} else: #说明存储数据结构中最外层结构已存在 ifstr(caseId)inRESPONSE_DATA[apiName]: RESPONSE_DATA[apiName][str(caseId)][j]=val else: #说明内层结构不完整,需要指明完整的结构 RESPONSE_DATA[apiName][str(caseId)]={j:val} else: print("接口的响应body中不存在字段"+j) elifkey=="headers": fork,vinvalue.items(): ifk=="request": #说明需要往REQUEST_DATA变量中写入存储数据 foriteminv: ifiteminreq_headers: header=req_headers[item] if"headers"inREQUEST_DATA[apiName]: REQUEST_DATA[apiName]["headers"][item]=header else: REQUEST_DATA[apiName]["headers"]={item:header} elifk=="response": #说明需要往RESPONSE_DATA变量中写入存储数据 foritinv: ifitinres_headers: header=res_headers[it] if"headers"inRESPONSE_DATA[apiName]: RESPONSE_DATA[apiName]["headers"][it]=header else: RESPONSE_DATA[apiName]["headers"]={item:header} print(REQUEST_DATA) print(RESPONSE_DATA) if__name__=="__main__": r={"username":"srwcx01","password":"wcx123wac1","email":"wcx@qq.com"} req_h={"cookie":"csdfw23"} res_h={"age":597232} s={"request":["username","password"],"response":["userid"],"headers":{"request":["cookie"], "response":["age"]}} res={"userid":12,"code":"00"} RelyDataStore.do(s,"register",1,r,res,req_headers=req_h,res_headers=res_h) print(REQUEST_DATA) print(RESPONSE_DATA)
6.3校验数据结果(CheckResult.py)
importre classCheckResult(object): def__init__(self): pass @classmethod defcheck(self,responseObj,checkPoint): responseBody=responseObj.json() #responseBody={"code":"","userid":12,"id":"12"} errorKey={} forkey,valueincheckPoint.items(): ifkeyinresponseBody: ifisinstance(value,(str,int)): #等值校验 ifresponseBody[key]!=value: errorKey[key]=responseBody[key] elifisinstance(value,dict): sourceData=responseBody[key] if"value"invalue: #模糊匹配校验 regStr=value["value"] rg=re.match(regStr,"%s"%sourceData) ifnotrg: errorKey[key]=sourceData elif"type"invalue: #数据类型校验 typeS=value["type"] iftypeS=="N": #说明是整形校验 ifnotisinstance(sourceData,int): errorKey[key]=sourceData else: errorKey[key]="[%s]notexist"%key returnerrorKey if__name__=="__main__": r={"code":"00","userid":12,"id":12} c={"code":"00","userid":{"type":"N"},"id":{"value":"\d+"}} print(CheckResult.check(r,c))
6.4往excel里面写结果
fromconfig.public_dataimport* defwrite_result(wbObj,sheetObj,responseData,errorKey,rowNum): try: #写响应body wbObj.writeCell(sheetObj,content="%s"%responseData, rowNo=rowNum,colsNo=CASE_responseData) #写校验结果状态及错误信息 iferrorKey: wbObj.writeCell(sheetObj,content="%s"%errorKey, rowNo=rowNum,colsNo=CASE_errorInfo) wbObj.writeCell(sheetObj,content="faild", rowNo=rowNum,colsNo=CASE_status,style="red") else: wbObj.writeCell(sheetObj,content="pass", rowNo=rowNum,colsNo=CASE_status,style="green") exceptExceptionaserr: raiseerr
七、创建Log目录用来存放日志
八、主函数
#encoding=utf-8 importrequests importjson fromaction.get_relyimportGetRely fromconfig.public_dataimport* fromutils.ParseExcelimportParseExcel fromutils.HttpClientimportHttpClient fromaction.data_storeimportRelyDataStore fromaction.check_resultimportCheckResult fromaction.write_resultimportwrite_result fromutils.Logimport* defmain(): parseE=ParseExcel() parseE.loadWorkBook(file_path) sheetObj=parseE.getSheetByName("API") activeList=parseE.getColumn(sheetObj,API_active) foridx,cellinenumerate(activeList[1:],2): ifcell.value=="y": #需要被执行 RowObj=parseE.getRow(sheetObj,idx) apiName=RowObj[API_apiName-1].value requestUrl=RowObj[API_requestUrl-1].value requestMethod=RowObj[API_requestMothod-1].value paramsType=RowObj[API_paramsType-1].value apiTestCaseFileName=RowObj[API_apiTestCaseFileName-1].value #下一步读取用例sheet表,准备执行测试用例 caseSheetObj=parseE.getSheetByName(apiTestCaseFileName) caseActiveObj=parseE.getColumn(caseSheetObj,CASE_active) forc_idx,colinenumerate(caseActiveObj[1:],2): ifcol.value=="y": #需要执行的用例 caseRowObj=parseE.getRow(caseSheetObj,c_idx) requestData=caseRowObj[CASE_requestData-1].value relyData=caseRowObj[CASE_relyData-1].value responseCode=caseRowObj[CASE_responseCode-1].value responseData=caseRowObj[CASE_responseData-1].value dataStore=caseRowObj[CASE_dataStore-1].value checkPoint=caseRowObj[CASE_checkPoint-1].value #发送接口请求之前需要做一下数据依赖的处理 ifrelyData: logging.info("处理第%s个接口的第%s条用例的数据依赖!") requestData=GetRely.get(eval(requestData),eval(relyData)) httpC=HttpClient() response=httpC.request(requestMethod=requestMethod, requestData=requestData, requestUrl=requestUrl, paramsType=paramsType ) #获取到响应结果后,接下来进行数据依赖存储逻辑实现 ifresponse.status_code==200: responseData=response.json() #进行依赖数据存储 ifdataStore: RelyDataStore.do(eval(dataStore),apiName,c_idx-1,eval(requestData),responseData) #接下来就是校验结果 else: logging.info("接口【%s】的第【%s】条用例,不需要进行依赖数据存储!"%(apiName,c_idx)) ifcheckPoint: errorKey=CheckResult.check(response,eval(checkPoint)) write_result(parseE,caseSheetObj,responseData,errorKey,c_idx) else: logging.info("接口【%s】的第【%s】条用例,执行失败,接口协议code非200!"%(apiName,c_idx)) else: logging.info("第%s个接口的第%s条用例,被忽略执行!"%(idx-1,c_idx-1)) else: logging.info("第%s行的接口被忽略执行!"%(idx-1)) if__name__=="__main__": main()
框架待完善~~请各路神仙多多指教~~
到此这篇关于python+requests接口自动化框架的实现的文章就介绍到这了,更多相关pythonrequests接口自动化内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。