python实现实时视频流播放代码实例
这篇文章主要介绍了python实现实时视频流播放代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
@action(methods=['GET'],detail=True) defvideo(self,request,pk=None): """ 获取设备实时视频流 :paramrequest: :parampk: :return: """ device_obj=self.get_object() #ifdevice_obj.status==0: #returnResponse({'error':'设备离线'}) ifnotdevice_obj.rtsp_address: returnResponse({'error':'缺少rtsp地址'}) cache_id='_video_stream_{}'.format(device_obj.hash) cache_status=cache.get(cache_id,None) ifcache_statusisNone:#任务初始化,设置初始时间 cache.set(cache_id,time.time(),timeout=60) elifisinstance(cache_status,float)andtime.time()-cache_status>30:#任务已超时,返回错误信息,一段时间内不再入队 returnResponse({'error':'连接数目超过限制,请稍后再试'}) ret=job_queue.enqueue_video(rtsp_address=device_obj.rtsp_address,device_hash=device_obj.hash) logger.info('fetchdevice%svideojobstatus:%s',pk,ret._status) ifret._status==b'started'or'started':#视频流正常推送中,刷新播放时间,返回视频ID cache.set(cache_id,'continue',timeout=30) returnResponse({'video':''.join([settings.FFMPEG_VIDEO,device_obj.hash])}) elifret._status==b'queued'or'queued':#视频任务等待中 returnResponse({'status':'等待建立视频连接'}) else:#建立视频任务失败 returnResponse({'error':'打开视频失败'})
classJobQueue: """实时视频播放""" def__init__(self): self.video_queue=django_rq.get_queue('video')#视频推流消息队列 defenqueue_video(self,rtsp_address,device_hash): """视频流队列""" job_id='video_{}'.format(device_hash) job=self.video_queue.fetch_job(job_id) ifnotjob: job=self.video_queue.enqueue_call( func='utils.ffmpeg.ffmpeg_play', args=(rtsp_address,device_hash), timeout=-1, ttl=30,#最多等待30秒 result_ttl=0, job_id=job_id ) returnjob
#-*-coding:utf-8-*- importsubprocess importthreading importtime importlogging fromdjango.core.cacheimportcache logger=logging.getLogger('server.default') defffmpeg_play(stream,name): play=True cache_id='_video_stream_{}'.format(name) cache.set(cache_id,'continue',timeout=30) process=None defupstream(): cmd="ffmpeg-i'{}'-c:vh264-fflv-r25-an'rtmp://127.0.0.1:1935/live/{}'".format(stream,name) process=subprocess.Popen(cmd,shell=True,stdin=subprocess.PIPE,stderr=subprocess.DEVNULL) try: logger.info('device:{}streamthreadstart:{}'.format(name,stream)) whileplay: time.sleep(1) exceptExceptionase: logger.info('device:{}streamthreaderror{}'.format(name,e)) finally: logger.info('device:{}streamthreadstop'.format(name)) process.communicate(b'q') thr=threading.Thread(target=upstream) thr.start() try: whileTrue: play=cache.get(cache_id,'') ifplay!='continue': logger.info('stopdevice{}videostream'.format(name)) play=False break time.sleep(1) exceptExceptionase: logger.info('device:{}playstreamerror{}'.format(name,e)) process.communicate(b'q') logger.info('waitdevice{}videothreadstop'.format(name)) thr.join() logger.info('device{}videojobstop'.format(name))
#实时视频流播放 RQ_QUEUES={ 'video':{ 'USE_REDIS_CACHE':'video', } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。