python实现实时视频流播放代码实例
这篇文章主要介绍了python实现实时视频流播放代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
@action(methods=['GET'],detail=True)
defvideo(self,request,pk=None):
"""
获取设备实时视频流
:paramrequest:
:parampk:
:return:
"""
device_obj=self.get_object()
#ifdevice_obj.status==0:
#returnResponse({'error':'设备离线'})
ifnotdevice_obj.rtsp_address:
returnResponse({'error':'缺少rtsp地址'})
cache_id='_video_stream_{}'.format(device_obj.hash)
cache_status=cache.get(cache_id,None)
ifcache_statusisNone:#任务初始化,设置初始时间
cache.set(cache_id,time.time(),timeout=60)
elifisinstance(cache_status,float)andtime.time()-cache_status>30:#任务已超时,返回错误信息,一段时间内不再入队
returnResponse({'error':'连接数目超过限制,请稍后再试'})
ret=job_queue.enqueue_video(rtsp_address=device_obj.rtsp_address,device_hash=device_obj.hash)
logger.info('fetchdevice%svideojobstatus:%s',pk,ret._status)
ifret._status==b'started'or'started':#视频流正常推送中,刷新播放时间,返回视频ID
cache.set(cache_id,'continue',timeout=30)
returnResponse({'video':''.join([settings.FFMPEG_VIDEO,device_obj.hash])})
elifret._status==b'queued'or'queued':#视频任务等待中
returnResponse({'status':'等待建立视频连接'})
else:#建立视频任务失败
returnResponse({'error':'打开视频失败'})
classJobQueue:
"""实时视频播放"""
def__init__(self):
self.video_queue=django_rq.get_queue('video')#视频推流消息队列
defenqueue_video(self,rtsp_address,device_hash):
"""视频流队列"""
job_id='video_{}'.format(device_hash)
job=self.video_queue.fetch_job(job_id)
ifnotjob:
job=self.video_queue.enqueue_call(
func='utils.ffmpeg.ffmpeg_play',
args=(rtsp_address,device_hash),
timeout=-1,
ttl=30,#最多等待30秒
result_ttl=0,
job_id=job_id
)
returnjob
#-*-coding:utf-8-*-
importsubprocess
importthreading
importtime
importlogging
fromdjango.core.cacheimportcache
logger=logging.getLogger('server.default')
defffmpeg_play(stream,name):
play=True
cache_id='_video_stream_{}'.format(name)
cache.set(cache_id,'continue',timeout=30)
process=None
defupstream():
cmd="ffmpeg-i'{}'-c:vh264-fflv-r25-an'rtmp://127.0.0.1:1935/live/{}'".format(stream,name)
process=subprocess.Popen(cmd,shell=True,stdin=subprocess.PIPE,stderr=subprocess.DEVNULL)
try:
logger.info('device:{}streamthreadstart:{}'.format(name,stream))
whileplay:
time.sleep(1)
exceptExceptionase:
logger.info('device:{}streamthreaderror{}'.format(name,e))
finally:
logger.info('device:{}streamthreadstop'.format(name))
process.communicate(b'q')
thr=threading.Thread(target=upstream)
thr.start()
try:
whileTrue:
play=cache.get(cache_id,'')
ifplay!='continue':
logger.info('stopdevice{}videostream'.format(name))
play=False
break
time.sleep(1)
exceptExceptionase:
logger.info('device:{}playstreamerror{}'.format(name,e))
process.communicate(b'q')
logger.info('waitdevice{}videothreadstop'.format(name))
thr.join()
logger.info('device{}videojobstop'.format(name))
#实时视频流播放
RQ_QUEUES={
'video':{
'USE_REDIS_CACHE':'video',
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。