Redis+NodeJS实现能处理海量数据的异步任务队列系统( 三 )


Redis+NodeJS实现能处理海量数据的异步任务队列系统文章插图
如图所示 , 被完成的任务都会让黄色的标识加一 , 任何时候只要判断到标识的值等于队列的初始长度值 , 即可表明任务已经全部完成 。
回到 taskHandler() 函数 , 加入下列内容:
export default async function tasksHandler() {+// 获取标识值和队列初始长度+let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`))+const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`))+// 等待新任务+if (taskAmount === 0) {+console.log(`${pm2tips} Wating new tasks...`)+await sleep(2000)+await tasksHandler()+return+}+// 判断所有任务已经完成+if (curIndex === taskAmount) {+const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`)+// 获取总耗时+const cost = new Date().getTime() - Number(beginTime)+console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`)+// 初始化 Redis 的一些标识值+await setRedisValue(`${TASK_NAME}_TOTAL`, '0') +await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0')+await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false')+await delRedisKey(`${TASK_NAME}_BEGIN_TIME`)+await sleep(2000)+await tasksHandler()}// 获取第一个任务被取得的时间await setBeginTime(redlock)// 从队列中取出一个任务const task = await popTask()// 处理任务await handleTask(task)+ // 任务完成后需要为标识位加一+try {+const lock = await redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000)+curIndex = await getCurIndex()+await setCurIndex(curIndex + 1)+await lock.unlock().catch((e) => e)+} catch (e) {+console.log(e)+}+// recursion+await tasksHandler()+}// 递归运行await tasksHandler()}到这一步为止 , 我们已经解决了获取“最后一个任务的完成时间”的问题 , 再结合前面的首个任务被取得的时间 , 便能得出运行的总耗时 。
最后来看一下实际的运行效果 。 我们循例往队列里面添加了 task-1 到 task-20 这 20 个任务 , 然后启动 4 个进程来跑:
Redis+NodeJS实现能处理海量数据的异步任务队列系统文章插图
Redis+NodeJS实现能处理海量数据的异步任务队列系统文章插图
运行状况良好 。 从运行结果来看 , 4 个进程处理 20 个平均耗时 2 秒的任务 , 只需要 10 秒的时间 , 完全符合设想 。
五、小结当面对海量的异步任务需要处理的时候 , 多进程 + 任务队列的方式是一个不错的解决方式 。 本文通过探索 Redis + NodeJS 结合的方式 , 构造出了一个异步任务队列处理系统 , 能较好地完成最初方案的设想 , 但依然有很多问题需要改进 。 比如说当任务出错了应该怎么办 , 系统能否支持不同类型的任务 , 能否运行多个队列等等 , 都是值得思考的问题 。
(完)