手机版

详细说明node.js中TCP Socket多进程之间的消息推送示例

时间:2021-08-31 来源:互联网 编辑:宝哥软件园 浏览:

前言

前段时间我收到一个支付转账服务的需求,就是通过http接口将支付数据传输到转账服务器,转账服务器将支付数据发送到异构后台(Lua)指定的tcp套接字。

评估之初感觉挺简单的,就是http服务器和tcp服务器的通信不是一个Event实例就能解决的状态管理问题?为消息传输注册一个事件a,当套接字连接时注册一个唯一的ID,然后在http接收数据时发出事件a;当听到事件A时,在tcp服务器中寻找与指定ID对应的套接字来处理数据。

虽然node.js在高并发方面有很好的性能,但是单个tcp服务器实例的承载能力是有限的。为了避免服务器过载,node.js单个进程的内存有一个上限(默认2G),可以容纳少量长连接的客户端。但是随着业务的扩展,需要考虑多机集群部署,客户端可以连接到任意节点,发送消息。如何同时推送多个节点,需要在多个节点之间建立消息分发/订阅架构。常用的第三方消息管理库包括RabbitMQ和Redis。在这里,我使用Redis的订阅发布服务。

Redis.io有成熟的Redis消息传递库socket.io-redis(本地下载)。但是我们项目中的异构背景不是websocket,而是Native TCP Native Socket。用原生redis的sub/pubs实现并不难,所以我写了。

Redis在这个项目中主要扮演发布/订阅的角色。当发送http请求的支付数据时,通过redis的发布功能将消息推送到所有通道,这样订阅该通道的所有socket服务器都可以接收回调,并推送到指定的客户端。在应用程序级别,它类似于事件消息的处理。

const redis=require('redis '),redisClient=redis.createClient,REDIS_CFG={ host: '127.0.0.1 ',port: 6379 },sub=redisClient(REDIS_CFG),pub=redisClient(REDIS_CFG),PAY _ MQ _ CHANNEL=' PAY _ MQ _ CHANNEL ';//收听消息回调sub。on ('message ',function (channel,message){ switch(channel){ case pay _ MQ _ channel : console . log(' notification received 3360 ',message);//将消息广播到指定的socket break}});//订阅频道sub . subscribe(PAY _ MQ _ CHannel);//收到支付数据时,推送渠道消息pub.publish (pay _ MQ _ channel,{id:' 01 ',msg : ` hello $ { pay _ MQ _ channel }!`});由于redis' sub/pub中的通道订阅数量是有上限的,建议一种类型的消息使用一个通道,订阅时在一个通道下使用映射、集合或数组来存储回调函数,收到订阅消息时遍历并执行回调函数。

下面是我打包的Redis组件(RedisMQProxy.js):

/* * redis订阅/发布*/const _=require('lodash '),redis=require('redis '),REDIS_CFG={ host: '127.0.0.1 ',port: 6379 },sub=redisClient(REDIS_CFG),pub=REDIS client(REDIS _ CFG);让SubListenerFuns={ };//通道的回调函数列表让RedisMQProxy={ //订阅通道开启(通道,cb,errorCb,once=false){ sub subscribe(通道);//订阅引导消息//将回调函数存放数组中SubListenerFuns[channel]=_ .isEmpty(SubListenerFuns[channel])?[]: sublistener funs[通道];子列表功能[频道]。推送({一次,cb,错误CB });}, //监听一次性的引导回调函数一次(通道,cb,errorCb) { this.on(通道,Cb,errorCb,true);}, //发送引导消息发出(通道,消息){ if(!_.isString(message)){ message=JSON。stringify(消息);} pub.publish(频道、消息);}, //移除引导上的监听函数removeListener(channel,func) { let channelHandlers=_ .isEmpty(SubListenerFuns[channel])?[]: sublistener funs[通道];用于(设i=0,l=channelHandlers.lengthI lI){ let handler=channel handler[I]| | { };让CB=handler . cbif(func func==CB){ channel handler。拼接(,1);返回false } } } }RedisMQProxy .子列表器=子列表器函数;pub.on('error ',OnError);sub.on('错误,OnError);//监听存储的订阅消息sub on('消息',函数(通道,消息){ //遍历执行引导的回调函数尝试{ message=JSON.parse(消息);} catch(e) {} broadcastToChannel(频道,消息);});//广播消息到指定频道函数broadcastToChannel(通道,消息,isError) { let channelHandlers=_ .isEmpty(SubListenerFuns[channel])?[]: sublistener funs[通道];用于(设i=0,l=channelHandlers.lengthI lI){ let handler=channel handler[I]| | { };让isOnce=handler . once | | false let func=handler . cblet error func=handler。错误CB;_.isFunction(func) func(消息);isError _ .isFuncTion(错误功能)错误功能(消息);isOnce channelHandlers.splice(i,1);//移除一次性监听的函数} }函数broadcasttoallchannell(message,isError){ for(let channel in SubListenerFuns){ broadcasttochannell(channel,message,isError);} }函数onError(err){ err=err | | { };呃。消息=错误。msg | |“redis sub/pub失败”;//通知所有引导执行错误回调函数broadcastToAllChannels(err,true);} module.exports=RedisMQProxy在使用时就可以比较方便地调用了:

const RedisMQProxy=require(' ./RedisMQProxy '),PAY _ MQ _ CHannel=' PAY _ MQ _ CHannel ';//订阅CHannel redimq。on(PAY _ MQ _ CHannel),函数(消息){ console。日志('收到通知: ',消息);//广播消息到指定套接字/.});//订阅一次性的CHannel redimq。一次(PAY _ MQ _ CHannel,函数(消息){ //.});//当接收到支付数据时,推送频道消息RedisMQ.emit(PAY_MQ_CHANNEL,{id: '01 ',msg : ` hello $ { PAY _ MQ _ CHannel }!`});目前该项目已经健康运行了一个多月。由于套接字服务器的多进程间消息推送依赖于存储的消息中转,而使用心得使用的是单进程,未能充分利用中央处理器。当业务膨胀的时候存储就要考虑分布集群了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

版权声明:详细说明node.js中TCP Socket多进程之间的消息推送示例是由宝哥软件园云端程序自动收集整理而来。如果本文侵犯了你的权益,请联系本站底部QQ或者邮箱删除。