Redis+NodeJS实现能处理海量数据的异步任务队列系统( 二 )
local_tasks)和任务的总数 TASK_AMOUNT(此处为 20 个) 。 通过 LPUSH 方法往 TASK_NAME 的 List 当中塞入内容为 task-1 到 task-20 的任务 , 如图所示:
文章插图
文章插图
三、异步任务处理首先新建一个 index.ts 文件 , 作为整个异步任务队列处理系统的入口文件 。
import taskHandler from './tasksHandler'import client from './mqClient'client.on('connect', () => {console.log('Redis is connected!')})client.on('ready', async () => {console.log('Redis is ready!')await taskHandler()})client.on('error', (e) => {console.log('Redis error! ' + e)})
在运行该文件时 , 会自动连接 Redis , 并且在 ready 状态时执行任务处理器 taskHandler() 。
在上一节的操作中 , 我们往任务队列里面添加了 20 个任务 , 每个任务都是形如 task-n 的字符串 。 为了验证异步任务的实现 , 我们可以在任务处理器 taskHandler.ts 重写一段 demo 函数 , 来模拟真正的异步任务:
function handleTask(task: string) {return new Promise((resolve) => {setTimeout(async () => {console.log(`Handling task: ${task}...`)resolve()}, 2000)})}
上面这个 handleTask() 函数 , 将会在执行的 2 秒后打印出当前任务的内容 , 并返回一个 Promise , 很好地模拟了异步函数的实现方式 。 接下来我们将会围绕这个函数 , 来处理队列中的任务 。
其实到了这一步为止 , 整个异步任务队列处理系统已经基本完成了 , 只需要在 taskHandler.ts 中补充一点点代码即可:
import { popTask } from './utils'import client from './mqClient'function handleTask(task: string) { /* ... */}export default async function tasksHandler() {// 从队列中取出一个任务const task = await popTask()// 处理任务await handleTask(task)// 递归运行await tasksHandler()}
最后 , 我们使用 PM2 启动 4 个进程 , 来试着跑一下整个项目:
pm2 start ./dist/index.js -i 4--tt-darkmode-color: #E83E8C;">false 设置成 true 。 当且仅当黄色的标记值为 false 时才会设置时间 。 这样一来 , 当其他任务被取得时 , 由于黄色的标记值已经是 true 了 , 因此无法设置时间 , 所以我们便能得到首个任务被取得的时间 。
在本文的例子中 , 黄色的标记值和首个任务被取得的时间也被存放在 Redis 中 , 分别被命名为 local_tasks_SET_FIRST 和 local_tasks_BEGIN_TIME 。
原理已经弄懂 , 但是在实践中还有一个地方值得注意 。 我们知道 , 从 Redis 中读写数据也是一个异步操作 。 由于我们有多个 worker 但只有一个 Redis , 那么在读取黄色标记值的时候很可能会出现“冲突”的问题 。 举个例子 , 当 worker-1 修改标记值为 true 的同时 ,worker-2 正好在读取标记值 。 由于时间的关系 , 可能 worker-2 读到的标记值依然是 false , 那么这就冲突了 。 为了解决这个问题 , 我们可以使用 node-redlock 这个工具来实现“锁”的操作 。
顾名思义 , “锁”的操作可以理解为当 worker-1 读取并修改标记值的时候 , 不允许其他 worker 读取该值 , 也就是把标记值给锁住了 。 当 worker-1 完成标记值的修改时会释放锁 , 此时才允许其他的 worker 去读取该标记值 。
node-redlock 是 Redis 分布式锁 Redlock 算法的 JavaScript 实现 , 关于该算法的讲解可参考。
值得注意的是 , 在 node-redlock 在使用的过程中 , 如果要锁一个已存在的 key , 就必须为该 key 添加一个前缀 locks: , 否则会报错 。
回到 utils.ts , 编写一个 setBeginTime() 的工具函数:
export const setBeginTime = async (redlock: Redlock) => {// 读取标记值前先把它锁住const lock = await redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000)const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`)// 当且仅当标记值不等于 true 时 , 才设置起始时间if (setFirst !== 'true') {console.log(`${pm2tips} Get the first task!`)await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true')await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`)}// 完成标记值的读写操作后 , 释放锁await lock.unlock().catch(e => e)}
然后把它添加到 taskHandler() 函数里面即可:
export default async function tasksHandler() {+// 获取第一个任务被取得的时间+await setBeginTime(redlock)// 从队列中取出一个任务const task = await popTask()// 处理任务await handleTask(task)// 递归运行await tasksHandler()}
接下来解决“最后一个任务的完成时间”这个问题 。
类似上一个问题 , 由于任务执行的先后顺序无法保证 , 异步操作的完成时间也无法保证 , 因此我们也需要一个额外的标识来记录任务的完成情况 。 在 Redis 中创建一个初始值为 0 的标识 local_tasks_CUR_INDEX , 当 worker 完成一个任务就让标识加 。 由于任务队列的初始长度是已知的(为 TASK_AMOUNT 常量 , 也写入了 Redis 的 local_tasks_TOTAL 中) , 因此当标识的值等于队列初始长度的值时 , 即可表明所有任务都已经完成 。
- 智能手机市场|华为再拿第一!27%的份额领跑全行业,苹果8%排在第四名!
- 痛点|首个OTA智能社区诞生 解决行业四大痛点
- 王兴称美团优选目前重点是建设核心能力;苏宁旗下云网万店融资60亿元;阿里小米拟增资居然之家|8点1氪 | 美团
- 芯片|华米GTS2mini和红米手表哪个好 参数功能配置对比
- 黑莓(BB.US)盘前涨逾32%,将与亚马逊开发智能汽车数据平台|美股异动 | US
- 国产手机|国产手机新品频发,果粉们你们还能忍得住吗?
- 巅峰|realme巅峰之作:120Hz+陶瓷机身+5000mAh 做到了颜值与性能并存
- 蛋壳公寓|官媒发声:绝不能让“割韭菜者”一跑了之!
- 出海|出海日报丨短视频生产服务商小影科技完成近4亿元 C 轮融资;华为成为俄罗斯在线出售智能手机的第一品牌
- QuestMobile|QuestMobile:百度智能小程序月人均使用个数达9.6个