Node 集成RabbitMQ 系列三:重连机制,附Demo
文章插图
前提前两篇对rabbitmq的基本概念与延迟队列、死信队列进行了代码测试 , 默认都是理想情况的正常操作 , 针对复杂多变的网络环境 , 先不说投递的可靠性 , 首先服务的可用性就是第一个拦路虎 , 如:重连、限流 。
本文目标:
- 单独抽离rabbitmq配置 , 便于之后写插件
- 考虑异常 , 比如:重联 , <之前为了实现API , 不想考虑>
- 消费端限流 , 为啥 , 因为遇到过bug…
代码篇重点
// !应该写断言的 。。。 下次import * as assert from 'assert'import * as amqp from 'amqplib'import * as consumer from './consumer'// 连接配置:#connect// url | configconst config = {protocol: 'amqp',hostname: 'localhost',port: 5672,username: 'guest',password: 'guest',// 最大连接数 , 0:无限// the size in bytes of the maximum frame allowed over the connection. 0 means no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1); I default it to 0x1000, i.e. 4kb, which is the allowed minimum, will fit many purposes, and not chug through Node.JS’s buffer pooling.frameMax: 0,// 心跳周期heartbeat: 0,}let connect: amqp.Connection;// 最大连接次数...let maxConnectTimes = 0;let isConnect = false;export const init = async () => {try {connect = await amqp.connect(config);// 监听error\close , 重新连接connect.on('error', err => {reconnect(err, 'error');});// 什么时候会触发?网络异常、服务异常、管理后台删除connect.on('close', err => {reconnect(err, 'close');});console.info('[x]Rabbitmq connect success');// !注册执行消费者// 可以根据需求 , 多写几个?consumer.run(connect);return connect;} catch (error) {reconnect(error, 'catch');}}const reconnect = (err, event) => {// 因为后台删除连接 , 会同时触发error、close, 为了不一次创建两个 , 所以做个限制if (!isConnect) {isConnect = true;maxConnectTimes++;console.error(`[x]Lost connection to RMQ. reconnectingCount: ${maxConnectTimes}. Reconnecting in 10 seconds...`);console.error('[x]Rabbitmq close: ', event, err);// 5秒连接一次return setTimeout(init, 1000 * 5);}}// 公用这个连接export const connection = () => {return connect;}19 86 97 88 8 7 60 98 5 74 23 27 87 24 26 61 25 64 13 62 39 68 71 49 53 89 92 77 28 58 2 12 94 69 1 32 16 90 73 72 10 6 14 35 21 51 18 37 93 42 67 43 48 54 75 38 44 80 52 46 36 99 55 66 70 47 59 15 29 31 57 63 82 81 96 30 91 78 95 4 20 83 56 9 76 41 100 65 3 11 45 79 34 17 0 50 40 84 22
启动文件import * as http from 'http'import * as rabbitmq from './rabbitmq';import * as producer from './producer';/*** 实现功能 * 1. 启动node服务}res.end('hello world')}).listen(3000, () => {rabbitmq.init();console.log('开启端口3000')})81 7 95 82 30 45 5 34 50 86 53 59 80 64 15 62 73 0 85 54 42 12 19 78 10 76 21 17 83 23 56 18 91 25 94 4 65 55 49 97 43 28 58 14 47 8 3 9 2 44 37 93 89 36 41 29 11 52 27 87 51 6 40 33 74 60 61 16 48 96 57 99 98 66 39 84 35 79 69 72 38 100 71 46 24 20 22 92 70 32 75 88 90 31 67 68 13 26 1
消费端async function consumer(args: {exchange, queue, routingKey, connection}, cb: (msg: any, channel: any) => void){// 常规操作const channel = await args.connection.createChannel();await channel.assertExchange(args.exchange, 'direct', {durable: false});const queueA = await channel.assertQueue(args.queue, {exclusive: false});await channel.bindQueue(queueA.queue, args.exchange, args.routingKey);// !消费端限流await channel.prefetch(1, false);// 消费队列await channel.consume(queueA.queue, msg => {cb(msg, channel);});}export const run = (connection) => {consumer({exchange: 'order.exchange',routingKey: 'order.routingKey',queue: 'order.queue',connection,}, async (msg, channel) => {const data = http://kandian.youth.cn/index/msg.content.toString();console.info(`${(new Date()).getMinutes()}:${(new Date()).getSeconds()} consumer msg:%j`, data);console.log('msg: ', msg)return setTimeout(function () {try {/*** 针对队列信息进行业务操作* 1. 直接消费* 2. 重回队列* 3. 扔到死信队列*/channel.ack(msg);// if(Number(data) < 6) {//// 手动ack//channel.ack(msg);// } else {//// !1. 重回队列//channel.nack(msg);//// !2. 扔到死信队列//// 下个demo再整理 。// }} catch (err) {console.error('消息 Ack Error:', err)}// 每隔1s执行一个任务}, 1000);})}74 14 28 21 42 40 46 20 25 5 93 98 29 13 95 55 9 83 11 87 68 26 38 17 41 60 32 2 45 53 22 96 67 31 30 89 69 97 51 77 84 78 6 64 54 75 16 34 1 4 70 12 82 7 85 81 66 73 3 56 80 39 57 79 50 63 33 37 23 49 86 47 52 19 91 99 36 8 94 48 59 24 61 62 76 44 15 88 27 72 65 18 10 100 43 58 92 90 35 0
github# 启动服务 , node应该都会 , 要不也不会看这个$ ts-node index.ts
【Node 集成RabbitMQ 系列三:重连机制,附Demo】
- 联盟|天津半导体集成电路人才联盟成立
- 科技|万业企业控股孙公司北京凯世通拟向芯成科技出售3款集成电路设备
- 软件|山西出台政策奖励集成电路和软件企业
- 高通发布骁龙888芯片,集成X60 5G芯片
- 高通骁龙888正式发布 首发Cortex X1架构集成5G基带
- 搭建私有Sentry日志收集系统并集成到springboot
- Kubernetes 运维小记:node 为系统保留最低资源
- 你喜欢的 Go 第三方库:一步为系统集成可视化实时运行时统计
- 集成式|多模块集成式工具 燚博云提升薪管效率
- 柯尼卡美能达最新的小型柔性OLED照明面板可与NFC集成