Node.js 多线程完全指南总结
很多人都想知道单线程的Node.js怎么能与多线程后端竞争。考虑到其所谓的单线程特性,许多大公司选择Node作为其后端似乎违反直觉。要想知道原因,必须理解其单线程的真正含义。
JavaScript的设计非常适合在网上做比较简单的事情,比如验证表单,或者说创建彩虹色的鼠标轨迹。在2009年,Node.js的创始人RyanDahl使开发人员可以用该语言编写后端代码。
通常支持多线程的后端语言具有各种机制,用于在线程和其他面向线程的功能之间同步数据。要向JavaScript添加对此类功能的支持,需要修改整个语言,这不是Dahl的目标。为了让纯JavaScript支持多线程,他必须想一个变通方法。接下来让我们探索一下其中的奥秘……
Node.js是如何工作的
Node.js使用两种线程:eventloop处理的主线程和workerpool中的几个辅助线程。
事件循环是一种机制,它采用回调(函数)并注册它们,准备在将来的某个时刻执行。它与相关的JavaScript代码在同一个线程中运行。当JavaScript操作阻塞线程时,事件循环也会被阻止。
工作池是一种执行模型,它产生并处理单独的线程,然后同步执行任务,并将结果返回到事件循环。事件循环使用返回的结果执行提供的回调。
简而言之,它负责异步I/O操作——主要是与系统磁盘和网络的交互。它主要由诸如fs(I/O密集)或crypto(CPU密集)等模块使用。工作池用libuv实现,当Node需要在JavaScript和C++之间进行内部通信时,会导致轻微的延迟,但这几乎不可察觉。
基于这两种机制,我们可以编写如下代码:
fs.readFile(path.join(__dirname,'./package.json'),(err,content)=>{ if(err){ returnnull; } console.log(content.toString()); });
前面提到的fs模块告诉工作池使用其中一个线程来读取文件的内容,并在完成后通知事件循环。然后事件循环获取提供的回调函数,并用文件的内容执行它。
以上是非阻塞代码的示例,我们不必同步等待某事的发生。只需告诉工作池去读取文件,并用结果去调用提供的函数即可。由于工作池有自己的线程,因此事件循环可以在读取文件时继续正常执行。
在不需要同步执行某些复杂操作时,这一切都相安无事:任何运行时间太长的函数都会阻塞线程。如果应用程序中有大量这类功能,就可能会明显降低服务器的吞吐量,甚至完全冻结它。在这种情况下,无法继续将工作委派给工作池。
在需要对数据进行复杂的计算时(如AI、机器学习或大数据)无法真正有效地使用Node.js,因为操作阻塞了主(且唯一)线程,使服务器无响应。在Node.jsv10.5.0发布之前就是这种情况,在这一版本增加了对多线程的支持。
简介:worker_threads
worker_threads模块允许我们创建功能齐全的多线程Node.js程序。
threadworker是在单独的线程中生成的一段代码(通常从文件中取出)。
注意,术语threadworker,worker和thread经常互换使用,他们都指的是同一件事。
要想使用threadworker,必须导入worker_threads模块。让我们先写一个函数来帮助我们生成这些threadworker,然后再讨论它们的属性。
typeWorkerCallback=(err:any,result?:any)=>any; exportfunctionrunWorker(path:string,cb:WorkerCallback,workerData:object|null=null){ constworker=newWorker(path,{workerData}); worker.on('message',cb.bind(null,null)); worker.on('error',cb); worker.on('exit',(exitCode)=>{ if(exitCode===0){ returnnull; } returncb(newError(`Workerhasstoppedwithcode${exitCode}`)); }); returnworker; }
要创建一个worker,首先必须创建一个Worker类的实例。它的第一个参数提供了包含worker的代码的文件的路径;第二个参数提供了一个名为workerData的包含一个属性的对象。这是我们希望线程在开始运行时可以访问的数据。
请注意:不管你是用的是JavaScript,还是最终要转换为JavaScript的语言(例如,TypeScript),路径应该始终引用带有.js或.mjs扩展名的文件。
我还想指出为什么使用回调方法,而不是返回在触发message事件时将解决的promise。这是因为worker可以发送许多message事件,而不是一个。
正如你在上面的例子中所看到的,线程间的通信是基于事件的,这意味着我们设置了worker在发送给定事件后调用的侦听器。
以下是最常见的事件:
worker.on('error',(error)=>{});
只要worker中有未捕获的异常,就会发出error事件。然后终止worker,错误可以作为提供的回调中的第一个参数。
worker.on('exit',(exitCode)=>{});
在worker退出时会发出exit事件。如果在worker中调用了process.exit(),那么exitCode将被提供给回调。如果worker以worker.terminate()终止,则代码为1。
worker.on('online',()=>{});
只要worker停止解析JavaScript代码并开始执行,就会发出online事件。它不常用,但在特定情况下可以提供信息。
worker.on('message',(data)=>{});
只要worker将数据发送到父线程,就会发出message事件。
现在让我们来看看如何在线程之间共享数据。
在线程之间交换数据
要将数据发送到另一个线程,可以用port.postMessage()方法。它的原型如下:
port.postMessage(data[,transferList])
port对象可以是parentPort,也可以是MessagePort的实例——稍后会详细讲解。
数据参数
第一个参数——这里被称为data——是一个被复制到另一个线程的对象。它可以是复制算法所支持的任何内容。
数据由结构化克隆算法进行复制。引用自Mozilla:
它通过递归输入对象来进行克隆,同时保持之前访问过的引用的映射,以避免无限遍历循环。
该算法不复制函数、错误、属性描述符或原型链。还需要注意的是,以这种方式复制对象与使用JSON不同,因为它可以包含循环引用和类型化数组,而JSON不能。
由于能够复制类型化数组,该算法可以在线程之间共享内存。
在线程之间共享内存
人们可能会说像cluster或child_process这样的模块在很久以前就开始使用线程了。这话对,也不对。
cluster模块可以创建多个节点实例,其中一个主进程在它们之间对请求进行路由。集群能够有效地增加服务器的吞吐量;但是我们不能用cluster模块生成一个单独的线程。
人们倾向于用PM2这样的工具来集中管理他们的程序,而不是在自己的代码中手动执行,如果你有兴趣,可以研究一下如何使用cluster模块。
child_process模块可以生成任何可执行文件,无论它是否是用JavaScript写的。它和worker_threads非常相似,但缺少后者的几个重要功能。
具体来说threadworkers更轻量,并且与其父线程共享相同的进程ID。它们还可以与父线程共享内存,这样可以避免对大的数据负载进行序列化,从而更有效地来回传递数据。
现在让我们看一下如何在线程之间共享内存。为了共享内存,必须将ArrayBuffer或SharedArrayBuffer的实例作为数据参数发送到另一个线程。
这是一个与其父线程共享内存的worker:
import{parentPort}from'worker_threads'; parentPort.on('message',()=>{ constnumberOfElements=100; constsharedBuffer=newSharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT*numberOfElements); constarr=newInt32Array(sharedBuffer); for(leti=0;i首先,我们创建一个SharedArrayBuffer,其内存需要包含100个32位整数。接下来创建一个Int32Array实例,它将用缓冲区来保存其结构,然后用一些随机数填充数组并将其发送到父线程。
在父线程中:
importpathfrom'path'; import{runWorker}from'../run-worker'; constworker=runWorker(path.join(__dirname,'worker.js'),(err,{arr})=>{ if(err){ returnnull; } arr[0]=5; }); worker.postMessage({});把arr[0]的值改为5,实际上会在两个线程中修改它。
当然,通过共享内存,我们冒险在一个线程中修改一个值,同时也在另一个线程中进行了修改。但是我们在这个过程中也得到了一个好处:该值不需要进行序列化就可以另一个线程中使用,这极大地提高了效率。只需记住管理数据正确的引用,以便在完成数据处理后对其进行垃圾回收。
共享一个整数数组固然很好,但我们真正感兴趣的是共享对象——这是存储信息的默认方式。不幸的是,没有SharedObjectBuffer或类似的东西,但我们可以自己创建一个类似的结构。
transferList参数
transferList中只能包含ArrayBuffer和MessagePort。一旦它们被传送到另一个线程,就不能再次被传送了;因为内存里的内容已经被移动到了另一个线程。
目前,还不能通过transferList(可以使用child_process模块)来传输网络套接字。
创建通信渠道
线程之间的通信是通过port进行的,port是MessagePort类的实例,并启用基于事件的通信。
使用port在线程之间进行通信的方法有两种。第一个是默认值,这个方法比较容易。在worker的代码中,我们从worker_threads模块导入一个名为parentPort的对象,并使用对象的.postMessage()方法将消息发送到父线程。
这是一个例子:
import{parentPort}from'worker_threads'; constdata={ //... }; parentPort.postMessage(data);parentPort是Node.js在幕后创建的MessagePort实例,用于与父线程进行通信。这样就可以用parentPort和worker对象在线程之间进行通信。
线程间的第二种通信方式是创建一个MessageChannel并将其发送给worker。以下代码是如何创建一个新的MessagePort并与我们的worker共享它:
importpathfrom'path'; import{Worker,MessageChannel}from'worker_threads'; constworker=newWorker(path.join(__dirname,'worker.js')); const{port1,port2}=newMessageChannel(); port1.on('message',(message)=>{ console.log('messagefromworker:',message); }); worker.postMessage({port:port2},[port2]);在创建port1和port2之后,我们在port1上设置事件监听器并将port2发送给worker。我们必须将它包含在transferList中,以便将其传输给worker。
在worker内部:
import{parentPort,MessagePort}from'worker_threads'; parentPort.on('message',(data)=>{ const{port}:{port:MessagePort}=data; port.postMessage('heresyourmessage!'); });这样,我们就能使用父线程发送的port了。
使用parentPort不一定是错误的方法,但最好用MessageChannel的实例创建一个新的MessagePort,然后与生成的worker共享它。
请注意,在后面的例子中,为了简便起见,我用了parentPort。
使用worker的两种方式
可以通过两种方式使用worker。第一种是生成一个worker,然后执行它的代码,并将结果发送到父线程。通过这种方法,每当出现新任务时,都必须重新创建一个工作者。
第二种方法是生成一个worker并为message事件设置监听器。每次触发message时,它都会完成工作并将结果发送回父线程,这会使worker保持活动状态以供以后使用。
Node.js文档推荐第二种方法,因为在创建threadworker时需要创建虚拟机并解析和执行代码,这会产生比较大的开销。所以这种方法比不断产生新worker的效率更高。
这种方法被称为工作池,因为我们创建了一个工作池并让它们等待,在需要时调度message事件来完成工作。
以下是一个产生、执行然后关闭worker例子:
import{parentPort}from'worker_threads'; constcollection=[]; for(leti=0;i<10;i+=1){ collection[i]=i; } parentPort.postMessage(collection);将collection发送到父线程后,它就会退出。
下面是一个worker的例子,它可以在给定任务之前等待很长一段时间:
import{parentPort}from'worker_threads'; parentPort.on('message',(data:any)=>{ constresult=doSomething(data); parentPort.postMessage(result); });worker_threads模块中可用的重要属性
worker_threads模块中有一些可用的属性:
isMainThread
当不在工作线程内操作时,该属性为true。如果你觉得有必要,可以在worker文件的开头包含一个简单的if语句,以确保它只作为worker运行。
import{isMainThread}from'worker_threads'; if(isMainThread){ thrownewError('Itsnotaworker'); }workerData
产生线程时包含在worker的构造函数中的数据。
constworker=newWorker(path,{workerData});在工作线程中:
import{workerData}from'worker_threads'; console.log(workerData.property);parentPort
前面提到的MessagePort实例,用于与父线程通信。
threadId
分配给worker的唯一标识符。
现在我们知道了技术细节,接下来实现一些东西并在实践中检验学到的知识。
实现setTimeout
setTimeout是一个无限循环,顾名思义,用来检测程序运行时间是否超时。它在循环中检查起始时间与给定毫秒数之和是否小于实际日期。
import{parentPort,workerData}from'worker_threads'; consttime=Date.now(); while(true){ if(time+workerData.time<=Date.now()){ parentPort.postMessage({}); break; } }这个特定的实现产生一个线程,然后执行它的代码,最后在完成后退出。
接下来实现使用这个worker的代码。首先创建一个状态,用它来跟踪生成的worker:
consttimeoutState:{[key:string]:Worker}={};然后时负责创建worker并将其保存到状态的函数:
exportfunctionsetTimeout(callback:(err:any)=>any,time:number){ constid=uuidv4(); constworker=runWorker( path.join(__dirname,'./timeout-worker.js'), (err)=>{ if(!timeoutState[id]){ returnnull; } timeoutState[id]=null; if(err){ returncallback(err); } callback(null); }, { time, }, ); timeoutState[id]=worker; returnid; }首先,我们使用UUID包为worker创建一个唯一的标识符,然后用先前定义的函数runWorker来获取worker。我们还向worker传入一个回调函数,一旦worker发送了数据就会被触发。最后,把worker保存在状态中并返回id。
在回调函数中,我们必须检查该worker是否仍然存在于该状态中,因为有可能会cancelTimeout(),这将会把它删除。如果确实存在,就把它从状态中删除,并调用传给setTimeout函数的callback。
cancelTimeout函数使用.terminate()方法强制worker退出,并从该状态中删除该这个worker:
exportfunctioncancelTimeout(id:string){ if(timeoutState[id]){ timeoutState[id].terminate(); timeoutState[id]=undefined; returntrue; } returnfalse; }如果你有兴趣,我也实现了setInterval,代码在这里,但因为它对线程什么都没做(我们重用setTimeout的代码),所以我决定不在这里进行解释。
我已经创建了一个短小的测试代码,目的是检查这种方法与原生方法的不同之处。你可以在这里找到代码。这些是结果:
nativesetTimeout{ms:7004,averageCPUCost:0.1416} workersetTimeout{ms:7046,averageCPUCost:0.308}我们可以看到setTimeout有一点延迟-大约40ms-这时worker被创建时的消耗。平均CPU成本也略高,但没什么难以忍受的(CPU成本是整个过程持续时间内CPU使用率的平均值)。
如果我们可以重用worker,就能够降低延迟和CPU使用率,这就是要实现工作池的原因。
实现工作池
如上所述,工作池是给定数量的被事先创建的worker,他们保持空闲并监听message事件。一旦message事件被触发,他们就会开始工作并发回结果。
为了更好地描述我们将要做的事情,下面我们来创建一个由八个threadworker组成的工作池:
constpool=newWorkerPool(path.join(__dirname,'./test-worker.js'),8);如果你熟悉限制并发操作,那么你在这里看到的逻辑几乎相同,只是一个不同的用例。
如上面的代码片段所示,我们把指向worker的路径和要生成的worker数量传给了WorkerPool的构造函数。
exportclassWorkerPool{ privatequeue:QueueItem []=[]; privateworkersById:{[key:number]:Worker}={}; privateactiveWorkersById:{[key:number]:boolean}={}; publicconstructor(publicworkerPath:string,publicnumberOfThreads:number){ this.init(); } } 这里还有其他一些属性,如workersById和activeWorkersById,我们可以分别保存现有的worker和当前正在运行的worker的ID。还有queue,我们可以使用以下结构来保存对象:
typeQueueCallback=(err:any,result?:N)=>void; interfaceQueueItem { callback:QueueCallback ; getData:()=>T; } callback只是默认的节点回调,第一个参数是错误,第二个参数是可能的结果。getData是传递给工作池.run()方法的函数(如下所述),一旦项目开始处理就会被调用。getData函数返回的数据将传给工作线程。
在.init()方法中,我们创建了worker并将它们保存在以下状态中:
privateinit(){ if(this.numberOfThreads<1){ returnnull; } for(leti=0;i为避免无限循环,我们首先要确保线程数>1。然后创建有效的worker数,并将它们的索引保存在workersById状态。我们在activeWorkersById状态中保存了它们当前是否正在运行的信息,默认情况下该状态始终为false。
现在我们必须实现前面提到的.run()方法来设置一个worker可用的任务。
publicrun(getData:()=>T){ returnnewPromise((resolve,reject)=>{ constavailableWorkerId=this.getInactiveWorkerId(); constqueueItem:QueueItem ={ getData, callback:(error,result)=>{ if(error){ returnreject(error); } returnresolve(result); }, }; if(availableWorkerId===-1){ this.queue.push(queueItem); returnnull; } this.runWorker(availableWorkerId,queueItem); }); } 在promise函数里,我们首先通过调用.getInactiveWorkerId()来检查是否存在空闲的worker可以来处理数据:
privategetInactiveWorkerId():number{ for(leti=0;i接下来,我们创建一个queueItem,在其中保存传递给.run()方法的getData函数以及回调。在回调中,我们要么resolve或者rejectpromise,这取决于worker是否将错误传递给回调。
如果availableWorkerId的值是-1,意味着当前没有可用的worker,我们将queueItem添加到queue。如果有可用的worker,则调用.runWorker()方法来执行worker。
在.runWorker()方法中,我们必须把当前worker的activeWorkersById设置为使用状态;为message和error事件设置事件监听器(并在之后清理它们);最后将数据发送给worker。
privateasyncrunWorker(workerId:number,queueItem:QueueItem){ constworker=this.workersById[workerId]; this.activeWorkersById[workerId]=true; constmessageCallback=(result:N)=>{ queueItem.callback(null,result); cleanUp(); }; consterrorCallback=(error:any)=>{ queueItem.callback(error); cleanUp(); }; constcleanUp=()=>{ worker.removeAllListeners('message'); worker.removeAllListeners('error'); this.activeWorkersById[workerId]=false; if(!this.queue.length){ returnnull; } this.runWorker(workerId,this.queue.shift()); }; worker.once('message',messageCallback); worker.once('error',errorCallback); worker.postMessage(awaitqueueItem.getData()); } 首先,通过使用传递的workerId,我们从workersById中获得worker引用。然后,在activeWorkersById中,将[workerId]属性设置为true,这样我们就能知道在worker在忙,不要运行其他任务。
接下来,分别创建messageCallback和errorCallback用来在消息和错误事件上调用,然后注册所述函数来监听事件并将数据发送给worker。
在回调中,我们调用queueItem的回调,然后调用cleanUp函数。在cleanUp函数中,要删除事件侦听器,因为我们会多次重用同一个worker。如果没有删除监听器的话就会发生内存泄漏,内存会被慢慢耗尽。
在activeWorkersById状态中,我们将[workerId]属性设置为false,并检查队列是否为空。如果不是,就从queue中删除第一个项目,并用另一个queueItem再次调用worker。
接着创建一个在收到message事件中的数据后进行一些计算的worker:
import{isMainThread,parentPort}from'worker_threads'; if(isMainThread){ thrownewError('Itsnotaworker'); } constdoCalcs=(data:any)=>{ constcollection=[]; for(leti=0;i<1000000;i+=1){ collection[i]=Math.round(Math.random()*100000); } returncollection.sort((a,b)=>{ if(a>b){ return1; } return-1; }); }; parentPort.on('message',(data:any)=>{ constresult=doCalcs(data); parentPort.postMessage(result); });worker创建了一个包含100万个随机数的数组,然后对它们进行排序。只要能够多花费一些时间才能完成,做些什么事情并不重要。
以下是工作池简单用法的示例:
constpool=newWorkerPool<{i:number},number>(path.join(__dirname,'./test-worker.js'),8); constitems=[...newArray(100)].fill(null); Promise.all( items.map(async(_,i)=>{ awaitpool.run(()=>({i})); console.log('finished',i); }), ).then(()=>{ console.log('finishedall'); });首先创建一个由八个worker组成的工作池。然后创建一个包含100个元素的数组,对于每个元素,我们在工作池中运行一个任务。开始运行后将立即执行八个任务,其余任务被放入队列并逐个执行。通过使用工作池,我们不必每次都创建一个worker,从而大大提高了效率。
结论
worker_threads提供了一种为程序添加多线程支持的简单的方法。通过将繁重的CPU计算委托给其他线程,可以显着提高服务器的吞吐量。通过官方线程支持,我们可以期待更多来自AI、机器学习和大数据等领域的开发人员和工程师使用Node.js.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。