手机版

KOA egg.js集成卡夫卡消息队列的示例

时间:2021-08-28 来源:互联网 编辑:宝哥软件园 浏览:

Egg.js:基于KOA2的企业级框架

卡夫卡:高吞吐量的分布式发布订阅消息系统

本文章将集成鸡蛋卡夫卡关系型数据库的日志系统例子

系统要求:日志记录,通过卡夫卡进行消息队列控制

思路图:

这里消费者和生产者都由日志系统提供

.1 环境准备

卡夫卡

官网下载卡夫卡后,解压

启动动物园管理员:

bin/zoo keeper-服务器-启动。sh配置/动物园管理员。性能启动卡夫卡服务器

这里配置/服务器。属性中将分区数=5,我们设置5个划分

bin/Kafka-服务器-启动。sh配置/服务器。属性鸡蛋MySQL

根据脚手架搭建好鸡蛋,再多安装kafka-node,egg-mysql

关系型数据库用户名根密码123456

.2 集成

1、根目录新建app.js,这个文件在每次项目加载时候都会运作

使用"严格";const Kafka=require(' Kafka-node ');模块。exports=app={ app。before start(async()={ const CTX=app。createanonymoucontext();常数制作人=卡夫卡。制片人;const客户端=新卡夫卡KafkaClient({ kafkahost : app。配置。kafkahost });常量生产者=新生产者(客户端,app。配置。生产者配置);producer.on('error ',function(err){ console。错误('错误:[Productor]' err);});app.producer=生产者;常数消费者=新卡夫卡Consumer(客户端,app.config.consumerTopics,{ autoCommit: false,});consumer.on('message '),异步函数(留言){试试{等等CTX。服务。日志。插入(JSON。解析(消息。值));consumer.commit(true),(err,data)={ console。错误(' commit : ',err,data);});} catch(错误){控制台。错误(' ERROR :[GetMessage]',消息,错误);} });consumer.on('error ',function(err){ console。错误('错误:[Consumer]' err);});});};上述代码新建了生产者、消费者。

生产者新建后加载进应用全局对象。我们将在请求时候生产消息。这里只是先新建实例

消费者获取消息将访问服务层的插入方法(数据库插入数据)。

具体参数可以参考卡夫卡式节点官方原料药,往下看会有生产者和消费者的配置参数。

2、控制器日志

这里获取到了制片人,并传往服务层

使用"严格";常量控制器=要求('鸡蛋')。控制器;类日志控制器扩展了控制器{/* * * @描述卡夫卡控制日志信息流* @主机/日志/通知* @方法开机自检* @参数{日志}日志日志信息*/async notice(){ const producer=this。CTX。app。制片人;const Response=new this。CTX。app。response();const RequestBody=this。CTX。请求。身体;返回常量信息=等待此消息。CTX。服务。日志。发送(生产者,请求正文);这个。CTX。body=响应。成功(返回信息);} }模块。导出=日志控制器;3、服务日志

这里有一个派遣方法,这里调用了制作人。发送,进行生产者生产

插入方法则是数据库插入数据

使用"严格";常量服务=需要(鸡蛋)。服务;const uuid v1=require(' uuid/v1 ');类日志服务扩展了服务{异步发送(生产者,参数){ const有效负载=[{ topic : this。CTX。app。配置。主题,消息: JSON.stringify(参数),},];生产者.发送(有效负载,函数(err,data) { console.log('send : ',data);});返回"成功";}异步插入(消息){尝试{ const LogDB=this。CTX。app。MySQL。get(' log ');const IP=这个。CTX。知识产权;常量日志=这个。CTX。模特。日志。build({ id : uuid v1()),type: message.type || ' ',level: message.level || 0,operator : message。operator | | ' ',content: message.content || ' ',ip,user _ agent : message。user _ agent | | ' ',error _ stack :消息。error _ stack | | ' ',url: message.url || ' ' .const result=wait LogDB。插入(' logs ',LogS)。DataValues);if(结果。effect rows===1){控制台。日志(` suceesss :[插入$ { message。type }]`);} else控制台。错误('错误:[插入数据库]',结果);} catch(错误){控制台。错误('错误:[插入]',消息,错误);} } } module.exports=LogService4、config . config . default . js

一些上述代码用到的配置参数具体在这里,注这里开了5个分区。

