flask开启多线程的具体方法
在我之前解释了flask如何支持多线程主要通过两个类来实现,LocalStack和Local,在Local中有两个属性,__storage__和__ident_func__,后者用来获取线程id,从而区分不同线程发来的请求
这次要说的是flask如何开启多线程
先从app.run()这个方法看起
defrun(self,host=None,port=None,debug=None,**options):
fromwerkzeug.servingimportrun_simple
ifhostisNone:
host='127.0.0.1'
ifportisNone:
server_name=self.config['SERVER_NAME']
ifserver_nameand':'inserver_name:
port=int(server_name.rsplit(':',1)[1])
else:
port=5000
ifdebugisnotNone:
self.debug=bool(debug)
options.setdefault('use_reloader',self.debug)
options.setdefault('use_debugger',self.debug)
try:
run_simple(host,port,self,**options)#会进入这个函数
finally:
#resetthefirstrequestinformationifthedevelopmentserver
#resetnormally.Thismakesitpossibletorestarttheserver
#withoutreloaderandthatstufffromaninteractiveshell.
self._got_first_request=False
经过判断和设置后进入run_simple()这个函数,看下源码
defrun_simple(hostname,port,application,use_reloader=False,
use_debugger=False,use_evalex=True,
extra_files=None,reloader_interval=1,
reloader_type='auto',threaded=False,
processes=1,request_handler=None,static_files=None,
passthrough_errors=False,ssl_context=None):
"""StartaWSGIapplication.Optionalfeaturesincludeareloader,
multithreadingandforksupport.
Thisfunctionhasacommand-lineinterfacetoo::
python-mwerkzeug.serving--help
..versionadded::0.5
`static_files`wasaddedtosimplifyservingofstaticfilesaswell
as`passthrough_errors`.
..versionadded::0.6
supportforSSLwasadded.
..versionadded::0.8
AddedsupportforautomaticallyloadingaSSLcontextfromcertificate
fileandprivatekey.
..versionadded::0.9
Addedcommand-lineinterface.
..versionadded::0.10
Improvedthereloaderandaddedsupportforchangingthebackend
throughthe`reloader_type`parameter.See:ref:`reloader`
formoreinformation.
:paramhostname:Thehostfortheapplication.eg:``'localhost'``
:paramport:Theportfortheserver.eg:``8080``
:paramapplication:theWSGIapplicationtoexecute
:paramuse_reloader:shouldtheserverautomaticallyrestartthepython
processifmoduleswerechanged?
:paramuse_debugger:shouldthewerkzeugdebuggingsystembeused?
:paramuse_evalex:shouldtheexceptionevaluationfeaturebeenabled?
:paramextra_files:alistoffilesthereloadershouldwatch
additionallytothemodules.Forexampleconfiguration
files.
:paramreloader_interval:theintervalforthereloaderinseconds.
:paramreloader_type:thetypeofreloadertouse.Thedefaultis
autodetection.Validvaluesare``'stat'``and
``'watchdog'``.See:ref:`reloader`formore
information.
:paramthreaded:shouldtheprocesshandleeachrequestinaseparate
thread?
:paramprocesses:ifgreaterthan1thenhandleeachrequestinanewprocess
uptothismaximumnumberofconcurrentprocesses.
:paramrequest_handler:optionalparameterthatcanbeusedtoreplace
thedefaultone.Youcanusethistoreplaceit
withadifferent
:class:`~BaseHTTPServer.BaseHTTPRequestHandler`
subclass.
:paramstatic_files:alistordictofpathsforstaticfiles.Thisworks
exactlylike:class:`SharedDataMiddleware`,it'sactually
justwrappingtheapplicationinthatmiddlewarebefore
serving.
:parampassthrough_errors:setthisto`True`todisabletheerrorcatching.
Thismeansthattheserverwilldieonerrorsbut
itcanbeusefultohookdebuggersin(pdbetc.)
:paramssl_context:anSSLcontextfortheconnection.Eitheran
:class:`ssl.SSLContext`,atupleintheform
``(cert_file,pkey_file)``,thestring``'adhoc'``if
theservershouldautomaticallycreateone,or``None``
todisableSSL(whichisthedefault).
"""
ifnotisinstance(port,int):
raiseTypeError('portmustbeaninteger')
ifuse_debugger:
fromwerkzeug.debugimportDebuggedApplication
application=DebuggedApplication(application,use_evalex)
ifstatic_files:
fromwerkzeug.wsgiimportSharedDataMiddleware
application=SharedDataMiddleware(application,static_files)
deflog_startup(sock):
display_hostname=hostnamenotin('','*')andhostnameor'localhost'
if':'indisplay_hostname:
display_hostname='[%s]'%display_hostname
quit_msg='(PressCTRL+Ctoquit)'
port=sock.getsockname()[1]
_log('info','*Runningon%s://%s:%d/%s',
ssl_contextisNoneand'http'or'https',
display_hostname,port,quit_msg)
definner():
try:
fd=int(os.environ['WERKZEUG_SERVER_FD'])
except(LookupError,ValueError):
fd=None
srv=make_server(hostname,port,application,threaded,
processes,request_handler,
passthrough_errors,ssl_context,
fd=fd)
iffdisNone:
log_startup(srv.socket)
srv.serve_forever()
ifuse_reloader:
#Ifwe'renotrunningalreadyinthesubprocessthatisthe
#reloaderwewanttoopenupasocketearlytomakesurethe
#portisactuallyavailable.
ifos.environ.get('WERKZEUG_RUN_MAIN')!='true':
ifport==0andnotcan_open_by_fd:
raiseValueError('Cannotbindtoarandomportwithenabled'
'reloaderifthePythoninterpreterdoes'
'notsupportsocketopeningbyfd.')
#Createanddestroyasocketsothatanyexceptionsare
#raisedbeforewespawnaseparatePythoninterpreterand
#losethisability.
address_family=select_ip_version(hostname,port)
s=socket.socket(address_family,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind(get_sockaddr(hostname,port,address_family))
ifhasattr(s,'set_inheritable'):
s.set_inheritable(True)
#Ifwecanopenthesocketbyfiledescriptor,thenwecanjust
#reusethisoneandoursocketwillsurvivetherestarts.
ifcan_open_by_fd:
os.environ['WERKZEUG_SERVER_FD']=str(s.fileno())
s.listen(LISTEN_QUEUE)
log_startup(s)
else:
s.close()
#Donotuserelativeimports,otherwise"python-mwerkzeug.serving"
#breaks.
fromwerkzeug._reloaderimportrun_with_reloader
run_with_reloader(inner,extra_files,reloader_interval,
reloader_type)
else:
inner()#默认会执行
经过判断和设置后进入run_simple()这个函数,看下源码
defrun_simple(hostname,port,application,use_reloader=False,
use_debugger=False,use_evalex=True,
extra_files=None,reloader_interval=1,
reloader_type='auto',threaded=False,
processes=1,request_handler=None,static_files=None,
passthrough_errors=False,ssl_context=None):
"""StartaWSGIapplication.Optionalfeaturesincludeareloader,
multithreadingandforksupport.
Thisfunctionhasacommand-lineinterfacetoo::
python-mwerkzeug.serving--help
..versionadded::0.5
`static_files`wasaddedtosimplifyservingofstaticfilesaswell
as`passthrough_errors`.
..versionadded::0.6
supportforSSLwasadded.
..versionadded::0.8
AddedsupportforautomaticallyloadingaSSLcontextfromcertificate
fileandprivatekey.
..versionadded::0.9
Addedcommand-lineinterface.
..versionadded::0.10
Improvedthereloaderandaddedsupportforchangingthebackend
throughthe`reloader_type`parameter.See:ref:`reloader`
formoreinformation.
:paramhostname:Thehostfortheapplication.eg:``'localhost'``
:paramport:Theportfortheserver.eg:``8080``
:paramapplication:theWSGIapplicationtoexecute
:paramuse_reloader:shouldtheserverautomaticallyrestartthepython
processifmoduleswerechanged?
:paramuse_debugger:shouldthewerkzeugdebuggingsystembeused?
:paramuse_evalex:shouldtheexceptionevaluationfeaturebeenabled?
:paramextra_files:alistoffilesthereloadershouldwatch
additionallytothemodules.Forexampleconfiguration
files.
:paramreloader_interval:theintervalforthereloaderinseconds.
:paramreloader_type:thetypeofreloadertouse.Thedefaultis
autodetection.Validvaluesare``'stat'``and
``'watchdog'``.See:ref:`reloader`formore
information.
:paramthreaded:shouldtheprocesshandleeachrequestinaseparate
thread?
:paramprocesses:ifgreaterthan1thenhandleeachrequestinanewprocess
uptothismaximumnumberofconcurrentprocesses.
:paramrequest_handler:optionalparameterthatcanbeusedtoreplace
thedefaultone.Youcanusethistoreplaceit
withadifferent
:class:`~BaseHTTPServer.BaseHTTPRequestHandler`
subclass.
:paramstatic_files:alistordictofpathsforstaticfiles.Thisworks
exactlylike:class:`SharedDataMiddleware`,it'sactually
justwrappingtheapplicationinthatmiddlewarebefore
serving.
:parampassthrough_errors:setthisto`True`todisabletheerrorcatching.
Thismeansthattheserverwilldieonerrorsbut
itcanbeusefultohookdebuggersin(pdbetc.)
:paramssl_context:anSSLcontextfortheconnection.Eitheran
:class:`ssl.SSLContext`,atupleintheform
``(cert_file,pkey_file)``,thestring``'adhoc'``if
theservershouldautomaticallycreateone,or``None``
todisableSSL(whichisthedefault).
"""
ifnotisinstance(port,int):
raiseTypeError('portmustbeaninteger')
ifuse_debugger:
fromwerkzeug.debugimportDebuggedApplication
application=DebuggedApplication(application,use_evalex)
ifstatic_files:
fromwerkzeug.wsgiimportSharedDataMiddleware
application=SharedDataMiddleware(application,static_files)
deflog_startup(sock):
display_hostname=hostnamenotin('','*')andhostnameor'localhost'
if':'indisplay_hostname:
display_hostname='[%s]'%display_hostname
quit_msg='(PressCTRL+Ctoquit)'
port=sock.getsockname()[1]
_log('info','*Runningon%s://%s:%d/%s',
ssl_contextisNoneand'http'or'https',
display_hostname,port,quit_msg)
definner():
try:
fd=int(os.environ['WERKZEUG_SERVER_FD'])
except(LookupError,ValueError):
fd=None
srv=make_server(hostname,port,application,threaded,
processes,request_handler,
passthrough_errors,ssl_context,
fd=fd)
iffdisNone:
log_startup(srv.socket)
srv.serve_forever()
ifuse_reloader:
#Ifwe'renotrunningalreadyinthesubprocessthatisthe
#reloaderwewanttoopenupasocketearlytomakesurethe
#portisactuallyavailable.
ifos.environ.get('WERKZEUG_RUN_MAIN')!='true':
ifport==0andnotcan_open_by_fd:
raiseValueError('Cannotbindtoarandomportwithenabled'
'reloaderifthePythoninterpreterdoes'
'notsupportsocketopeningbyfd.')
#Createanddestroyasocketsothatanyexceptionsare
#raisedbeforewespawnaseparatePythoninterpreterand
#losethisability.
address_family=select_ip_version(hostname,port)
s=socket.socket(address_family,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind(get_sockaddr(hostname,port,address_family))
ifhasattr(s,'set_inheritable'):
s.set_inheritable(True)
#Ifwecanopenthesocketbyfiledescriptor,thenwecanjust
#reusethisoneandoursocketwillsurvivetherestarts.
ifcan_open_by_fd:
os.environ['WERKZEUG_SERVER_FD']=str(s.fileno())
s.listen(LISTEN_QUEUE)
log_startup(s)
else:
s.close()
#Donotuserelativeimports,otherwise"python-mwerkzeug.serving"
#breaks.
fromwerkzeug._reloaderimportrun_with_reloader
run_with_reloader(inner,extra_files,reloader_interval,
reloader_type)
else:
inner()#默认会执行
还是经过一系列判断后默认会进入inner()函数,这个函数定义在run_simple()内,属于闭包,inner()中会执行make_server()这个函数,看下源码:
defmake_server(host=None,port=None,app=None,threaded=False,processes=1,
request_handler=None,passthrough_errors=False,
ssl_context=None,fd=None):
"""Createanewserverinstancethatiseitherthreaded,orforks
orjustprocessesonerequestafteranother.
"""
ifthreadedandprocesses>1:
raiseValueError("cannothaveamultithreadedand"
"multiprocessserver.")
elifthreaded:
returnThreadedWSGIServer(host,port,app,request_handler,
passthrough_errors,ssl_context,fd=fd)
elifprocesses>1:
returnForkingWSGIServer(host,port,app,processes,request_handler,
passthrough_errors,ssl_context,fd=fd)
else:
returnBaseWSGIServer(host,port,app,request_handler,
passthrough_errors,ssl_context,fd=fd)
看到这也很明白了,想要配置多线程或者多进程,则需要设置threaded或processes这两个参数,而这两个参数是从app.run()中传递过来的:
app.run(**options)--->run_simple(threaded,processes)--->make_server(threaded,processes)
默认情况下flask是单线程,单进程的,想要开启只需要在run中传入对应的参数:app.run(threaded=True)即可.
从make_server中可知,flask提供了三种server:ThreadedWSGIServer,ForkingWSGIServer,BaseWSGIServer,默认情况下是BaseWSGIServer
以线程为例,看下ThreadedWSGIServer这个类:
classThreadedWSGIServer(ThreadingMixIn,BaseWSGIServer):#继承自ThreadingMixIn,BaseWSGIServer
"""AWSGIserverthatdoesthreading.""" multithread=True daemon_threads=True
ThreadingMixIn=socketserver.ThreadingMixIn
classThreadingMixIn:
"""Mix-inclasstohandleeachrequestinanewthread.""" #Decideshowthreadswillactuponterminationofthe #mainprocess daemon_threads=False defprocess_request_thread(self,request,client_address): """SameasinBaseServerbutasathread. Inaddition,exceptionhandlingisdonehere. """ try: self.finish_request(request,client_address) self.shutdown_request(request) except: self.handle_error(request,client_address) self.shutdown_request(request) defprocess_request(self,request,client_address): """Startanewthreadtoprocesstherequest.""" t=threading.Thread(target=self.process_request_thread, args=(request,client_address)) t.daemon=self.daemon_threads t.start()
process_request就是对每个请求产生一个新的线程来处理
最后写一个非常简单的应用来验证以上说法:
fromflaskimportFlask
fromflaskimport_request_ctx_stackapp=Flask(__name__)
@app.route('/')
defindex():
print(_request_ctx_stack._local.__ident_func__()) whileTrue: pass return'hello
'
app.run()#如果需要开启多线程则app.run(threaded=True)
_request_ctx_stack._local.__ident_func__()对应这get_ident()这个函数,返回当前线程id,为什么要在后面加上whileTrue这句呢,我们看下get_ident()这个函数的说明:
Returnanon-zerointegerthatuniquelyidentifiesthecurrentthreadamongstotherthreadsthatexistsimultaneously.Thismaybeusedtoidentifyper-threadresources.Eventhoughonsomeplatformsthreadsidentitiesmayappeartobeallocatedconsecutivenumbersstartingat1,thisbehaviorshouldnotbereliedupon,andthenumbershouldbeseenpurelyasamagiccookie.Athread'sidentitymaybereusedforanotherthreadafteritexits.
关键字我已经加粗了,线程id会在线程结束后重复利用,所以我在路由函数中加了这个死循环来阻塞请求以便于观察到不同的id,这就会产生两种情况:
1.没开启多线程的情况下,一次请求过来,服务器直接阻塞,并且之后的其他请求也都阻塞
2.开启多线程情况下,每次都会打印出不同的线程id
结果:
第一种情况
Runningonhttp://127.0.0.1:5000/(PressCTRL+Ctoquit)
139623180527360
第二种情况
Runningonhttp://127.0.0.1:5000/(PressCTRL+Ctoquit)
140315469436672
140315477829376
140315486222080
140315316901632
140315105163008
140315096770304
140315088377600
结果显而易见
综上所述:flask支持多线程,但默认没开启,其次app.run()只适用于开发环境,生产环境下可以使用uWSGI,Gunicorn等web服务器
内容扩展:
flask开启多线程还是多进程
Flask默认是单进程,单线程阻塞的任务模式,在项目上线的时候可以通过nginx+gunicorn的方式部署flask任务。
但是在开发的过程中如果想通过延迟的方式测试高并发怎么实现呢,其实非常简单,
app.run()中可以接受两个参数,分别是threaded和processes,用于开启线程支持和进程支持。
1.threaded:多线程支持,默认为False,即不开启多线程;
2.processes:进程数量,默认为1.
开启方式:
if__name__=='__main__': app.run(threaded=True) #app.run(processes=4)
注意:多进程或多线程只能选择一个,不能同时开启。
以上就是flask开启多线程的具体方法的详细内容,更多关于flask如何开启多线程详解的资料请关注毛票票其它相关文章!
