Node.js + Redis Sorted Set实现任务队列
需求:功能A需要调用第三方API获取数据,而第三方API自身是异步处理方式,在调用后会返回数据与状态{data:"查询结果","status":"正在异步处理中"},这样就需要间隔一段时间后再去调用第三方API获取数据。为了用户在使用功能A时不会因为第三方API正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。
根据以上问题,想到使用Node.js+Redissortedset来实现任务队列。Node.js实现自身应用API用来接受用户请求,合并数据库已存数据与API返回的部分数据返回给用户,并将任务加入到任务队列中。利用Node.jschildprocess与cron定时从任务队列中取出任务执行。
在设计任务队列的过程中需要考虑到的几个问题
- 并行执行多个任务
- 任务唯一性
- 任务成功或失败后的处理
针对以上问题的解决方案
- 并行执行多个任务利用Promise.all来实现
- 任务唯一性利用Redissortedset来实现。使用时间戳作为分值可以实现将sortedset作为list来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为0,每次取出分值大于0的任务来执行,可以避免重复执行任务。
- 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部
示例代码
//remote_api.js模拟第三方API 'usestrict'; constapp=require('express')(); app.get('/',(req,res)=>{ setTimeout(()=>{ letarr=[200,300];//200代表成功,300代表失败需要重新请求 res.status(200).send({'status':arr[parseInt(Math.random()*2)]}); },3000); }); app.listen('9001',()=>{ console.log('API服务监听端口:9001'); }); //producer.js自身应用API,用来接受用户请求并将任务加入任务队列 'usestrict'; constapp=require('express')(); constredisClient=require('redis').createClient(); constQUEUE_NAME='queue:example'; functionaddTaskToQueue(taskName,callback){ //先判断任务是否已经存在,存在:跳过,不存在:加入任务队列 redisClient.zscore(QUEUE_NAME,taskName,(error,task)=>{ if(error){ console.log(error); }else{ if(task){ console.log('任务已存在,不新增相同任务'); callback(null,task); }else{ redisClient.zadd(QUEUE_NAME,newDate().getTime(),taskName,(error,result)=>{ if(error){ callback(error); }else{ callback(null,result); } }); } } }); } app.get('/',(req,res)=>{ lettaskName=req.query['task-name']; addTaskToQueue(taskName,(error,result)=>{ if(error){ console.log(error); }else{ res.status(200).send('正在查询中......'); } }); }); app.listen(9002,()=>{ console.log('生产者服务监听端口:9002'); }); //consumer.js定时获取任务并执行 'usestrict'; constredisClient=require('redis').createClient(); constrequest=require('request'); constschedule=require('node-schedule'); constQUEUE_NAME='queue:expmple'; constPARALLEL_TASK_NUMBER=2;//并行执行任务数量 functiongetTasksFromQueue(callback){ //获取多个任务 redisClient.zrangebyscore([QUEUE_NAME,1,newDate().getTime(),'LIMIT',0,PARALLEL_TASK_NUMBER],(error,tasks)=>{ if(error){ callback(error); }else{ //将任务分值设置为0,表示正在处理 if(tasks.length>0){ lettmp=[]; tasks.forEach((task)=>{ tmp.push(0); tmp.push(task); }); redisClient.zadd([QUEUE_NAME].concat(tmp),(error,result)=>{ if(error){ callback(error); }else{ callback(null,tasks) } }); } } }); } functionaddFailedTaskToQueue(taskName,callback){ redisClient.zadd(QUEUE_NAME,newDate().getTime(),taskName,(error,result)=>{ if(error){ callback(error); }else{ callback(null,result); } }); } functionremoveSucceedTaskFromQueue(taskName,callback){ redisClient.zrem(QUEUE_NAME,taskName,(error,result)=>{ if(error){ callback(error); }else{ callback(null,result); } }) } functionexecTask(taskName){ returnnewPromise((resolve,reject)=>{ letrequestOptions={ 'url':'http://127.0.0.1:9001', 'method':'GET', 'timeout':5000 }; request(requestOptions,(error,response,body)=>{ if(error){ resolve('failed'); console.log(error); addFailedTaskToQueue(taskName,(error)=>{ if(error){ console.log(error); }else{ } }); }else{ try{ body=typeofbody!=='object'?JSON.parse(body):body; }catch(error){ resolve('failed'); console.log(error); addFailedTaskToQueue(taskName,(error,result)=>{ if(error){ console.log(error); }else{ } }); return; } if(body.status!==200){ resolve('failed'); addFailedTaskToQueue(taskName,(error,result)=>{ if(error){ console.log(error); }else{ } }); }else{ resolve('succeed'); removeSucceedTaskFromQueue(taskName,(error,result)=>{ if(error){ console.log(error); }else{ } }); } } }); }); } //定时,每隔5秒获取新的任务来执行 letjob=schedule.scheduleJob('*/5*****',()=>{ console.log('获取新任务'); getTasksFromQueue((error,tasks)=>{ if(error){ console.log(error); }else{ if(tasks.length>0){ console.log(tasks); Promise.all(tasks.map(execTask)) .then((results)=>{ console.log(results); }) .catch((error)=>{ console.log(error); }); } } }); });