使用"严格";模块。exports=AppInfo={ const config=(exports={ });const topic=' logAction _ p5//在此添加您的配置配置。中间件=[];配置。security={ csrf : { enable : false,},};//mysql数据库配置配置。MySQL={ clients : { basic : { host : ' localhost ',port: '3306 ',user: 'root ',password: '123456 ',database: 'merchants_basic ',},log: { host: 'localhost ',port: '3306 ',user 3: ' root ',password//sequelieze config。sequelize={方言: ' MySQL '、数据库: 'merchants_log '、主机: 'localhost '、端口: '3306 '、用户名: 'root '、密码: '123456 ',拨号盘选项3360 { request time out : 999999 }、pool : { acquire : 999999//Kafka config。kafkahost=' localhost :9092 ';config.topic=topicconfig。producterconfig={//Partitioner类型(默认=0,随机=1,循环=2,键控=3,自定义=4),默认0 partitionerType:1 };config.consumerTopics=[ { topic,partition: 0 },{ topic,partition: 1 },{ topic,partition: 2 },{ topic,partition: 3 },{ topic,partition: 4 },];返回配置;};5、实体类:

模式日志。射流研究…

这里使用了续集

使用"严格";模块。exports=app={ const { STRING,INTEGER,DATE,TEXT }=app .续集;const Log=app.model.define('log ',{/** * UUID */id : { type : STRING(36),primaryKey: true },/* * *日志类型*/type: STRING(100),/** *优先等级(数字越高,优先级越高)*/level: INTEGER,/** *操作者*/operator: STRING(50),/** *日志内容*/content: TEXT,/** * IP */ip: STRING(36),/** *当前用户代理信息*/user_agent: STRING(150),/** *错误堆栈*/error_stack: TEXT,/** * URL */url: STRING(255),/** *请求对象*/request: TEXT,/** *响应对象*/response: TEXT,/** *创建时间*/created_at: DATE,/** *更新时间*/updated_at: DATE,});返回日志;};6、测试计算机编程语言脚本:

从多处理导入池导入请求从线程导入线程从多处理导入进程定义循环(): t=1000,而t : URL=' http://localhost :7001/log/notice ' payload=' \ ' n \ t ' Type ' : ' ERROR \ ',\ n \ t \ ' level \ ' : 1,\ n \ t ' Content ' : ' URL send ERROR \ ',\ n \ t \ ' operator \ ' : '建表语句:

SET NAMES utf8mb4设置外键检查=0;- -日志的表结构- -如果存在"日志",则删除表;创建表"日志"(" id"varchar(36)字符集utf8mb4校对utf8mb4_bin不为空,键入" varchar(100)字符集utf8mb4校对utf8mb4_bin不为空注释"日志类型,` level ` int(11)NULL DEFAULT NULL COMMENT '优先等级(数字越高,优先级越高)','运算符` varchar(50)CHARACTER SET utf8 mb4 COLLATE utf8 mb4 _ bin NULL DEFAULT NULL COMMENT '操作人,` content ` text CHARACTER SET utf8 MB 4 COLLATE utf8 MB 4 _ bin空注释'日志信息,` IP ` varchar(36)CHARTER SET utf8 mb4 COLLATE utf8 mb4 _ bin NULL DEFAULT NULL COMMIT ' IP \ r \ NiP ',` user _ agent ` varchar(150)CHARTER SET utf8 mb4 COLLATE utf8 mb4 _ bin NULL DEFAULT NULL COMMIT '当前用户代理信息, ` error _ stack ` text CHARACTER SET utf 8 mb4 COLLATE utf 8 mb4 _ bin NULL COMMENT '错误堆栈,`网址` varchar(255)字符集utf8 mb4 COLLATE utf8 mb4 _ bin空默认值空注释'当前网址','请求文本字符集utf8 MB 4 COLLATE utf8 MB 4 _ bin空注释'请求对象、“响应”文本字符集utf8 mb4 COLLATE utf8 mb4 _ bin空注释'响应对象,`创建于` datetime(0)空默认值空注释'创建时间,更新于日期时间(0)空默认值空注释'更新时间,PRIMARY KEY(` id `)USING BTREE)ENGINE=InnoDB CHARACTER SET=utf8 MB 4 COLLATE=utf8 MB 4 _ bin ROW _ FORMAT=Dynamic; 设置外键检查=1;.3 后话

网上类似资料甚少,啃各种文档,探寻技术实现方式

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

版权声明:KOA egg.js集成卡夫卡消息队列的示例是由宝哥软件园云端程序自动收集整理而来。如果本文侵犯了你的权益,请联系本站底部QQ或者邮箱删除。