JavaScript多线程运行库Nexus.js详解
首先,如果你不熟悉这个项目,建议先阅读之前写的一系列文章。如果你不想阅读这些,不用担心。这里面也会涉及到那些内容。
现在,让我们开始吧。
去年,我开始实现Nexus.js,这是一个基于Webkit/JavaScript内核的多线程服务端JavaScript运行库。有一段时间我放弃了做这件事,由于一些我无法控制的原因,我不打算在这里讨论,主要是:我无法让自己长时间工作。
所以,让我们从讨论Nexus的架构开始,以及它是如何工作的。
事件循环
没有事件循环
有一个带有(无锁)任务对象的线程池
每次调用setTimeout或setImmediate或创建一个Promise时,任务就排队到任务队列钟。
每当计划任务时,第一个可用的线程将选择任务并执行它。
在CPU内核上处理Promise。对Promise.all()的调用将并行的解决Promise。
ES6
支持async/await,并且推荐使用
支持forawait(...)
支持解构
支持asynctry/catch/finally
模块
不支持CommonJS。(require(...)和module.exports)
所有模块使用ES6的import/export语法
支持动态导入通过import('file-or-packge').then(...)
支持import.meta,例如:import.meta.filename以及import.meta.dirname等等
附加功能:支持直接从URL中导入,例如:
import{h}from'https://unpkg.com/preact/dist/preact.esm.js';
EventEmitter
Nexus实现了基于Promise的EventEmitter类
事件处理程序在所有线程上排序,并将并行处理执行。
EventEmitter.emit(...)的返回值是一个Promise,它可以被解析为在事件处理器中返回值所构成的数组。
例如:
classEmitterTestextendsNexus.EventEmitter{ constructor(){ super(); for(leti=0;i<4;i++) this.on('test',value=>{console.log(`firedtest${i}!`);console.inspect(value);}); for(leti=0;i<4;i++) this.on('returns-a-value',v=>`${v+i}`); } } consttest=newEmitterTest(); asyncfunctionstart(){ awaittest.emit('test',{payload:'test1'}); console.log('firsttestdone!'); awaittest.emit('test',{payload:'test2'}); console.log('secondtestdone!'); constvalues=awaittest.emit('returns-a-value',10); console.log('thirdtestdone,returnedvaluesare:');console.inspect(values); } start().catch(console.error);
I/O
所有输入/输出都通过三个原语完成:Device,Filter和Stream。
所有输入/输出原语都实现了EventEmitter类
要使用Device,你需要在Device之上创建一个ReadableStream或WritableStream
要操作数据,可以将Filters添加到ReadableStream或WritableStream中。
最后,使用source.pipe(...destinationStreams),然后等待source.resume()来处理数据。
所有的输入/输出操作都是使用ArrayBuffer对象完成的。
Filter试了了process(buffer)方法来处理数据。
例如:使用2个独立的输出文件将UTF-8转换为UTF6。
conststartTime=Date.now(); try{ constdevice=newNexus.IO.FilePushDevice('enwik8'); conststream=newNexus.IO.ReadableStream(device); stream.pushFilter(newNexus.IO.EncodingConversionFilter("UTF-8","UTF-16LE")); constwstreams=[0,1,2,3] .map(i=>newNexus.IO.WritableStream(newNexus.IO.FileSinkDevice('enwik16-'+i))); console.log('piping...'); stream.pipe(...wstreams); console.log('streaming...'); awaitstream.resume(); awaitstream.close(); awaitPromise.all(wstreams.map(stream=>stream.close())); console.log(`finishedin${(Date.now()*startTime)/1000}seconds!`); }catch(e){ console.error('Anerroroccurred:',e); } } start().catch(console.error);
TCP/UDP
Nexus.js提供了一个Acceptor类,负责绑定ip地址/端口和监听连接
每次收到一个连接请求,connection事件就会被触发,并且提供一个Socket设备。
每一个Socket实例是全双工的I/O设备。
你可以使用ReadableStream和WritableStream来操作Socket。
最基础的例子:(向客户端发送“HelloWorld”)
constacceptor=newNexus.Net.TCP.Acceptor(); letcount=0; acceptor.on('connection',(socket,endpoint)=>{ constconnId=count++; console.log(`connection#${connId}from${endpoint.address}:${endpoint.port}`); constrstream=newNexus.IO.ReadableStream(socket); constwstream=newNexus.IO.WritableStream(socket); constbuffer=newUint8Array(13); constmessage='HelloWorld!\n'; for(leti=0;i<13;i++) buffer[i]=message.charCodeAt(i); rstream.pushFilter(newNexus.IO.UTF8StringFilter()); rstream.on('data',buffer=>console.log(`gotmessage:${buffer}`)); rstream.resume().catch(e=>console.log(`client#${connId}at${endpoint.address}:${endpoint.port}disconnected!`)); console.log(`sendinggreetingto#${connId}!`); wstream.write(buffer); }); acceptor.bind('127.0.0.1',10000); acceptor.listen(); console.log('serverready');
Http
Nexus提供了一个Nexus.Net.HTTP.Server类,该类基本上继承了TCPAcceptor
一些基础接口
当服务器端完成了对传入连接的基本的Http头的解析/校验时,将使用连接和同样的信息触发connection事件
每一个连接实例都又一个request和一个response对象。这些是输入/输出设备。
你可以构造ReadableStream和WritableStream来操纵request/response。
如果你通过管道连接到一个Response对象,输入的流将会使用分块编码的模式。否者,你可以使用response.write()来写入一个常规的字符串。
复杂例子:(基本的Http服务器与块编码,细节省略)
.... /** *Createsaninputstreamfromapath. *@parampath *@returns{Promise} */ asyncfunctioncreateInputStream(path){ if(path.startsWith('/'))//Ifitstartswith'/',omitit. path=path.substr(1); if(path.startsWith('.'))//Ifitstartswith'.',rejectit. thrownewNotFoundError(path); if(path==='/'||!path)//Ifit'sempty,settoindex.html. path='index.html'; /** *`import.meta.dirname`and`import.meta.filename`replacetheoldCommonJS`__dirname`and`__filename`. */ constfilePath=Nexus.FileSystem.join(import.meta.dirname,'server_root',path); try{ //Statthetargetpath. const{type}=awaitNexus.FileSystem.stat(filePath); if(type===Nexus.FileSystem.FileType.Directory)//Ifit'sadirectory,returnits'index.html' returncreateInputStream(Nexus.FileSystem.join(filePath,'index.html')); elseif(type===Nexus.FileSystem.FileType.Unknown||type===Nexus.FileSystem.FileType.NotFound) //Ifit'snotfound,throwNotFound. thrownewNotFoundError(path); }catch(e){ if(e.code) throwe; thrownewNotFoundError(path); } try{ //First,wecreateadevice. constfileDevice=newNexus.IO.FilePushDevice(filePath); //ThenwereturnanewReadableStreamcreatedusingoursourcedevice. returnnewNexus.IO.ReadableStream(fileDevice); }catch(e){ thrownewInternalServerError(e.message); } } /** *Connectionscounter. */ letconnections=0; /** *CreateanewHTTPserver. *@type{Nexus.Net.HTTP.Server} */ constserver=newNexus.Net.HTTP.Server(); //Aservererrormeansanerroroccurredwhiletheserverwaslisteningtoconnections. //Wecanmostlyignoresucherrors,wedisplaythemanyway. server.on('error',e=>{ console.error(FgRed+Bright+'ServerError:'+e.message+'\n'+e.stack,Reset); }); /** *Listentoconnections. */ server.on('connection',async(connection,peer)=>{ //StartwithaconnectionIDof0,incrementwitheverynewconnection. constconnId=connections++; //Recordthestarttimeforthisconnection. conststartTime=Date.now(); //Destructuringissupported,whynotuseit? const{request,response}=connection; //ParsetheURLparts. const{path}=parseURL(request.url); //Herewe'llstoreanyerrorsthatoccurduringtheconnection. consterrors=[]; //inStreamisourReadableStreamfilesource,outStreamisourresponse(device)wrappedinaWritableStream. letinStream,outStream; try{ //Logtherequest. console.log(`>#${FgCyan+connId+Reset}${Bright+peer.address}:${peer.port+Reset}${ FgGreen+request.method+Reset}"${FgYellow}${path}${Reset}"`,Reset); //Setthe'Server'header. response.set('Server',`nexus.js/0.1.1`); //Createourinputstream. inStream=awaitcreateInputStream(path); //Createouroutputstream. outStream=newNexus.IO.WritableStream(response); //Hookall`error`events,addanyerrorstoour`errors`array. inStream.on('error',e=>{errors.push(e);}); request.on('error',e=>{errors.push(e);}); response.on('error',e=>{errors.push(e);}); outStream.on('error',e=>{errors.push(e);}); //Setcontenttypeandrequeststatus. response .set('Content-Type',mimeType(path)) .status(200); //Hookinputtooutput(s). constdisconnect=inStream.pipe(outStream); try{ //Resumeourfilestream,thiscausesthestreamtoswitchtoHTTPchunkedencoding. //Thiswillreturnapromisethatwillonlyresolveafterthelastbyte(HTTPchunk)iswritten. awaitinStream.resume(); }catch(e){ //Captureanyerrorsthathappenduringthestreaming. errors.push(e); } //Disconnectallthecallbackscreatedby`.pipe()`. returndisconnect(); }catch(e){ //Ifanerroroccurred,pushittothearray. errors.push(e); //Setthecontenttype,status,andwriteabasicmessage. response .set('Content-Type','text/plain') .status(e.code||500) .send(e.message||'Anerrorhasoccurred.'); }finally{ //Closethestreamsmanually.Thisisimportantbecausewemayrunoutoffilehandlesotherwise. if(inStream) awaitinStream.close(); if(outStream) awaitoutStream.close(); //Closetheconnection,hasnorealeffectwithkeep-aliveconnections. awaitconnection.close(); //Grabtheresponse'sstatus. letstatus=response.status(); //Determinewhatcolourtooutputtotheterminal. conststatusColors={ '200':Bright+FgGreen,//Greenfor200(OK), '404':Bright+FgYellow,//Yellowfor404(NotFound) '500':Bright+FgRed//Redfor500(InternalServerError) }; letstatusColor=statusColors[status]; if(statusColor) status=statusColor+status+Reset; //Logtheconnection(andtimetocomplete)totheconsole. console.log(`<#${FgCyan+connId+Reset}${Bright+peer.address}:${peer.port+Reset}${ FgGreen+request.method+Reset}"${FgYellow}${path}${Reset}"${status}${(Date.now()*startTime)}ms`+ (errors.length?""+FgRed+Bright+errors.map(error=>error.message).join(',')+Reset:Reset)); } }); /** *IPandporttolistenon. */ constip='0.0.0.0',port=3000; /** *Whetherornottosetthe`reuse`flag.(optional,default=false) */ constportReuse=true; /** *Maximumallowedconcurrentconnections.Defaultis128onmysystem.(optional,systemspecific) *@type{number} */ constmaxConcurrentConnections=1000; /** *Bindtheselectedaddressandport. */ server.bind(ip,port,portReuse); /** *Startlisteningtorequests. */ server.listen(maxConcurrentConnections); /** *Happystreaming! */ console.log(FgGreen+`Nexus.jsHTTPserverlisteningat${ip}:${port}`+Reset);
基准
我想我已经涵盖了到目前为止所实现的一切。那么现在我们来谈谈性能。
这里是上诉Http服务器的当前基准,有100个并发连接和总共10000个请求:
ThisisApacheBench,Version2.3<$Revision:1796539$> Copyright1996AdamTwiss,ZeusTechnologyLtd,http://www.zeustech.net/ LicensedtoTheApacheSoftwareFoundation,http://www.apache.org/ Benchmarkinglocalhost(bepatient).....done ServerSoftware:nexus.js/0.1.1 ServerHostname:localhost ServerPort:3000 DocumentPath:/ DocumentLength:8673bytes ConcurrencyLevel:100 Timetakenfortests:9.991seconds Completerequests:10000 Failedrequests:0 Totaltransferred:87880000bytes HTMLtransferred:86730000bytes Requestspersecond:1000.94[#/sec](mean) Timeperrequest:99.906[ms](mean) Timeperrequest:0.999[ms](mean,acrossallconcurrentrequests) Transferrate:8590.14[Kbytes/sec]received ConnectionTimes(ms) minmean[+/-sd]medianmax Connect:000.101 Processing:69936.684464 Waiting:59936.484463 Total:610036.684464 Percentageoftherequestsservedwithinacertaintime(ms) 50%84 66%97 75%105 80%112 90%134 95%188 98%233 99%238 100%464(longestrequest)
每秒1000个请求。在一个老的i7上,上面运行了包括这个基准测试软件,一个占用了5G内存的IDE,以及服务器本身。
voodooattack@voodooattack:~$cat/proc/cpuinfo processor:0 vendor_id:GenuineIntel cpufamily:6 model:60 modelname:Intel(R)Core(TM)i7-4770CPU@3.40GHz stepping:3 microcode:0x22 cpuMHz:3392.093 cachesize:8192KB physicalid:0 siblings:8 coreid:0 cpucores:4 apicid:0 initialapicid:0 fpu:yes fpu_exception:yes cpuidlevel:13 wp:yes flags:fpuvmedepsetscmsrpaemcecx8apicsepmtrrpgemcacmovpatpse36clflushdtsacpimmxfxsrssesse2sshttmpbesyscallnxpdpe1gbrdtscplmconstant_tscarch_perfmonpebsbtsrep_goodnoplxtopologynonstop_tsccpuidaperfmperfpnipclmulqdqdtes64monitords_cplvmxsmxesttm2ssse3sdbgfmacx16xtprpdcmpcidsse4_1sse4_2x2apicmovbepopcnttsc_deadline_timeraesxsaveavxf16crdrandlahf_lmabmcpuid_faulttpr_shadowvnmiflexpriorityeptvpidfsgsbasetsc_adjustbmi1avx2smepbmi2ermsinvpcidxsaveoptdthermidaaratplnpts bugs: bogomips:6784.18 clflushsize:64 cache_alignment:64 addresssizes:39bitsphysical,48bitsvirtual powermanagement:
我尝试了1000个并发请求,但是APacheBench由于许多套接字被打开而超时。我尝试了httperf,这里是结果:
voodooattack@voodooattack:~$httperf--port=3000--num-conns=10000--rate=1000 httperf--client=0/1--server=localhost--port=3000--uri=/--rate=1000--send-buffer=4096--recv-buffer=16384--num-conns=10000--num-calls=1 httperf:warning:openfilelimit>FD_SETSIZE;limitingmax.#ofopenfilestoFD_SETSIZE Maximumconnectburstlength:262 Total:connections9779requests9779replies9779test-duration10.029s Connectionrate:975.1conn/s(1.0ms/conn,<=1022concurrentconnections) Connectiontime[ms]:min0.5avg337.9max7191.8median79.5stddev848.1 Connectiontime[ms]:connect207.3 Connectionlength[replies/conn]:1.000 Requestrate:975.1req/s(1.0ms/req) Requestsize[B]:62.0 Replyrate[replies/s]:min903.5avg974.6max1045.7stddev100.5(2samples) Replytime[ms]:response129.5transfer1.1 Replysize[B]:header89.0content8660.0footer2.0(total8751.0) Replystatus:1xx=02xx=97793xx=04xx=05xx=0 CPUtime[s]:user0.35system9.67(user3.5%system96.4%total99.9%) NetI/O:8389.9KB/s(68.7*10^6bps) Errors:total221client-timo0socket-timo0connrefused0connreset0 Errors:fd-unavail221addrunavail0ftab-full0other0
正如你看到的,它任然能工作。尽管由于压力,有些连接会超时。我仍在研究导致这个问题的原因。
以上就是这篇关于Nexus.js学习知识的全部内容,大家有问题可以在下方留言讨论,感谢对毛票票的支持。