Egg.JS : 基于 KOA2 的企业级框架
Kafka: 高吞吐量的分布式发布订阅消息系统
本文章将集成 egg + kafka + MySQL 的日志系统例子
系统要求: 日志记录, 通过 kafka 进行消息队列控制
思路图:
这里消费者和生产者都由日志系统提供
一、环境准备
1. Kafka
官网下载 kafka 后, 解压
启动 zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动 Kafka server
这里 config/server.properties 中将 num.partitions=5, 我们设置 5 个 partitions
- bin/kafka-server-start.sh config/server.properties
2. egg + MySQL
根据脚手架搭建好 egg, 再安装 kafka-node、egg-mysql
MySQL 用户名 root 密码 123456
二、集成
1. 根目录新建 app.js, 这个文件在每次项目加载时候都会运作
- 'use strict';
- const kafka = require('kafka-node');
- module.exports = App => {
- App.beforeStart(async () => {
- const ctx = App.createAnonymousContext();
- const Producer = kafka.Producer;
- const client = new kafka.KafkaClient({ kafkaHost: App.config.kafkaHost });
- const producer = new Producer(client, App.config.producerConfig);
- producer.on('error', function(err) {
- console.error('ERROR: [Producer]' + err);
- });
- App.producer = producer;
- const consumer = new kafka.Consumer(client, App.config.consumerTopics, {
- autoCommit: false,
- });
- consumer.on('message', async function(message) {
- try {
- await ctx.service.log.insert(JSON.parse(message.value));
- consumer.commit(true, (err, data) => {
- console.error('commit:', err, data);
- });
- } catch (error) {
- console.error('ERROR: [GetMessage]', message, error);
- }
- });
- consumer.on('error', function(err) {
- console.error('ERROR: [Consumer]' + err);
- });
- });
- };
上述代码新建了生产者, 消费者.
生产者新建后加载进 App 全局对象. 我们将在请求时候生产消息. 这里只是先新建实例
消费者获取消息将访问 service 层的 insert 方法 (数据库插入数据).
具体参数可以参考 kafka-node 官方 API, 往下看会有生产者和消费者的配置参数.
2. controller/log.js
这里获取到了 producer, 并传往 service 层
- 'use strict';
- const Controller = require('egg').Controller;
- class LogController extends Controller {
- /**
- * @description Kafka 控制日志信息流
- * @host /log/notice
- * @method POST
- * @param {Log} log 日志信息
- */
- async notice() {
- const producer = this.ctx.App.producer;
- const Response = new this.ctx.App.Response();
- const requestBody = this.ctx.request.body;
- const backInfo = await this.ctx.service.log.send(producer, requestBody);
- this.ctx.body = Response.success(backInfo);
- }
- }
- module.exports = LogController;
3. service/log.js
这里有一个 send 方法, 这里调用了 producer.send , 进行生产者生产
insert 方法则是数据库插入数据
- 'use strict';
- const Service = require('egg').Service;
- const uuidv1 = require('uuid/v1');
- class LogService extends Service {
- async send(producer, params) {
- const payloads = [
- {
- topic: this.ctx.App.config.topic,
- messages: JSON.stringify(params),
- },
- ];
- producer.send(payloads, function(err, data) {
- console.log('send :', data);
- });
- return 'success';
- }
- async insert(message) {
- try {
- const logDB = this.ctx.App.MySQL.get('log');
- const ip = this.ctx.ip;
- const Logs = this.ctx.model.Log.build({
- id: uuidv1(),
- type: message.type || '',
- level: message.level || 0,
- operator: message.operator || '',
- content: message.content || '',
- ip,
- user_agent: message.user_agent || '',
- error_stack: message.error_stack || '',
- url: message.url || '',
- request: message.request || '',
- response: message.response || '',
- created_at: new Date(),
- updated_at: new Date(),
- });
- const result = await logDB.insert('logs', Logs.dataValues);
- if (result.affectedRows === 1) {
- console.log(`SUCEESS: [Insert ${message.type}]`);
- } else console.error('ERROR: [Insert DB]', result);
- } catch (error) {
- console.error('ERROR: [Insert]', message, error);
- }
- }
- }
- module.exports = LogService;
4. config/config.default.js
一些上述代码用到的配置参数具体在这里, 注这里开了 5 个 partition.
- 'use strict';
- module.exports = appInfo => {
- const config = (exports = {});
- const topic = 'logAction_p5';
- // add your config here
- config.middleware = [];
- config.security = {
- csrf: {
- enable: false,
- },
- };
- // MySQL database configuration
- config.MySQL = {
- clients: {
- basic: {
- host: 'localhost',
- port: '3306',
- user: 'root',
- password: '123456',
- database: 'merchants_basic',
- },
- log: {
- host: 'localhost',
- port: '3306',
- user: 'root',
- password: '123456',
- database: 'merchants_log',
- },
- },
- default: {},
- App: true,
- agent: false,
- };
- // sequelize config
- config.sequelize = {
- dialect: 'mysql',
- database: 'merchants_log',
- host: 'localhost',
- port: '3306',
- username: 'root',
- password: '123456',
- dialectOptions: {
- requestTimeout: 999999,
- },
- pool: {
- acquire: 999999,
- },
- };
- // kafka config
- config.kafkaHost = 'localhost:9092';
- config.topic = topic;
- config.producerConfig = {
- // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
- partitionerType: 1,
- };
- config.consumerTopics = [
- { topic, partition: 0 },
- { topic, partition: 1 },
- { topic, partition: 2 },
- { topic, partition: 3 },
- { topic, partition: 4 },
- ];
- return config;
- };
5. 实体类: model/log.js
这里使用了 Sequelize
- 'use strict';
- module.exports = App => {
- const { STRING, INTEGER, DATE, TEXT } = App.Sequelize;
- const Log = App.model.define('log', {
- /**
- * UUID
- */
- id: { type: STRING(36), primaryKey: true },
- /**
- * 日志类型
- */
- type: STRING(100),
- /**
- * 优先等级 (数字越高, 优先级越高)
- */
- level: INTEGER,
- /**
- * 操作者
- */
- operator: STRING(50),
- /**
- * 日志内容
- */
- content: TEXT,
- /**
- * IP
- */
- ip: STRING(36),
- /**
- * 当前用户代理信息
- */
- user_agent: STRING(150),
- /**
- * 错误堆栈
- */
- error_stack: TEXT,
- /**
- * URL
- */
- url: STRING(255),
- /**
- * 请求对象
- */
- request: TEXT,
- /**
- * 响应对象
- */
- response: TEXT,
- /**
- * 创建时间
- */
- created_at: DATE,
- /**
- * 更新时间
- */
- updated_at: DATE,
- });
- return Log;
- };
6, 测试 Python 脚本:
- import requests
- from multiprocessing import Pool
- from threading import Thread
- from multiprocessing import Process
- def loop():
- t = 1000
- while t:
- url = "http://localhost:7001/log/notice"
- payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
- headers = {
- 'Content-Type': "application/json",
- 'Cache-Control': "no-cache"
- }
- response = requests.request("POST", url, data=payload, headers=headers)
- print(response.text)
- if __name__ == '__main__':
- for i in range(10):
- t = Thread(target=loop)
- t.start()
7, 建表语句:
- SET NAMES utf8mb4;
- SET FOREIGN_KEY_CHECKS = 0;
- -- ----------------------------
- -- Table structure for logs
- -- ----------------------------
- DROP TABLE IF EXISTS `logs`;
- CREATE TABLE `logs` (
- `id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
- `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '日志类型',
- `level` int(11) NULL DEFAULT NULL COMMENT '优先等级 (数字越高, 优先级越高)',
- `operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '操作人',
- `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '日志信息',
- `ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP\r\nIP',
- `user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前用户代理信息',
- `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '错误堆栈',
- `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前 URL',
- `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '请求对象',
- `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '响应对象',
- `created_at` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
- `updated_at` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
- PRIMARY KEY (`id`) USING BTREE
- ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
- SET FOREIGN_KEY_CHECKS = 1;
三、后话
网上类似资料甚少, 啃各种文档, 探寻技术实现方式
来源: https://www.jb51.net/article/150459.htm