python实现封装得到virustotal扫描结果
本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:
importsimplejson importurllib importurllib2 importos,sys importlogging try: importsqlite3 exceptImportError: sys.stderr.write("ERROR:UnabletolocatePythonSQLite3module."\ "Pleaseverifyyourinstallation.Exiting...\n") sys.exit(-1) MD5="5248f774d2ee0a10936d0b1dc89107f1" MD5="12fa5fb74201d9b6a14f63fbf9a81ff6"#donothavereportonvirustotal.com APIKEY="xxxxxxxxxxxxxxxxxx"用自己的 classVirusTotalDatabase: """ Databaseabstractionlayer. """ def__init__(self,db_file): log=logging.getLogger("Database.Init") self.__dbfile=db_file self._conn=None self._cursor=None #CheckifSQLitedatabasealreadyexists.Ifitdoesn'texistIinvoke #thegenerationprocedure. ifnotos.path.exists(self.__dbfile): ifself._generate(): print("Generateddatabase\"%s\"whichdidn't"\ "existbefore."%self.__dbfile) else: print("Unabletogeneratedatabase") #Oncethedatabaseisgeneratedofitalreadyhasbeen,Ican #initializetheconnection. try: self._conn=sqlite3.connect(self.__dbfile) self._cursor=self._conn.cursor() exceptException,why: print("Unabletoconnecttodatabase\"%s\":%s." %(self.__dbfile,why)) log.debug("ConnectedtoSQLitedatabase\"%s\"."%self.__dbfile) def_generate(self): """ CreatesdatabasestructureinaSQLitefile. """ ifos.path.exists(self.__dbfile): returnFalse db_dir=os.path.dirname(self.__dbfile) ifnotos.path.exists(db_dir): try: os.makedirs(db_dir) except(IOError,os.error),why: print("Somethingwentwrongwhilecreatingdatabase"\ "directory\"%s\":%s"%(db_dir,why)) returnFalse conn=sqlite3.connect(self.__dbfile) cursor=conn.cursor() cursor.execute("CREATETABLEvirustotal(\n"\ "idINTEGERPRIMARYKEY,\n"\ "md5TEXTNOTNULL,\n"\ "KasperskyTEXTDEFAULTNULL,\n"\ "McAfeeTEXTDEFAULTNULL,\n"\ "SymantecTEXTDEFAULTNULL,\n"\ "NormanTEXTDEFAULTNULL,\n"\ "AvastTEXTDEFAULTNULL,\n"\ "NOD32TEXTDEFAULTNULL,\n"\ "BitDefenderTEXTDEFAULTNULL,\n"\ "MicrosoftTEXTDEFAULTNULL,\n"\ "RisingTEXTDEFAULTNULL,\n"\ "PandaTEXTDEFAULTNULL\n"\ ");") print"createdb:%ssucess"%self.__dbfile returnTrue def_get_task_dict(self,row): try: task={} task["id"]=row[0] task["md5"]=row[1] task["Kaspersky"]=row[2] task["McAfee"]=row[3] task["Symantec"]=row[4] task["Norman"]=row[5] task["Avast"]=row[6] task["NOD32"]=row[7] task["BitDefender"]=row[8] task["Microsoft"]=row[9] task["Rising"]=row[10] task["Panda"]=row[11] returntask exceptException,why: returnNone defadd_sample(self,md5,virus_dict): """ """ task_id=None ifnotself._cursor: returnNone ifnotmd5ormd5=="": returnNone Kaspersky=virus_dict.get("Kaspersky",None) McAfee=virus_dict.get("McAfee",None) Symantec=virus_dict.get("Symantec",None) Norman=virus_dict.get("Norman",None) Avast=virus_dict.get("Avast",None) NOD32=virus_dict.get("NOD32",None) BitDefender=virus_dict.get("BitDefender",None) Microsoft=virus_dict.get("Microsoft",None) Rising=virus_dict.get("Rising",None) Panda=virus_dict.get("Panda",None) self._conn.text_factory=str try: self._cursor.execute("SELECTidFROMvirustotalWHEREmd5=?;", (md5,)) sample_row=self._cursor.fetchone() exceptsqlite3.OperationalError,why: print"sqlite3error:%s\n"%str(why) returnFalse ifsample_row: try: sample_row=sample_row[0] self._cursor.execute("UPDATEvirustotalSETKaspersky=?,McAfee=?,Symantec=?,Norman=?,Avast=?,\ NOD32=?,BitDefender=?,Microsoft=?,Rising=?,Panda=?WHEREid=?;", (Kaspersky,McAfee,Symantec,Norman,Avast,NOD32,BitDefender,Microsoft,\ Rising,Panda,sample_row)) self._conn.commit() task_id=sample_row exceptsqlite3.OperationalError,why: print("Unabletoupdatedatabase:%s."%why) returnFalse else:#thesamplenotinthedatabase try: self._cursor.execute("INSERTINTOvirustotal"\ "(md5,Kaspersky,McAfee,Symantec,Norman,Avast,NOD32,BitDefender,\ Microsoft,Rising,Panda)"\ "VALUES(?,?,?,?,?,?,?,?,?,?,?);", (md5,Kaspersky,McAfee,Symantec,Norman,Avast,NOD32,BitDefender,\ Microsoft,Rising,Panda)) self._conn.commit() task_id=self._cursor.lastrowid exceptsqlite3.OperationalError,why: print"why",str(why) returnNone print"add_to_db:%s,task_id:%s"%(str(self.__dbfile),str(task_id)) returntask_id defget_sample(self): """ Getsataskfrompendingqueue. """ log=logging.getLogger("Database.GetTask") ifnotself._cursor: log.error("Unabletoacquirecursor.") returnNone #Selectoneitemfromthequeuetablewithhigherpriorityandolder #additiondatewhichhasnotalreadybeenprocessed. try: self._cursor.execute("SELECT*FROMvirustotal"\ #"WHERElock=0"\ #"ANDstatus=0"\ "ORDERBYid,added_onLIMIT1;") exceptsqlite3.OperationalError,why: log.error("Unabletoquerydatabase:%s."%why) returnNone sample_row=self._cursor.fetchone() ifsample_row: returnself._get_task_dict(sample_row) else: returnNone defsearch_md5(self,md5): """ """ ifnotself._cursor: returnNone ifnotmd5orlen(md5)!=32: returnNone try: self._cursor.execute("SELECT*FROMvirustotal"\ "WHEREmd5=?"\ #"ANDstatus=1"\ "ORDERBYidDESC;", (md5,)) exceptsqlite3.OperationalError,why: returnNone task_dict={} forrowinself._cursor.fetchall(): task_dict=self._get_task_dict(row) #iftask_dict: #tasks.append(task_dict) returntask_dict classVirusTotal: """""" def__init__(self,md5): """Constructor""" self._virus_dict={} self._md5=md5 self._db_file=r"./db/virustotal.db" self.get_report_dict() defrepr(self): returnstr(self._virus_dict) defsubmit_md5(self,file_path): importpostfile #submitthefile FILE_NAME=os.path.basename(file_path) host="www.virustotal.com" selector="https://www.virustotal.com/vtapi/v2/file/scan" fields=[("apikey",APIKEY)] file_to_send=open(file_path,"rb").read() files=[("file",FILE_NAME,file_to_send)] json=postfile.post_multipart(host,selector,fields,files) printjson pass defget_report_dict(self): result_dict={} url="https://www.virustotal.com/vtapi/v2/file/report" parameters={"resource":self._md5, "apikey":APIKEY} data=urllib.urlencode(parameters) req=urllib2.Request(url,data) response=urllib2.urlopen(req) json=response.read() response_dict=simplejson.loads(json) ifresponse_dict["response_code"]:#hasresult scans_dict=response_dict.get("scans",{}) foranti_virus_comany,virus_nameinscans_dict.iteritems(): ifvirus_name["detected"]: result_dict.setdefault(anti_virus_comany,virus_name["result"]) returnresult_dict defwrite_to_db(self): """""" db=VirusTotalDatabase(self._db_file) virus_dict=self.get_report_dict() db.add_sample(self._md5,virus_dict)
使用方法如下:
config={'input':"inputMd5s"} fp=open(config['input'],"r") content=fp.readlines() MD5S=[] formd5inifilter(lambdax:len(x)>0,imap(string.strip,content)): MD5S.append(md5) print"MD5S",MD5S fp.close() fromgetVirusTotalInfoimportVirusTotal #得到扫描结果并写入数库 formd5inMD5S: virus_total=VirusTotal(md5) virus_total.write_to_db()
希望本文所述对大家的Python程序设计有所帮助。