python爬取基于m3u8协议的ts文件并合并
前言
简单学习过网络爬虫,只是之前都是照着书上做并发,大概能理解,却还是无法自己用到自己项目中,这里自己研究实现一个网页嗅探HTML5播放控件中基于m3u8协议ts格式视频资源的项目,并未考虑过复杂情况,毕竟只是练练手.
源码
#coding=utf-8
importasyncio
importmultiprocessing
importos
importre
importtime
frommathimportfloor
frommultiprocessingimportManager
importaiohttp
importrequests
fromlxmlimporthtml
importthreading
fromsrc.my_libimportretry
fromsrc.my_libimporttime_statistics
classM3U8Download:
_path="./resource\\"#本地文件路径
_url_seed=None#资源所在链接前缀
_target_url={}#资源任务目标字典
_mode=""
_headers={"User-agent":"Mozilla/5.0"}#浏览器代理
_target_num=100
def__init__(self):
self._ml=Manager().list()#进程通信列表
ifnotos.path.exists(self._path):#检测本地目录存在否
os.makedirs(self._path)
exec_str=r'chcp65001'
os.system(exec_str)#先切换utf-8输出,防止控制台乱码
defsniffing(self,url):
self._url=url
print("开始嗅探...")
try:
r=requests.get(self._url)#访问嗅探网址,获取网页信息
except:
print("嗅探失败,网址不正确")
os.system("pause")
else:
tree=html.fromstring(r.content)
try:
source_url=tree.xpath('//video//source/@src')[0]#嗅探资源控制文件链接,这里只针对一个资源控制文件
#self._url_seed=re.split("/\w+\.m3u8",source_url)[0]#从资源控制文件链接解析域名
except:
print("嗅探失败,未发现资源")
os.system("pause")
else:
self.analysis(source_url)
defanalysis(self,source_url):
try:
self._url_seed=re.split("/\w+\.m3u8",source_url)[0]#从资源控制文件链接解析域名
withrequests.get(source_url)asr:#访问资源控制文件,获得资源信息
src=re.split("\n*#.+\n",r.text)#解析资源信息
forsub_srcinsrc:#将资源地址储存到任务字典
ifsub_src:
self._target_url[sub_src]=self._url_seed+"/"+sub_src
exceptExceptionase:
print("资源无法成功解析",e)
os.system("pause")
else:
self._target_num=len(self._target_url)
print("sniffingsuccess!!!,found",self._target_num,"url.")
self._mode=input(
"1:->单进程(LowB)\n2:->多进程+多线程(网速开始biubiu飞起!)\n3:->多进程+协程(最先进的并发!!!)\n")
ifself._mode=="1":
forpath,urlinself._target_url.items():
self._download(path,url)
elifself._mode=="2"orself._mode=="3":
self._multiprocessing()
def_multiprocessing(self,processing_num=4):#多进程,多线程
target_list={}#进程任务字典,储存每个进程分配的任务
pool=multiprocessing.Pool(processes=processing_num)#开启进程池
i=0#任务分配标识
forpath,urlinself._target_url.items():#分配进程任务
target_list[path]=url
i+=1
ifi%10==0ori==len(self._target_url):#每个进程分配十个任务
ifself._mode=="2":
pool.apply_async(self._sub_multithreading,kwds=target_list)#使用多线程驱动方法
else:
pool.apply_async(self._sub_coroutine,kwds=target_list)#使用协程驱动方法
target_list={}
pool.close()#join函数等待所有子进程结束
pool.join()#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool
whileTrue:
ifself._judge_over():
self._combine()
break
def_sub_multithreading(self,**kwargs):
forpath,urlinkwargs.items():#根据进程任务开启线程
t=threading.Thread(target=self._download,args=(path,url,))
t.start()
@retry()
def_download(self,path,url):#同步下载方法
withrequests.get(url,headers=self._headers)asr:
ifr.status_code==200:
withopen(self._path+path,"wb")asfile:
file.write(r.content)
self._ml.append(0)#每成功一个就往进程通信列表增加一个值
percent='%.2f'%(len(self._ml)/self._target_num*100)
print(len(self._ml),":",path,"->OK","\tcomplete:",percent,"%")#显示下载进度
else:
print(path,r.status_code,r.reason)
def_sub_coroutine(self,**kwargs):
tasks=[]
forpath,urlinkwargs.items():#根据进程任务创建协程任务列表
tasks.append(asyncio.ensure_future(self._async_download(path,url)))
loop=asyncio.get_event_loop()#创建异步事件循环
loop.run_until_complete(asyncio.wait(tasks))#注册任务列表
asyncdef_async_download(self,path,url):#异步下载方法
asyncwithaiohttp.ClientSession()assession:
asyncwithsession.get(url,headers=self._headers)asresp:
try:
assertresp.status==200,"E"#断言状态码为200,否则抛异常,触发重试装饰器
withopen(self._path+path,"wb")asfile:
file.write(awaitresp.read())
exceptExceptionase:
print(e)
else:
self._ml.append(0)#每成功一个就往进程通信列表增加一个值
percent='%.2f'%(len(self._ml)/self._target_num*100)
print(len(self._ml),":",path,"->OK","\tcomplete:",percent,"%")#显示下载进度
def_combine(self):#组合资源方法
try:
print("开始组合资源...")
identification=str(floor(time.time()))
exec_str=r'copy/b"'+self._path+r'*.ts""'+self._path+'video'+identification+'.mp4"'
os.system(exec_str)#使用cmd命令将资源整合
exec_str=r'del"'+self._path+r'*.ts"'
os.system(exec_str)#删除原来的文件
except:
print("资源组合失败")
else:
print("资源组合成功!")
def_judge_over(self):#判断是否全部下载完成
iflen(self._ml)==len(self._target_url):
returnTrue
returnFalse
@time_statistics
defapp():
multiprocessing.freeze_support()
url=input("输入嗅探网址:\n")
m3u8=M3U8Download()
m3u8.sniffing(url)
#m3u8.analysis(url)
if__name__=="__main__":
app()
这里是两个装饰器的实现:
importtime
deftime_statistics(fun):
deffunction_timer(*args,**kwargs):
t0=time.time()
result=fun(*args,**kwargs)
t1=time.time()
print("Totaltimerunning%s:%sseconds"%(fun.__name__,str(t1-t0)))
returnresult
returnfunction_timer
defretry(retries=3):
def_retry(fun):
defwrapper(*args,**kwargs):
for_inrange(retries):
try:
returnfun(*args,**kwargs)
exceptExceptionase:
print("@",fun.__name__,"->",e)
returnwrapper
return_retry
打包成exe文件
使用PyInstaller-Fdownload.py将程序打包成单个可执行文件.
这里需要注意一下,因为程序含有多进程,需要在执行前加一句multiprocessing.freeze_support(),不然程序会反复执行多进程前的功能.
关于协程
协程在Python3.5进化到了asyncawait版本,用async标记异步方法,在异步方法里对耗时操作使用await标记.这里使用了一个进程驱动协程的方法,在进程池创建多个协程任务,使用asyncio.get_event_loop()创建协程事件循环,使用run_until_complete()注册协程任务,asyncio.wait()方法接收一个任务列表进行协程注册.
关于装饰器
装饰器源于闭包原理,这里使用了两种装饰器.
- @time_statistics:统计耗时,装饰器自己无参型
- @retry():设置重试次数,装饰器自己有参型
- 按我理解是有参型是将无参型装饰器包含在内部,而调用是加()的,关于():
- 不带括号时,调用的是这个函数本身
- 带括号(此时必须传入需要的参数),调用的是函数的return结果
关于CMD控制台
程序会使用CMD命令来将下载的ts文件合并.
因为CMD默认使用GB2312编码,调用os.system()需要先切换成通用的UTF-8输出,否则系统信息会乱码.
而且使用cmd命令时参数最好加双引号,以避免特殊符号报错.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。