前言
通过前面的几篇文章, 讲解了一个短信服务的架构设计与实现. 然而初始方案并非 100% 完美的, 我们仍可以对该架构做一些优化与调整.
同时我也希望通过这篇文章与大家分享一下, 我的架构设计理念.
源码地址: https://github.com/SkyChenSky/Sikiro.SMS/tree/optimize (与之前的是另外的分支)
架构是设计的还是演变的?
架构
该词出自于建筑学. 软件架构定义是指软件系统的基础结构, 是系统中的实体及实体 (服务) 之间的关系所进行的抽象描述. 而架构设计的目的是为了解决软件系统复杂度带来的问题.
复杂度
系统复杂度主要有下面几点:
高可用
高性能
可扩展
安全性
维护成本
用户规模
业务规模
系统的复杂度导致的直接原因是业务规模. 为了用户流畅放心的使用产品, 不得不提高系统性能与安全. 当系统成为人们生活不可缺一部分时, 避免机房停电, 挖掘机挖断电缆导致的系统不可用, 不得不去思考同城跨机房同步, 异地多活的高可用方案.
答案并非二选一
我认为架构, 需要在已知可见的业务复杂度与用户规模的基础上进行架构设计; 伴随着技术积累与成长而对系统进行架构优化; 用户的日益增长, 业务的不断扩充, 迫使了系统的复杂度增加, 为了解决系统带来新的复杂度而进行架构演变.
因此, 架构方案是在已有的业务复杂度, 用户规模, 技术积累度, 人力时间成本等几个方面的取舍决策后的结果体现.
原架构
缺点分析
一般情况下, 调度任务轮询数据库, 90% 的动作是无用功, 频繁的数据库访问会对数据库增加不少压力.
为了让调度任务服务进行轮循数据, 需要在 API 优先进行数据持久化, 这无疑是降低了 API 的性能.
MongoDB 的 Update 操作相比于 Insert 操作时低效的, 对于日志类数据应增量添加.
因此从上述可见, 调度任务服务这块是优化关键点所在.
新架构图
使用了 RabbitMQ 的队列定时任务代替调度任务来实现定时发送.
抛弃了调度任务, 减少了调用链, 同时也减少了应用服务数据量.
对 SMS 集合在 MongoDB 里进行按年月的时间划分, 对于日志类数据可以在有效的时间范围外进行方便的归档, 删除. 同时也避免了同集合的数据量过大导致的查询效率缓慢.
队列定时任务
RabbitMQ 自身并没有定时任务, 然而可以通过消息的 Time-To-Live(过期时间)与 Dead Letter Exchange(死信交换机)的结合模拟定时发布的功能. 具体原理如下:
生产者发布消息, 并发布到已申明消息过期时间 (TTL) 的缓存队列(非真正业务消费队列)
消息在缓存队列等待消息过期, 然后由 Dead Letter Exchange 将消息重新分配到实际消费队列
消费者再从实际消费队列消费并完成业务
Dead Letter Exchange
Dead Letter Exchange 与平常的 Exchange 无异, 主要用于消息死亡后通过 Dead Letter Exchange 与 x-dead-letter-routing-key 重新分配到新的队列进行消费处理.
消息死亡的方式有三种:
消息进入了一条已经达到最大长度的队列
消息因为设置了 Time-To-Live 的导致过期
消息因 basic.reject 或者 basic.nack 动作而拒绝
Time-To-Live
两种消息过期的方式:
队列申明 x-message-ttl 参数
- var args = new Dictionary<string, object>();
- args.Add("x-message-ttl", 60000);
- model.QueueDeclare("myqueue", false, false, false, args);
每条消息发布声明 Expiration 参数
- byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
- IBasicProperties props = model.CreateBasicProperties();
- props.ContentType = "text/plain";
- props.DeliveryMode = 2;
- props.Expiration = "36000000"
- model.BasicPublish(exchangeName,
- routingKey, props,
- messageBodyBytes);
RabbitMQ.Client 队列定时任务 Demo
- class Program
- {
- static void Main(string[] args)
- {
- var factory = new ConnectionFactory
- {
- HostName = "10.1.20.140",
- UserName = "admin",
- Password = "admin@ucsmy"
- };
- using (var connection = factory.CreateConnection())
- using (var channel = connection.CreateModel())
- {
- var queueName = "Queue.SMS.Test";
- var exchangeName = "Exchange.SMS.Test";
- var key = "Route.SMS.Test";
- DeclareDelayQueue(channel, exchangeName, queueName, key);
- DeclareReallyConsumeQueue(channel, exchangeName, queueName, key);
- var body = Encoding.UTF8.GetBytes("info: test dely publish!");
- channel.BasicPublish(exchangeName + ".Delay", key, null, body);
- }
- }
- private static void DeclareDelayQueue(IModel channel, string exchangeName, string queueName, string key)
- {
- var retryDic = new Dictionary<string, object>
- {
- {"x-dead-letter-exchange", exchangeName+".dl"},
- {"x-dead-letter-routing-key", key},
- {"x-message-ttl", 30000}
- };
- var ex = exchangeName + ".Delay";
- var qu = queueName + ".Delay";
- channel.ExchangeDeclare(ex, "topic");
- channel.QueueDeclare(qu, false, false, false, retryDic);
- channel.QueueBind(qu, ex, key);
- }
- private static void DeclareReallyConsumeQueue(IModel channel, string exchangeName, string queueName, string key)
- {
- var ex = exchangeName + ".dl";
- channel.ExchangeDeclare(ex, "topic");
- channel.QueueDeclare(queueName, false, false, false);
- channel.QueueBind(queueName, ex, key);
- }
- }
Sikiro.SMS 实现优化
上面介绍了队列定时任务基本原理, 然而我们需要自己的项目进行修改优化.
API 消息发布
EasyNetQ 是一款非常良好使用性的 RabbitMQ.Client 封装. 对队列定时任务他也已经提供了相应的方法 FuturePublish 给我们使用.
然而他的 FuturePublish 由有三种调度方式:
- DeadLetterExchangeAndMessageTtlScheduler
- DelayedExchangeScheduler
- ExternalScheduler
DelayedExchangeScheduler 是需要 EasyNetQ 项目提供的调度程序, 本质上也是轮询
ExternalScheduler 是通过使用 MQ 的插件.
DeadLetterExchangeAndMessageTtlScheduler 才是我们之前通过 DEMO 实现的方式, 在 EasyNetQ 组件上通过下面代码进行启用.
- services.RegisterEasyNetQ(_infrastructureConfig.Infrastructure.RabbitMQ, a =>
- {
- a.EnableDeadLetterExchangeAndMessageTtlScheduler();
- });
下面代码是 Sikiro.SMS.Api 的优化改造:
- /// <summary>
- /// 添加短信记录
- /// </summary>
- /// <param name="model"></param>
- /// <returns></returns>
- [HttpPost]
- public ActionResult Post([FromBody] List<PostModel> model)
- {
- _smsService.Page(model.MapTo<List<PostModel>, List<AddSmsModel>>());
- ImmediatelyPublish();
- TimingPublish();
- return Ok();
- }
- /// <summary>
- /// 及时发送
- /// </summary>
- private void ImmediatelyPublish()
- {
- _smsService.SmsList.Where(a => a.TimeSendDateTime == null).ToList().MapTo<List<SmsModel>, List<SmsQueueModel>>()
- .ForEach(
- item =>
- {
- _bus.Publish(item, SmsQueueModelKey.Topic);
- });
- }
- /// <summary>
- /// 定时发送
- /// </summary>
- private void TimingPublish()
- {
- _smsService.SmsList.Where(a => a.TimeSendDateTime != null).ToList()
- .ForEach(
- item =>
- {
- _bus.FuturePublish(item.TimeSendDateTime.Value.ToUniversalTime(), item.MapTo<SmsModel, SmsQueueModel>(),
- SmsQueueModelKey.Topic);
- });
- }
重发机制
重发一般是请求服务超时的情况下使用. 而导致这种原因的主要几点是网络波动, 服务压力过大. 因为前面任意一种原因都无法在短时间恢复, 因此对于简单的重试 类似 while(i<3)ReSend() 是没有什么意义的.
因此我们需要借助队列定时任务 + 发送次数 * 延迟时间来完成有效的非频繁的重发.
- public void Start()
- {
- Console.WriteLine("I started");
- _bus.Subscribe<SmsQueueModel>("", msg =>
- {
- try
- {
- _smsService.Send(msg.MapTo<SmsQueueModel, SmsModel>());
- }
- catch (webException e)
- {
- e.WriteToFile();
- ReSend();
- }
- catch (Exception e)
- {
- e.WriteToFile();
- }
- }, a =>
- {
- a.WithTopic(SmsQueueModelKey.Topic);
- });
- }
- private void ReSend()
- {
- var model = _smsService.Sms.MapTo<SmsModel, SmsQueueModel>();
- model.SendCount++;
- _bus.FuturePublish(TimeSpan.FromSeconds(30 * model.SendCount), model, SmsQueueModelKey.Topic);
- }
SMS 日志集合维度
SMS 日志作为非必要业务的运维型监控数据, 在需要的时候随时可以对此进行删除或者归档处理. 因此以时间 (年月) 作为集合维度, 可以很好的对日志数据进行管理.
mongoProxy.Add(MongoKey.SmsDataBase, MongoKey.SmsCollection + "_" + DateTime.Now.ToString("yyyyMM"), model);
结束
经过本系列 6 篇的文章, 介绍了以短信服务为业务场景, 基于. net core 平台的一个简单架构设计, 架构优化与服务实现的实践例子. 希望我的分享能帮助有需要的朋友. 如果有任何好的建议请到下方给我留言.
来源: https://www.cnblogs.com/skychen1218/p/9565198.html