一, 前言
感谢杨晓东大佬为社区贡献的 CAP 开源项目, 传送门在此:.NET Core 事件总线, 分布式事务解决方案: CAP 以及 如何在你的项目中集成 CAP[手把手视频教程] , 之前也在工作中遇到分布式数据一致性的问题, 也一直都是基于 CAP 理论和 Base.
之前一直有关注杨老板的博客, 直到今天才尝试一下 CAP, 发现好用, 非常的棒, 特此把 CAP 以组件化的方式引入到我的框架中.
二, CAP 介绍
针对 CAP 介绍可以参考上面给出的两个链接. 在此我只简单的说明一下:
CAP 是一个在分布式系统中 (SOA,MicroService) 实现事件总线及最终一致性 (分布式事务) 的一个开源的 C# 库, 她具有轻量级, 高性能, 易使用等特点.
你可以轻松的在基于 .NET Core 技术的分布式系统中引入 CAP, 包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework.
CAP 以 NuGet 包的形式提供, 对项目无任何入侵, 你仍然可以以你喜爱的方式来构建分布式系统.
CAP 具有 Event Bus 的所有功能, 并且 CAP 提供了更加简化的方式来处理 EventBus 中的发布 / 订阅.
CAP 具有消息持久化的功能, 也就是当你的服务进行重启或者宕机时, 她可以保证消息的可靠性.
CAP 实现了分布式事务中的最终一致性, 你不用再去处理这些琐碎的细节.
CAP 提供了基于 Microsoft DI 的 API 服务, 她可以和你的 ASP.NET Core 系统进行无缝结合, 并且能够和你的业务代码集成支持强一致性的事务处理.
三, ASP.NET core 集成 CAP
由于我的框架是 DDD 六边形架构, 为了解耦方便, 我针对外部的工具都是以组件化的方式引入到项目中, 即新建了一个 CAP 类库.
第一步: 扩展了 Startup 类中的 IServiceCollection, 在 CAP 中我加入了 Consul 的注册, 如下图:
- public static void AddCAPConfigure(this IServiceCollection services, IConfiguration configuration)
- {
- services.AddCap(x =>
- {
- // 使用 Dapper ORM
- x.UseMySql(configuration.GetConnectionString("DBConnection"));
- // 使用 kafka 进行日志, case 的消息推送
- // 需要配置一下 MQ 地址, kafka 放在 Linux 系统上, 不建议放在 Windows 上
- x.UseKafka(configuration["KafkaConfig"]);
- x.UseDashboard();// 得到 UI 界面
- // 注册服务发现
- x.UseDiscovery(d =>
- {
- d.DiscoveryServerHostName = "192.168.161.163";
- d.DiscoveryServerPort = 8500;
- d.CurrentNodeHostName = "localhost";
- d.CurrentNodePort = 64616;
- d.NodeName = "CAP No.1 Node";
- });
- });
- }
我用的是 MySQL 数据库, 以及使用 kafka 消息队列, 这边要注意, kafka 最好部署在 Linux 系统上, 在 Windows 系统会存在很多的坑, 如果你觉得你的天坑能力强, 可以尝试一下.
我这边也集成了 Consul 服务注册, 如果大家对 cosnul 感谢的兴趣的可以看我的另外一篇文章: 实战中的 ASP.NET core 结合 Consul 集群 & Docker 实现服务治理 里面有讲解了 consul 集群部署.
好了然后我在我的主项目中配置一下, 就开始用吧:
- #region 配置 CAP
- services.AddCAPConfigure(Configuration);
- #endregion
第二步: 在 ASP.NET core webapi 项目中新建一个控制器
配置如下:
- [Route("API/[controller]/[action]")]
- public class ValuesController : Controller
- {
- private readonly ICapPublisher _capBus;
- public ValuesController(ICapPublisher capPublisher)
- {
- _capBus = capPublisher;
- }
- [HttpGet]
- public IActionResult Get()
- {
- _capBus.Publish("show.time", DateTime.Now);
- return Ok();
- }
- [HttpGet]
- [CapSubscribe("show.time")]
- public void CheckReceiveMessage(DateTime time)
- {
- Console.WriteLine(time.AddDays(1));
- }
- }
此处的 MySQL 配置大家可自行补充, 或者按照 @Savorboard 给出的 demo 操作即可.
第三步: 部署一下 kafka.
我在 CentOS 服务器上采用 docker 部署, 命令如下:
- // 下载 zookeeper
- docker pull wurstmeister/zookeeper
- // 下载 kafka
- docker pull wurstmeister/kafka:2.11-0.11.0.3
- // 启动 zookeeper
- docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper
- // 启动 kafka
- docker run -d --name kafka --publish 9092:9092 \
- --link zookeeper \
- --env KAFKA_ZOOKEEPER_CONNECT=192.168.161.163:2181 \
- --env KAFKA_ADVERTISED_HOST_NAME=192.168.161.163 \
- --env KAFKA_ADVERTISED_PORT=9092 \
- --volume /etc/localtime:/etc/localtime \
- wurstmeister/kafka:2.11-0.11.0.3
部署完毕后就进入下一步运行啦.
第四步: 运行项目, 运行成功后, 我嗯可以在数据库中发现 cap 会自动在数据库中创建两张表, 一张是 发布信息表, 一张是接收信息表.
表:
发现 表中有数据存在:
数据体现法发送接收成功.
我们再来看看 cap 有提供的 UI 界面, 发现里面有一个我们用 consul 注册的服务器. 完美实现.
我们看一下 consul 集群:
四, 总结
欢迎大家积极尝试 CAP, 我也会在后续的项目中采用 CAP, 希望我们的社区越来越强大.
ASP.NET Core 交流群: 787464275 欢迎加群交流
如果您认为这篇文章还不错或者有所收获, 您可以点击右下角的[推荐] 按钮精神支持, 因为这种支持是我继续写作, 分享的最大动力!
来源: https://www.cnblogs.com/guolianyu/p/9756941.html