python实现封装得到virustotal扫描结果
本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:
import logging
import os
import sys
import urllib
import urllib2

import simplejson

try:
    import sqlite3
except ImportError:
    sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. " \
                     "Please verify your installation. Exiting...\n")
    sys.exit(-1)
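# Sample hashes for manual testing. The second assignment overrides the first,
# so MD5 ends up holding the hash that has no report.
MD5 = "5248f774d2ee0a10936d0b1dc89107f1"
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6"  # do not have report on virustotal.com

# VirusTotal API key -- use your own. It is read from the VT_APIKEY environment
# variable so that a real key does not have to be written into this file (and
# from there into version control or a shared paste); the placeholder is only
# the fallback when the variable is unset.
APIKEY = os.environ.get("VT_APIKEY", "xxxxxxxxxxxxxxxxxx")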
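class VirusTotalDatabase:
    """
    Database abstraction layer: one SQLite row of detection names per MD5.

    The table has one column for each of ten antivirus engines. A NULL column
    only means that no detection name was stored for that engine; the schema
    does not record whether VirusTotal had a report for the hash at all, so an
    all-NULL row must not be read as "clean".
    """

    # Keys of the dictionaries handed back to callers, in table-column order.
    _COLUMNS = ("id", "md5", "Kaspersky", "McAfee", "Symantec", "Norman",
                "Avast", "NOD32", "BitDefender", "Microsoft", "Rising",
                "Panda")

    def __init__(self, db_file):
        """
        Open the database at db_file, creating the file and schema if missing.

        Failures are printed, not raised. When the connection cannot be
        opened, _conn and _cursor stay None and the query methods return None.
        """
        log = logging.getLogger("Database.Init")
        self.__dbfile = db_file
        self._conn = None
        self._cursor = None

        # Check if SQLite database already exists. If it doesn't exist, invoke
        # the generation procedure.
        if not os.path.exists(self.__dbfile):
            if self._generate():
                print("Generated database \"%s\" which didn't" \
                      " exist before." % self.__dbfile)
            else:
                print("Unable to generate database")

        # Once the database is generated, or if it already existed, the
        # connection can be initialized.
        try:
            self._conn = sqlite3.connect(self.__dbfile)
            self._cursor = self._conn.cursor()
        except Exception as why:
            print("Unable to connect to database \"%s\": %s."
                  % (self.__dbfile, why))
            # Do not report a connection that was never established.
            return

        log.debug("Connected to SQLite database \"%s\"." % self.__dbfile)

    def _generate(self):
        """
        Create the database file and the virustotal table in it.

        Returns True when the schema was created, False when the file already
        exists or its directory could not be created. sqlite3 errors raised
        while creating the table are not caught here.
        """
        if os.path.exists(self.__dbfile):
            return False

        db_dir = os.path.dirname(self.__dbfile)
        # dirname() is "" for a bare file name: there is no directory to
        # create, and os.makedirs("") would raise.
        if db_dir and not os.path.exists(db_dir):
            try:
                os.makedirs(db_dir)
            except (IOError, os.error) as why:
                print("Something went wrong while creating database " \
                      "directory \"%s\": %s" % (db_dir, why))
                return False

        conn = sqlite3.connect(self.__dbfile)
        try:
            cursor = conn.cursor()
            cursor.execute("CREATE TABLE virustotal (\n" \
                           "  id INTEGER PRIMARY KEY,\n" \
                           "  md5 TEXT NOT NULL,\n" \
                           "  Kaspersky TEXT DEFAULT NULL,\n" \
                           "  McAfee TEXT DEFAULT NULL,\n" \
                           "  Symantec TEXT DEFAULT NULL,\n" \
                           "  Norman TEXT DEFAULT NULL,\n" \
                           "  Avast TEXT DEFAULT NULL,\n" \
                           "  NOD32 TEXT DEFAULT NULL,\n" \
                           "  BitDefender TEXT DEFAULT NULL,\n" \
                           "  Microsoft TEXT DEFAULT NULL,\n" \
                           "  Rising TEXT DEFAULT NULL,\n" \
                           "  Panda TEXT DEFAULT NULL\n" \
                           ");")
            conn.commit()
        finally:
            # This connection is only used for setup; __init__ opens its own.
            conn.close()

        print("create db:%s sucess" % self.__dbfile)
        return True

    def _get_task_dict(self, row):
        """
        Turn a virustotal table row into a dict keyed by column name.

        Returns None when row cannot be indexed for all twelve columns.
        """
        try:
            return dict((key, row[idx])
                        for idx, key in enumerate(self._COLUMNS))
        except Exception:
            return None

    def add_sample(self, md5, virus_dict):
        """
        Insert or update the detection names stored for md5.

        virus_dict maps engine name to detection name. Engines without a
        column are ignored; engines missing from virus_dict are stored as
        NULL, which on update also clears a previously stored name.

        Returns the row id on success; None when there is no cursor, md5 is
        empty or the insert fails; False when the lookup or the update fails.
        """
        task_id = None

        if not self._cursor:
            return None
        if not md5:
            return None

        # One value per engine column, None where the report has no entry.
        detections = tuple(virus_dict.get(engine, None)
                           for engine in self._COLUMNS[2:])

        self._conn.text_factory = str

        # All values, including the hash, are bound through "?" placeholders
        # and never formatted into the SQL text.
        try:
            self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ?;",
                                 (md5,))
            sample_row = self._cursor.fetchone()
        except sqlite3.OperationalError as why:
            print("sqlite3 error:%s\n" % str(why))
            return False

        if sample_row:
            try:
                sample_row = sample_row[0]
                self._cursor.execute(
                    "UPDATE virustotal SET Kaspersky=?, McAfee=?, Symantec=?, "
                    "Norman=?, Avast=?, NOD32=?, BitDefender=?, Microsoft=?, "
                    "Rising=?, Panda=? WHERE id = ?;",
                    detections + (sample_row,))
                self._conn.commit()
                task_id = sample_row
            except sqlite3.OperationalError as why:
                print("Unable to update database: %s." % why)
                return False
        else:  # the sample is not in the database yet
            try:
                self._cursor.execute(
                    "INSERT INTO virustotal "
                    "(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, "
                    "BitDefender, Microsoft, Rising, Panda) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
                    (md5,) + detections)
                self._conn.commit()
                task_id = self._cursor.lastrowid
            except sqlite3.OperationalError as why:
                print("why %s" % str(why))
                return None

        print("add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id)))
        return task_id

    def get_sample(self):
        """
        Return the stored row with the lowest id as a dict.

        Returns None when the table is empty, there is no cursor or the query
        fails.
        """
        log = logging.getLogger("Database.GetTask")

        if not self._cursor:
            log.error("Unable to acquire cursor.")
            return None

        # The table has no added_on column, so ordering by it made every call
        # fail with OperationalError; order by id only.
        try:
            self._cursor.execute("SELECT * FROM virustotal " \
                                 "ORDER BY id LIMIT 1;")
        except sqlite3.OperationalError as why:
            log.error("Unable to query database: %s." % why)
            return None

        sample_row = self._cursor.fetchone()
        if not sample_row:
            return None
        return self._get_task_dict(sample_row)

    def search_md5(self, md5):
        """
        Look up the stored row for md5.

        Returns the row as a dict, an empty dict when the hash is not stored,
        and None when there is no cursor, md5 is not 32 characters long or the
        query fails. Only the length is checked, not the character set.
        """
        if not self._cursor:
            return None
        if not md5 or len(md5) != 32:
            return None

        try:
            self._cursor.execute("SELECT * FROM virustotal " \
                                 "WHERE md5 = ? " \
                                 "ORDER BY id DESC;",
                                 (md5,))
        except sqlite3.OperationalError:
            return None

        # If several rows match, the last one fetched (lowest id) is returned.
        task_dict = {}
        for row in self._cursor.fetchall():
            task_dict = self._get_task_dict(row)
        return task_dict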
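class VirusTotal:
    """
    Fetch the VirusTotal report for one MD5 and store it in the local database.

    Talks to the v2 API endpoints with the module-level APIKEY.
    """

    # Seconds to wait for the API before the request is abandoned.
    _TIMEOUT = 30

    def __init__(self, md5):
        """
        Query VirusTotal for md5 and keep the detections on the instance.

        The constructor performs a network request; anything raised by
        get_report_dict() propagates to the caller.
        """
        self._md5 = md5
        self._db_file = r"./db/virustotal.db"
        # Keep the report: it was previously fetched here and thrown away,
        # which left _virus_dict empty and cost a second request later.
        self._virus_dict = self.get_report_dict()

    def repr(self):
        """Return the stored detections as a string."""
        return str(self._virus_dict)

    def submit_md5(self, file_path):
        """
        Upload the file at file_path to VirusTotal for scanning.

        Despite its name this sends the file content, not a hash. An uploaded
        sample becomes available to VirusTotal and to parties with access to
        its corpus, so files that may hold confidential data should only be
        looked up by hash (get_report_dict), which does not send the file.

        The JSON answer is printed; nothing is returned.
        """
        # postfile is a separate multipart helper module, not part of this
        # listing.
        import postfile

        file_name = os.path.basename(file_path)
        host = "www.virustotal.com"
        selector = "https://www.virustotal.com/vtapi/v2/file/scan"
        fields = [("apikey", APIKEY)]
        with open(file_path, "rb") as sample:
            file_to_send = sample.read()
        files = [("file", file_name, file_to_send)]
        # NOTE(review): the API key travels in the form fields; confirm that
        # postfile.post_multipart sends over HTTPS and verifies the server
        # certificate.
        answer = postfile.post_multipart(host, selector, fields, files)
        print(answer)

    def get_report_dict(self):
        """
        Return {engine name: detection name} for every engine that flagged
        the hash.

        An empty dict means either that no engine detected the sample or that
        VirusTotal has no finished report for the hash; the two cases are not
        distinguished here.

        Network and HTTP errors from urllib2 propagate. ValueError is raised
        when the API answers with an empty body.
        """
        url = "https://www.virustotal.com/vtapi/v2/file/report"
        parameters = {"resource": self._md5,
                      "apikey": APIKEY}
        data = urllib.urlencode(parameters)
        req = urllib2.Request(url, data)
        # The API key is in the request body. urllib2 verifies the server
        # certificate by default only from Python 2.7.9 on.
        response = urllib2.urlopen(req, timeout=self._TIMEOUT)
        try:
            body = response.read()
        finally:
            response.close()

        if not body:
            # An empty body is not valid JSON; presumably the request quota
            # was exceeded. Fail with a clear message instead of recording the
            # hash as having no detections.
            raise ValueError("empty response from VirusTotal for %s"
                             % self._md5)

        response_dict = simplejson.loads(body)
        result_dict = {}
        # response_code 1 means a report exists; 0 (unknown hash) and -2
        # (still queued) carry no scan results.
        if response_dict["response_code"] == 1:
            scans_dict = response_dict.get("scans", {})
            for engine, verdict in scans_dict.items():
                if verdict["detected"]:
                    result_dict[engine] = verdict["result"]
        return result_dict

    def write_to_db(self):
        """
        Store the detections fetched by the constructor in the database.

        A hash without detections still gets a row, with every engine column
        NULL.
        """
        db = VirusTotalDatabase(self._db_file)
        db.add_sample(self._md5, self._virus_dict)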
使用方法如下:
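from getVirusTotalInfo import VirusTotal

config = {'input': "inputMd5s"}

# Read one hash per line, dropping surrounding whitespace and blank lines.
# (ifilter, imap and string were used here without being imported.)
with open(config['input'], "r") as fp:
    MD5S = [line.strip() for line in fp if line.strip()]
print("MD5S %s" % (MD5S,))

# Fetch the scan result for each hash and write it to the database.
# Each VirusTotal(md5) makes one API request; the API enforces a request
# quota, so a long input list may need pacing between iterations.
for md5 in MD5S:
    virus_total = VirusTotal(md5)
    virus_total.write_to_db()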
希望本文所述对大家的Python程序设计有所帮助。