我的各种 GitHub 开源项目和代码: https://github.com/linbin524
背景
笔者 目前架构的 IOT 项目是使用 abp 框架作为后台, 虽然 abp 的框架适用于中小型项目框架, 但由于架构优美, 笔者认为还是可以经过改造, 作为大型项目中使用. 但 IOT 的这个项目目前刚上线不久, 十几天数据库已经有了上百 GB, 而且由于实施检查设备状态, 调用设备状态维护表, 审计日志压力很大, 单单审计日志一天的数据量就有几十万, 目前在架构上, 笔者做了几个优化处理;
1, 针对审计日志, 笔者重写了 Abp 原有的 IAuditingStore, 实现 MongoDB 和 Redis 两种转移, 并且针对审计日志内容做了过滤, DisableAuditing 特性标记指定的类或方法不进行记录.
ps:abp 虽然有 MongoDB 的封装, 但它的出发点是和 EF 同一个模式, 左右系统唯一的 ORM, 如果要使用 abp 的 mongo 封装, 必须要替代 EF, 或者重写 ABP UnitOfWorkOptions, 否则直接用会出现工作单元转换失败的问题.
2, 站点层面使用 nginx 做了反向代理, 进行多站点服务, 通信模式由原来的队列, 改为服务化, EventBus 等方式
3, 数据库底层 做了 Percona XtraDB Cluster-MySQL 集群处理迁移.
思考评估: 1, 审计日志这样处理, 从源头做了缩减, 并且进行 Nosql 拆分, 有助于缓解数据库压力.
2, 中间层的处理是一般 IOT 中间件各种脚手架的组合, 成熟, 也有经过多年生产环境的检验.
3, 数据库底层 使用 Percona XtraDB Cluster, 是因为它支持集群, 可以缓解数据库请求压力, 又支持 abp 的事务;
但从真正大系统考虑, 其实最理性的模式应该是分片, 结合 SOA, 或者微服务才能真正解决底层压力, 目前考量了 Tidb(张善友 张队推荐的),oceanbase(淘宝 自有数据库, 生产环境十年),mycat 中间件 (听说这个坑多) 等,
为了暂时不做大改造, 只能先使用 Percona XtraDB Cluster, 后续可能使用 Orleans(Azure 云框架),akka.NET(大型的框架) 或者 Service Fabric(微服务框架)
二, Percona XtraDB Cluster 评估
优点如下:
1. 当执行一个查询时, 在本地节点上执行. 因为所有数据都在本地, 无需远程访问.
2. 无需集中管理. 可以在任何时间点失去任何节点, 但是集群将照常工作.
3. 良好的读负载扩展, 任意节点都可以查询.
缺点如下:
1. 加入新节点, 开销大. 需要复制完整的数据.
2. 不能有效的解决写缩放问题, 所有的写操作都将发生在所有节点上.
3. 有多少个节点就有多少重复的数据.
Percona XtraDB Cluster 是 MySQL 高可用性和可扩展性的解决方案.
Percona XtraDB Cluster 提供的特性有:
1. 同步复制, 事务要么在所有节点提交或不提交.
2. 多主复制, 可以在任意节点进行写操作.
3. 在从服务器上并行应用事件, 真正意义上的并行复制.
4. 节点自动配置.
5. 数据一致性, 不再是异步复制.
Percona XtraDB Cluster 完全兼容 MySQL 和 Percona Server, 表现在:
1. 数据的兼容性
2. 应用程序的兼容性: 无需更改应用程序
1. 集群是有节点组成的, 推荐配置至少 3 个节点, 但是也可以运行在 2 个节点上.
2. 每个节点都是普通的 MySQL/percona 服务器, 可以将现有的数据库服务器组成集群, 反之, 也可以将集群拆分成单独的服务器.
3. 每个节点都包含完整的数据副本.
三, 部署流程
1, 环境准备
在腾讯云上开设三个测试服务器, 系统 镜像 CentOS 7.5 64
用远程工具连接三台测试服务器, 完成如下操作
(1) 关闭 firewalld 防火墙
# systemctl disable firewalld --now
关闭防火墙或者允许 3306, 4444, 4567 和 4568 四个端口的连接
(2)关闭 SELINUX
- # setenforce 0
- # sed -i 's,^SELINUX=enforcing,SELINUX=disabled,g' /etc/selinux/config
2, 主节点部署
(1)安装 PXC yum 源
yum install http://www.percona.com/downloads/percona-release/redhat/0.1-3/percona-release-0.1-3.noarch.rpm
(2) 安装 PXC
# yum install Percona-XtraDB-Cluster-56
最终下载下来的版本是 Percona-XtraDB-Cluster-56-5.6.30
(3) 修改 /etc/my.cnf
- VIM /etc/my.cnf
- [mysqld]
- datadir=/var/lib/MySQL
- user=MySQL
- wsrep_provider=/usr/lib64/galera3/libgalera_smm.so
- # 集群的 ip
- wsrep_cluster_address=gcomm:// 节点 ip1, 节点 ip2, 节点 ip3
- binlog_format=ROW
- default_storage_engine=InnoDB
- innodb_autoinc_lock_mode=2
- # 当前主节点的 ip
wsrep_node_address = 当前节点 ip
- wsrep_sst_method=xtrabackup-v2
- wsrep_cluster_name=my_centos_cluster
- # 初始化一个 MySQL 的用户和密码
- wsrep_sst_auth="admin:123456"
(4)启动主节点
systemctl start MySQL@Bootstrap.service
(5)进入 MySQL
登录 (初始化状态, 无密码, 遇到要输密码直接回车)
MySQL -uroot -p
(6) 登录客户端查看数据库的状态, 在进行权限配置允许 ip 访问, 默认无法远程访问, 但是我们需要远程通过图形化等界面查看, 所以要做如下配置
- MySQL> show status like 'wsrep%';
- CREATE USER 'admin'@'localhost' IDENTIFIED BY '123456';// 如果这里报错, 看一下是否有 用户存在了
- GRANT RELOAD, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'admin'@'localhost';
- FLUSH PRIVILEGES;
完成后可以用 Navicat For MySQL 连接看一下是否可以成功访问
3, 其他两个节点的配置
(1)安装 PXC yum 源
yum install http://www.percona.com/downloads/percona-release/redhat/0.1-3/percona-release-0.1-3.noarch.rpm
(2) 安装 PXC
# yum install Percona-XtraDB-Cluster-56
(3) 修改 /etc/my.cnf
- VIM /etc/my.cnf
- [mysqld]
- datadir=/var/lib/MySQL
- user=MySQL
- wsrep_provider=/usr/lib64/galera3/libgalera_smm.so
- # 集群的 ip
- wsrep_cluster_address=gcomm:// 节点 ip1, 节点 ip2, 节点 ip3
- binlog_format=ROW
- default_storage_engine=InnoDB
- innodb_autoinc_lock_mode=2
- # 当前主节点的 ip
wsrep_node_address = 当前节点 ip
- wsrep_sst_method=xtrabackup-v2
- wsrep_cluster_name=my_centos_cluster
- # 初始化一个 MySQL 的用户和密码
- wsrep_sst_auth="admin:123456"
(4)启动当前节点(这一步和主节点不一样)
systemctl start MySQL
(5)进入 MySQL
登录 (初始化状态, 无密码, 遇到要输密码直接回车)
MySQL -uroot -p
(6) 登录客户端查看数据库的状态, 在进行权限配置允许 ip 访问, 默认无法远程访问, 但是我们需要远程通过图形化等界面查看, 所以要做如下配置
- MySQL> show status like 'wsrep%';
- CREATE USER 'admin'@'localhost' IDENTIFIED BY '123456';// 如果这里报错, 看一下是否有 用户存在了
- GRANT RELOAD, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'admin'@'localhost';
- FLUSH PRIVILEGES;
完成后可以用 Navicat For MySQL 连接看一下是否可以成功访问
(7)可以在 MySQL 中执行如下命令查看
show status like 'wsrep%';
如果正常, 可以出现如下界面, 标识当前三个集群节点
(8)如果出现启动节点时候出现异常, 可以查看提示的操作, 看看日志, 百度一下看看是什么错误, 怎么解决, 因为各种错误都有, 就不好一一解释了.
比如笔者在操作过程中就出现如下错误
ecStop=/usr/bin/MySQL-systemd stop (code=exited, status=2)
后面查找原因应该是 防火墙等问题, 进行关闭拦截等操作, 就是一开始 环境准备的后面那一步, 关闭防火墙, SELINUX,
主节点重启
- systemctl stop MySQL@Bootstrap.service
- systemctl start MySQL@Bootstrap.service
其他节点也再次启动
systemctl start MySQL
4,abp 进行数据库迁移
(1)abp 想要进行 MySQL 支持, 网上的教程有, 我就不重复造轮子自己参考(不要要注意 组件的版本, 如果出现差异可能会失败)
https://www.jianshu.com/p/543e34da16a7?winzoom=1 https://www.jianshu.com/p/543e34da16a7?winzoom=1
(2) 将数据库连接字符串改为 主节点
<add name="Default" connectionString="server = 主节点 ip;port=3306;database=abpzero4_6db;uid=admin;password=123456;" providerName="MySql.Data.MySqlClient" />
(3) 执行迁移
(4)查看对应的三台服务器集群都自动同步该数据库
(5)在 Appservice 中建立测试服务进行增删改查, 事务等测试
- using Abp.Application.Services;
- using Abp.Application.Services.Dto;
- using Abp.AutoMapper;
- using Abp.Domain.Repositories;
- using Abp.Domain.Uow;
- using AutoMapper;
- using System;
- using System.Collections.Generic;
- using System.Data.Entity;
- using System.Linq;
- using System.Linq.Expressions;
- using System.Text;
- using System.Threading.Tasks;
- using System.Linq.Dynamic;
- using Abp.Linq.Extensions;
- using MyCompanyName.AbpZeroTemplate;
- using MyCompanyName.AbpZeroTemplate.ZLDB_Domain;
- using MyCompanyName.AbpZeroTemplate.Authorization.Consignee.Exporting;
- using MyCompanyName.AbpZeroTemplate.ZLDB_Domain.Dtos;
- using MyCompanyName.AbpZeroTemplate.Dto;
- namespace MyCompanyName.AbpZeroTemplate
- {
- /// <summary>
- /// 收货地址 业务实现接口
- /// </summary>
- public class ConsigneeAppService : AbpZeroTemplateAppServiceBase, IConsigneeAppService
- {
- private readonly IRepository<Consignee, Guid> _consigneeRepository;
- private readonly IConsigneeListExcelExporter _iConsigneeListExcelExporter;
- /// <summary>
- /// 构造函数自动注入我们所需要的类或接口
- /// </summary>
- public ConsigneeAppService(IRepository<Consignee, Guid> consigneeRepository,
- IConsigneeListExcelExporter iConsigneeListExcelExporter)
- {
- _consigneeRepository = consigneeRepository;
- _iConsigneeListExcelExporter = iConsigneeListExcelExporter;
- //_consigneeMongoDbRepository = consigneeMongoDbRepository;
- }
- /// <summary>
- /// 获取所有数据列表
- /// </summary>
- /// <returns > 返回数据集合</returns>
- public async Task<List<ConsigneeDto>> GetAllList(string guid = "")
- {
- //try
- //{
- // var model = new Consignee() { DealerId = System.Guid.NewGuid() };
- // var mr = _consigneeMongoDbRepository.Insert(model);
- //}
- //catch (Exception ex)
- //{
- // throw;
- //}
- // 调用 Task 仓储的特定方法 GetAllWithPeople
- var resultList = await _consigneeRepository.GetAllListAsync();
- return Mapper.Map<List<ConsigneeDto>>(resultList).ToList();
- }
- /// <summary>
- /// 获取分页数据列表 分页具体代码需要适当修改, 如 orderby 需要匹配 创建时间 或者其他数据 Id(int)
- /// </summary>
- /// <returns > 返回数据集合</returns>
- public async Task<PagedResultDto<ConsigneeDto>> GetPagedListAsync(PagedAndFilteredInputDto input)
- {
- var query = _consigneeRepository.GetAll();
- //TODO: 根据传入的参数添加过滤条件
- var resultCount = await query.CountAsync();
- var resultconsignee = await query
- .OrderBy(x => x.Id)
- .PageBy(input)
- .ToListAsync();
- var resultListDtos = resultconsignee.MapTo<List<ConsigneeDto>>();
- return new PagedResultDto<ConsigneeDto>(
- resultCount,
- resultListDtos
- );
- }
- /// <summary>
- /// 获取指定条件的数据列表 webapi 无法使用
- /// </summary>
- /// <returns > 返回数据集合</returns>
- public async Task<List<ConsigneeDto>> GetListByCodition(Expression<Func<Consignee, bool>> predicate)
- {
- var resultList = await _consigneeRepository.GetAllListAsync(predicate);
- return Mapper.Map<List<ConsigneeDto>>(resultList).ToList();
- }
- /// <summary>
- /// 导出 Excel 具体方法
- /// </summary>
- /// <returns>Excel 文件</returns>
- /// public async Task<FileDto> GetConsigneeToExcel()
- ///{
- /// var resultList = await _consigneeRepository.GetAllListAsync();
- /// var consigneeDtos= Mapper.Map<List<ConsigneeDto>>(resultList).ToList();
- /// return _iConsigneeListExcelExporter.ExportToFile(consigneeDtos);
- /// }
- /// <summary>
- /// 根据指定 id 获取数据实体
- /// </summary>
- /// <param name="input">当前 id</param>
- /// <returns></returns>
- public async Task<ConsigneeDto> GetConsigneeForEditAsync(NullableIdDto<System.Guid> input)
- {
- var output = new ConsigneeDto();
- ConsigneeDto consigneeEditDto;
- if (input.Id.HasValue)
- {
- var entity = await _consigneeRepository.GetAsync(input.Id.Value);
- consigneeEditDto = entity.MapTo<ConsigneeDto>();
- }
- else
- {
- consigneeEditDto = new ConsigneeDto();
- }
- output = consigneeEditDto;
- return output;
- }
- /// <summary>
- /// 根据 Id 创建或编辑操作
- /// </summary>
- /// <param name="input">实体</param>
- /// <returns></returns>
- public async Task CreateOrUpdateConsigneeAsync(ConsigneeDto input)
- {
- if (!string.IsNullOrWhiteSpace(input.Id))
- {
- await Update(input);
- }
- else
- {
- await Create(input);
- }
- }
- /// <summary>
- /// 新增
- /// </summary>
- /// <param name="input">新增参数</param>
- /// <returns > 新增实体</returns>
- public async Task<Guid> Create(ConsigneeDto input)
- {
- input.Id = new Consignee().Id.ToString();
- var resultObj = input.MapTo<Consignee>();
- var result = await _consigneeRepository.InsertAsync(resultObj);
- return result.Id;
- }
- /// <summary>
- /// 新增
- /// </summary>
- /// <param name="input">新增参数</param>
- /// <returns > 新增实体</returns>
- public async Task<int> CreateList(List<ConsigneeDto> list)
- {
- foreach (var input in list) {
- if (input.Contact.Contains("ex")) {
- throw new Exception("测试分布式异常!");
- }
- input.Id = new Consignee().Id.ToString();
- var resultObj = input.MapTo<Consignee>();
- var result = await _consigneeRepository.InsertAsync(resultObj);
- }
- return list.Count();
- }
- /// <summary>
- /// 修改
- /// </summary>
- /// <param name="input">修改参数</param>
- /// <returns > 修改实体</returns>
- public async Task<ConsigneeDto> Update(ConsigneeDto input)
- {
- Consignee obj = await _consigneeRepository.GetAsync(new Guid(input.Id));
- input.MapTo(obj);
- var result = await _consigneeRepository.UpdateAsync(obj);
- return obj.MapTo<ConsigneeDto>();
- }
- /// <summary>
- /// 删除
- /// </summary>
- /// <param name="input">删除 Dto</param>
- /// <returns > 无返回值</returns>
- public async System.Threading.Tasks.Task Delete(EntityDto<string> input)
- {
- await _consigneeRepository.DeleteAsync(new Guid(input.Id));
- }
- /// <summary>
- /// 删除 webapi 无法使用
- /// </summary>
- /// <param name="predicate">删除条件</param>
- /// <returns > 无返回值</returns>
- public async System.Threading.Tasks.Task DeleteByCondition(Expression<Func<Consignee, bool>> predicate)
- {
- await _consigneeRepository.DeleteAsync(predicate);
- }
- }
- }
在 swagger ui 中增删改查都已经正常, 而且数据在三个数据库中正常同步
针对事务, 做了人为异常处理, 确认会实现回滚(abp 自带工作单元处理事务)
五, 后记
这一次只是做了简单的实验性测试, 后续需要在加强深入检测, 才可以用生产环境中.
来源: https://www.cnblogs.com/linbin524/p/10150300.html