python 下载m3u8视频的示例代码
importrequests
importos
importdatetime
importthreading
classxiazai():
def__init__(self,url):
self.url=url
work_dir=os.getcwd()
#print(work_dir)
#用来保存ts文件
file_dir=os.path.join(work_dir,'file_tmp')
ifnotos.path.exists(file_dir):
os.mkdir(file_dir)
self.headers={
'user-agent':'Mozilla/5.0(Macintosh;IntelMacOSX10_15_3)AppleWebKit/537.36(KHTML,likeGecko)Chrome/80.0.3987.116Safari/537.36'
}
self.savefile(self.url)
defsavefile(self,file_url):
r=requests.get(file_url,headers=self.headers)
#合成带有hls的m3u8地址
ifr.text.split('\n')[-1]=='':
hls_mark=r.text.split('\n')[-2]#以防\n结尾
else:
hls_mark=r.text.split('\n')[-1]
self.url_m3u8_hls=file_url.replace('index.m3u8',hls_mark)
#file_m3u8=url_m3u8_hls.split('/')[-1]
self.duqu()
#print(url_m3u8_hls)
defduqu(self):
r=requests.get(self.url_m3u8_hls,headers=self.headers).text
text_bytes=r.split('\n')
#筛选以.ts结尾的行
#有些情况下可能是以其他格式的文件,比如png,下载后修改后缀即可
#ts_name=[iforiintext_stringifi.endswith('.ts')]
self.ts_time=[iforiintext_bytesifi.startswith('#EXTINF')]
#self.shijian(dm_time)
#print(dm_time)
self.ts_neirong=[iforiintext_bytesifnoti.startswith('#')]
self.ts_neirong.pop()
self.threads=[]
self.threads.append(threading.Thread(target=self.xiazai))
self.threads.append(threading.Thread(target=self.shijian))
fortinself.threads:
#print(t)
t.start()
#self.xiazai(url_m3u8_hls)
#print(ts_neirong)
defshijian(self):
self.dm_time=0
foriinrange(len(self.ts_time)):
ts_time1=self.ts_time[i].replace('#EXTINF:','')
ts_time2=ts_time1.replace(',','')
self.dm_time=float(ts_time2)+self.dm_time
shichang_time=str(datetime.timedelta(seconds=self.dm_time))
print('视频时长:%s'%shichang_time)
defxiazai(self):
liebiao=[]
foriinrange(len(self.ts_neirong)):
hls_mark=self.url_m3u8_hls.split('/')[-1]
url_xiazai=self.url_m3u8_hls.replace(hls_mark,self.ts_neirong[i])
liebiao.append(url_xiazai)
#print(url_xiazai)
#r=requests.get(url_xiazai,headers=self.headers)
#withopen('file_tmp/'+ts_neirong[i],'wb')asf:
#f.write(r.content)
#f.close()
x=self.bisector_list(liebiao,10)
self.xiancheng0=x[0]
self.xiancheng1=x[1]
self.xiancheng2=x[2]
self.xiancheng3=x[3]
self.xiancheng4=x[4]
self.xiancheng5=x[5]
self.xiancheng6=x[6]
self.xiancheng7=x[7]
self.xiancheng8=x[8]
self.xiancheng9=x[9]
self.threads2=[]
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai1))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai2))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai3))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai4))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai5))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai6))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai7))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai8))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai9))
self.threads2.append(threading.Thread(target=self.xiancheng_xiazai10))
fortinself.threads2:
#print(t)
t.start()
defxiancheng_xiazai1(self):
#print(self.xiancheng0)
foriinself.xiancheng0:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai2(self):
#print(self.xiancheng1)
foriinself.xiancheng1:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai3(self):
#print(self.xiancheng2)
foriinself.xiancheng2:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai4(self):
#print(self.xiancheng3)
foriinself.xiancheng3:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai5(self):
#print(self.xiancheng4)
foriinself.xiancheng4:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai6(self):
#print(self.xiancheng5)
foriinself.xiancheng5:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai7(self):
#print(self.xiancheng6)
foriinself.xiancheng6:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai8(self):
#print(self.xiancheng7)
foriinself.xiancheng7:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai9(self):
#print(self.xiancheng8)
foriinself.xiancheng8:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defxiancheng_xiazai10(self):
#print(self.xiancheng9)
foriinself.xiancheng9:
#print(i)
r=requests.get(i,headers=self.headers)
mingzi=i.split('/')[-1]
withopen('file_tmp/'+mingzi,'wb')asf:
f.write(r.content)
f.close()
defbisector_list(self,tabulation:list,num:int):
"""
将列表平均分成几份
:paramtabulation:列表
:paramnum:份数
:return:返回一个新的列表
"""
new_list=[]
'''列表长度大于等于份数'''
iflen(tabulation)>=num:
'''remainder:列表长度除以份数,取余'''
remainder=len(tabulation)%num
ifremainder==0:
'''merchant:列表长度除以分数'''
merchant=int(len(tabulation)/num)
'''将列表平均拆分'''
foriinrange(1,num+1):
ifi==1:
new_list.append(tabulation[:merchant])
else:
new_list.append(tabulation[(i-1)*merchant:i*merchant])
returnnew_list
else:
'''merchant:列表长度除以分数取商'''
merchant=int(len(tabulation)//num)
'''remainder:列表长度除以份数,取余'''
remainder=int(len(tabulation)%num)
'''将列表平均拆分'''
foriinrange(1,num+1):
ifi==1:
new_list.append(tabulation[:merchant])
else:
new_list.append(tabulation[(i-1)*merchant:i*merchant])
'''将剩余数据的添加前面列表中'''
ifint(len(tabulation)-i*merchant)<=merchant:
forjintabulation[-remainder:]:
new_list[tabulation[-remainder:].index(j)].append(j)
returnnew_list
else:
'''如果列表长度小于份数'''
foriinrange(1,len(tabulation)+1):
tabulation_subset=[]
tabulation_subset.append(tabulation[i-1])
new_list.append(tabulation_subset)
returnnew_list
if__name__=='__main__':
xiazai('http://iqiyi.cdn9-okzy.com/20200907/15137_ed25d8c5/index.m3u8')
速度很慢. 40m5分钟 不加多线程1小时。可能我的m3u8不行,或者我电脑不行,多线程是机械式的。 电脑好可以多加几条。
以上就是python下载m3u8视频的示例代码的详细内容,更多关于python下载m3u8视频的资料请关注毛票票其它相关文章!