JavaScript多线程运行库Nexus.js详解
首先,如果你不熟悉这个项目,建议先阅读之前写的一系列文章。如果你不想阅读这些,不用担心。这里面也会涉及到那些内容。
现在,让我们开始吧。
去年,我开始实现Nexus.js,这是一个基于Webkit/JavaScript内核的多线程服务端JavaScript运行库。有一段时间我放弃了做这件事,由于一些我无法控制的原因,我不打算在这里讨论,主要是:我无法让自己长时间工作。
所以,让我们从讨论Nexus的架构开始,以及它是如何工作的。
事件循环
没有事件循环
有一个带有(无锁)任务对象的线程池
每次调用setTimeout或setImmediate或创建一个Promise时,任务就排队到任务队列钟。
每当计划任务时,第一个可用的线程将选择任务并执行它。
在CPU内核上处理Promise。对Promise.all()的调用将并行的解决Promise。
ES6
支持async/await,并且推荐使用
支持forawait(...)
支持解构
支持asynctry/catch/finally
模块
不支持CommonJS。(require(...)和module.exports)
所有模块使用ES6的import/export语法
支持动态导入通过import('file-or-packge').then(...)
支持import.meta,例如:import.meta.filename以及import.meta.dirname等等
附加功能:支持直接从URL中导入,例如:
import{h}from'https://unpkg.com/preact/dist/preact.esm.js';
EventEmitter
Nexus实现了基于Promise的EventEmitter类
事件处理程序在所有线程上排序,并将并行处理执行。
EventEmitter.emit(...)的返回值是一个Promise,它可以被解析为在事件处理器中返回值所构成的数组。
例如:
classEmitterTestextendsNexus.EventEmitter{
constructor(){
super();
for(leti=0;i<4;i++)
this.on('test',value=>{console.log(`firedtest${i}!`);console.inspect(value);});
for(leti=0;i<4;i++)
this.on('returns-a-value',v=>`${v+i}`);
}
}
consttest=newEmitterTest();
asyncfunctionstart(){
awaittest.emit('test',{payload:'test1'});
console.log('firsttestdone!');
awaittest.emit('test',{payload:'test2'});
console.log('secondtestdone!');
constvalues=awaittest.emit('returns-a-value',10);
console.log('thirdtestdone,returnedvaluesare:');console.inspect(values);
}
start().catch(console.error);
I/O
所有输入/输出都通过三个原语完成:Device,Filter和Stream。
所有输入/输出原语都实现了EventEmitter类
要使用Device,你需要在Device之上创建一个ReadableStream或WritableStream
要操作数据,可以将Filters添加到ReadableStream或WritableStream中。
最后,使用source.pipe(...destinationStreams),然后等待source.resume()来处理数据。
所有的输入/输出操作都是使用ArrayBuffer对象完成的。
Filter试了了process(buffer)方法来处理数据。
例如:使用2个独立的输出文件将UTF-8转换为UTF6。
conststartTime=Date.now();
try{
constdevice=newNexus.IO.FilePushDevice('enwik8');
conststream=newNexus.IO.ReadableStream(device);
stream.pushFilter(newNexus.IO.EncodingConversionFilter("UTF-8","UTF-16LE"));
constwstreams=[0,1,2,3]
.map(i=>newNexus.IO.WritableStream(newNexus.IO.FileSinkDevice('enwik16-'+i)));
console.log('piping...');
stream.pipe(...wstreams);
console.log('streaming...');
awaitstream.resume();
awaitstream.close();
awaitPromise.all(wstreams.map(stream=>stream.close()));
console.log(`finishedin${(Date.now()*startTime)/1000}seconds!`);
}catch(e){
console.error('Anerroroccurred:',e);
}
}
start().catch(console.error);
TCP/UDP
Nexus.js提供了一个Acceptor类,负责绑定ip地址/端口和监听连接
每次收到一个连接请求,connection事件就会被触发,并且提供一个Socket设备。
每一个Socket实例是全双工的I/O设备。
你可以使用ReadableStream和WritableStream来操作Socket。
最基础的例子:(向客户端发送“HelloWorld”)
constacceptor=newNexus.Net.TCP.Acceptor();
letcount=0;
acceptor.on('connection',(socket,endpoint)=>{
constconnId=count++;
console.log(`connection#${connId}from${endpoint.address}:${endpoint.port}`);
constrstream=newNexus.IO.ReadableStream(socket);
constwstream=newNexus.IO.WritableStream(socket);
constbuffer=newUint8Array(13);
constmessage='HelloWorld!\n';
for(leti=0;i<13;i++)
buffer[i]=message.charCodeAt(i);
rstream.pushFilter(newNexus.IO.UTF8StringFilter());
rstream.on('data',buffer=>console.log(`gotmessage:${buffer}`));
rstream.resume().catch(e=>console.log(`client#${connId}at${endpoint.address}:${endpoint.port}disconnected!`));
console.log(`sendinggreetingto#${connId}!`);
wstream.write(buffer);
});
acceptor.bind('127.0.0.1',10000);
acceptor.listen();
console.log('serverready');
Http
Nexus提供了一个Nexus.Net.HTTP.Server类,该类基本上继承了TCPAcceptor
一些基础接口
当服务器端完成了对传入连接的基本的Http头的解析/校验时,将使用连接和同样的信息触发connection事件
每一个连接实例都又一个request和一个response对象。这些是输入/输出设备。
你可以构造ReadableStream和WritableStream来操纵request/response。
如果你通过管道连接到一个Response对象,输入的流将会使用分块编码的模式。否者,你可以使用response.write()来写入一个常规的字符串。
复杂例子:(基本的Http服务器与块编码,细节省略)
....
/**
*Createsaninputstreamfromapath.
*@parampath
*@returns{Promise}
*/
asyncfunctioncreateInputStream(path){
if(path.startsWith('/'))//Ifitstartswith'/',omitit.
path=path.substr(1);
if(path.startsWith('.'))//Ifitstartswith'.',rejectit.
thrownewNotFoundError(path);
if(path==='/'||!path)//Ifit'sempty,settoindex.html.
path='index.html';
/**
*`import.meta.dirname`and`import.meta.filename`replacetheoldCommonJS`__dirname`and`__filename`.
*/
constfilePath=Nexus.FileSystem.join(import.meta.dirname,'server_root',path);
try{
//Statthetargetpath.
const{type}=awaitNexus.FileSystem.stat(filePath);
if(type===Nexus.FileSystem.FileType.Directory)//Ifit'sadirectory,returnits'index.html'
returncreateInputStream(Nexus.FileSystem.join(filePath,'index.html'));
elseif(type===Nexus.FileSystem.FileType.Unknown||type===Nexus.FileSystem.FileType.NotFound)
//Ifit'snotfound,throwNotFound.
thrownewNotFoundError(path);
}catch(e){
if(e.code)
throwe;
thrownewNotFoundError(path);
}
try{
//First,wecreateadevice.
constfileDevice=newNexus.IO.FilePushDevice(filePath);
//ThenwereturnanewReadableStreamcreatedusingoursourcedevice.
returnnewNexus.IO.ReadableStream(fileDevice);
}catch(e){
thrownewInternalServerError(e.message);
}
}
/**
*Connectionscounter.
*/
letconnections=0;
/**
*CreateanewHTTPserver.
*@type{Nexus.Net.HTTP.Server}
*/
constserver=newNexus.Net.HTTP.Server();
//Aservererrormeansanerroroccurredwhiletheserverwaslisteningtoconnections.
//Wecanmostlyignoresucherrors,wedisplaythemanyway.
server.on('error',e=>{
console.error(FgRed+Bright+'ServerError:'+e.message+'\n'+e.stack,Reset);
});
/**
*Listentoconnections.
*/
server.on('connection',async(connection,peer)=>{
//StartwithaconnectionIDof0,incrementwitheverynewconnection.
constconnId=connections++;
//Recordthestarttimeforthisconnection.
conststartTime=Date.now();
//Destructuringissupported,whynotuseit?
const{request,response}=connection;
//ParsetheURLparts.
const{path}=parseURL(request.url);
//Herewe'llstoreanyerrorsthatoccurduringtheconnection.
consterrors=[];
//inStreamisourReadableStreamfilesource,outStreamisourresponse(device)wrappedinaWritableStream.
letinStream,outStream;
try{
//Logtherequest.
console.log(`>#${FgCyan+connId+Reset}${Bright+peer.address}:${peer.port+Reset}${
FgGreen+request.method+Reset}"${FgYellow}${path}${Reset}"`,Reset);
//Setthe'Server'header.
response.set('Server',`nexus.js/0.1.1`);
//Createourinputstream.
inStream=awaitcreateInputStream(path);
//Createouroutputstream.
outStream=newNexus.IO.WritableStream(response);
//Hookall`error`events,addanyerrorstoour`errors`array.
inStream.on('error',e=>{errors.push(e);});
request.on('error',e=>{errors.push(e);});
response.on('error',e=>{errors.push(e);});
outStream.on('error',e=>{errors.push(e);});
//Setcontenttypeandrequeststatus.
response
.set('Content-Type',mimeType(path))
.status(200);
//Hookinputtooutput(s).
constdisconnect=inStream.pipe(outStream);
try{
//Resumeourfilestream,thiscausesthestreamtoswitchtoHTTPchunkedencoding.
//Thiswillreturnapromisethatwillonlyresolveafterthelastbyte(HTTPchunk)iswritten.
awaitinStream.resume();
}catch(e){
//Captureanyerrorsthathappenduringthestreaming.
errors.push(e);
}
//Disconnectallthecallbackscreatedby`.pipe()`.
returndisconnect();
}catch(e){
//Ifanerroroccurred,pushittothearray.
errors.push(e);
//Setthecontenttype,status,andwriteabasicmessage.
response
.set('Content-Type','text/plain')
.status(e.code||500)
.send(e.message||'Anerrorhasoccurred.');
}finally{
//Closethestreamsmanually.Thisisimportantbecausewemayrunoutoffilehandlesotherwise.
if(inStream)
awaitinStream.close();
if(outStream)
awaitoutStream.close();
//Closetheconnection,hasnorealeffectwithkeep-aliveconnections.
awaitconnection.close();
//Grabtheresponse'sstatus.
letstatus=response.status();
//Determinewhatcolourtooutputtotheterminal.
conststatusColors={
'200':Bright+FgGreen,//Greenfor200(OK),
'404':Bright+FgYellow,//Yellowfor404(NotFound)
'500':Bright+FgRed//Redfor500(InternalServerError)
};
letstatusColor=statusColors[status];
if(statusColor)
status=statusColor+status+Reset;
//Logtheconnection(andtimetocomplete)totheconsole.
console.log(`<#${FgCyan+connId+Reset}${Bright+peer.address}:${peer.port+Reset}${
FgGreen+request.method+Reset}"${FgYellow}${path}${Reset}"${status}${(Date.now()*startTime)}ms`+
(errors.length?""+FgRed+Bright+errors.map(error=>error.message).join(',')+Reset:Reset));
}
});
/**
*IPandporttolistenon.
*/
constip='0.0.0.0',port=3000;
/**
*Whetherornottosetthe`reuse`flag.(optional,default=false)
*/
constportReuse=true;
/**
*Maximumallowedconcurrentconnections.Defaultis128onmysystem.(optional,systemspecific)
*@type{number}
*/
constmaxConcurrentConnections=1000;
/**
*Bindtheselectedaddressandport.
*/
server.bind(ip,port,portReuse);
/**
*Startlisteningtorequests.
*/
server.listen(maxConcurrentConnections);
/**
*Happystreaming!
*/
console.log(FgGreen+`Nexus.jsHTTPserverlisteningat${ip}:${port}`+Reset);
基准
我想我已经涵盖了到目前为止所实现的一切。那么现在我们来谈谈性能。
这里是上诉Http服务器的当前基准,有100个并发连接和总共10000个请求:
ThisisApacheBench,Version2.3<$Revision:1796539$> Copyright1996AdamTwiss,ZeusTechnologyLtd,http://www.zeustech.net/ LicensedtoTheApacheSoftwareFoundation,http://www.apache.org/ Benchmarkinglocalhost(bepatient).....done ServerSoftware:nexus.js/0.1.1 ServerHostname:localhost ServerPort:3000 DocumentPath:/ DocumentLength:8673bytes ConcurrencyLevel:100 Timetakenfortests:9.991seconds Completerequests:10000 Failedrequests:0 Totaltransferred:87880000bytes HTMLtransferred:86730000bytes Requestspersecond:1000.94[#/sec](mean) Timeperrequest:99.906[ms](mean) Timeperrequest:0.999[ms](mean,acrossallconcurrentrequests) Transferrate:8590.14[Kbytes/sec]received ConnectionTimes(ms) minmean[+/-sd]medianmax Connect:000.101 Processing:69936.684464 Waiting:59936.484463 Total:610036.684464 Percentageoftherequestsservedwithinacertaintime(ms) 50%84 66%97 75%105 80%112 90%134 95%188 98%233 99%238 100%464(longestrequest)
每秒1000个请求。在一个老的i7上,上面运行了包括这个基准测试软件,一个占用了5G内存的IDE,以及服务器本身。
voodooattack@voodooattack:~$cat/proc/cpuinfo processor:0 vendor_id:GenuineIntel cpufamily:6 model:60 modelname:Intel(R)Core(TM)i7-4770CPU@3.40GHz stepping:3 microcode:0x22 cpuMHz:3392.093 cachesize:8192KB physicalid:0 siblings:8 coreid:0 cpucores:4 apicid:0 initialapicid:0 fpu:yes fpu_exception:yes cpuidlevel:13 wp:yes flags:fpuvmedepsetscmsrpaemcecx8apicsepmtrrpgemcacmovpatpse36clflushdtsacpimmxfxsrssesse2sshttmpbesyscallnxpdpe1gbrdtscplmconstant_tscarch_perfmonpebsbtsrep_goodnoplxtopologynonstop_tsccpuidaperfmperfpnipclmulqdqdtes64monitords_cplvmxsmxesttm2ssse3sdbgfmacx16xtprpdcmpcidsse4_1sse4_2x2apicmovbepopcnttsc_deadline_timeraesxsaveavxf16crdrandlahf_lmabmcpuid_faulttpr_shadowvnmiflexpriorityeptvpidfsgsbasetsc_adjustbmi1avx2smepbmi2ermsinvpcidxsaveoptdthermidaaratplnpts bugs: bogomips:6784.18 clflushsize:64 cache_alignment:64 addresssizes:39bitsphysical,48bitsvirtual powermanagement:
我尝试了1000个并发请求,但是APacheBench由于许多套接字被打开而超时。我尝试了httperf,这里是结果:
voodooattack@voodooattack:~$httperf--port=3000--num-conns=10000--rate=1000 httperf--client=0/1--server=localhost--port=3000--uri=/--rate=1000--send-buffer=4096--recv-buffer=16384--num-conns=10000--num-calls=1 httperf:warning:openfilelimit>FD_SETSIZE;limitingmax.#ofopenfilestoFD_SETSIZE Maximumconnectburstlength:262 Total:connections9779requests9779replies9779test-duration10.029s Connectionrate:975.1conn/s(1.0ms/conn,<=1022concurrentconnections) Connectiontime[ms]:min0.5avg337.9max7191.8median79.5stddev848.1 Connectiontime[ms]:connect207.3 Connectionlength[replies/conn]:1.000 Requestrate:975.1req/s(1.0ms/req) Requestsize[B]:62.0 Replyrate[replies/s]:min903.5avg974.6max1045.7stddev100.5(2samples) Replytime[ms]:response129.5transfer1.1 Replysize[B]:header89.0content8660.0footer2.0(total8751.0) Replystatus:1xx=02xx=97793xx=04xx=05xx=0 CPUtime[s]:user0.35system9.67(user3.5%system96.4%total99.9%) NetI/O:8389.9KB/s(68.7*10^6bps) Errors:total221client-timo0socket-timo0connrefused0connreset0 Errors:fd-unavail221addrunavail0ftab-full0other0
正如你看到的,它任然能工作。尽管由于压力,有些连接会超时。我仍在研究导致这个问题的原因。
以上就是这篇关于Nexus.js学习知识的全部内容,大家有问题可以在下方留言讨论,感谢对毛票票的支持。