熟悉 TPL Dataflow 博文的朋友可能记得这是个单体程序, 使用 TPL Dataflow 处理工作流任务, 在使用 Docker 部署的过程中, 有一个问题一直无法回避:
在单体程序部署的瞬间会有少量 流量无法处理; 更糟糕的情况下, 迭代部署的这个版本有问题, 上线后无法运作, 更多的流量没有得到处理.
背负神圣使命 (巨大压力) 的程序猿心生一计, 为何不将单体程序改成分布式: 服务 A 只接受数据, 服务 B 只处理数据.
知识储备:
消息队列和订阅发布作为老生常谈的两个知识点被反复提及, 按照 JMS 的规范, 官方称为点对点(point to point, queue) 和 订阅发布(publish/subscribe,topic ),
点对点:
消息生产者生产消息发送到 queue 中, 然后消费者从 queue 中取出并且消费消息.
注意:
消息被消费以后, queue 中不再有存储, 所以消费者不可能消费到已经被消费的消息.
Queue 支持存在多个消费者, 但是对一个消息而言, 只会有一个消费者可以消费.
当没有消费者可用时, 这个消息会被保存直到有 一个可用的消费者.
发布 / 订阅
消息生产者 (发布) 将消息发布到 topic 中, 同时有多个消息消费者 (订阅) 消费该消息. 和点对点方式不同, 发布到 topic 的消息会被所有订阅者消费.
注意:
发布者将消息发布到通道中, 而不用知道订阅者是谁(不关注是否存在订阅者); 订阅者可收听自己感兴趣的多个通道, 也不需要知道发布者是谁(不关注是哪个发布者).
故如果没有消费者, 发布的消息将得不到处理;
头脑风暴
本次采用的消息队列模型:
解耦业务: 新建 Receiver 程序作为生产者, 专注于接收并发送到队列; 原有的 webapp 作为消费者专注数据处理.
起到削峰填谷的作用, 若建立多个消费者 webapp 容器, 还能形成负载均衡的效果.
Redis 原生支持发布 / 订阅 https://redis.io/commands/publish 模型, 内置的 List 数据结构 https://redis.io/commands/lpush 亦能形成轻量级 MQ 的效果.
需要关注 Redis 两个命令( 左进右出, 右进左出同理):
LPUSH & RPOP/BRPOP https://redis.io/commands/brpop
Brpop 中的 B 表示 "Block", 是一个 rpop 命令的阻塞版本: 若指定 List 没有新元素, 在给定时间内, 该命令会阻塞当前 Redis 客户端连接, 直到超时返回 nil
编程实践
本次使用 AspNetCore 完成 RedisMQ 的实践. 引入 Redis 国产第三方开源库 CSRedisCore.
不使用著名的 StackExchange.Redis 组件库的原因:
之前一直使用 StackExchange.Redis, 参考了很多资料, 做了很多优化, 并未完全解决 RedisTimeoutException 问题
StackExchange.Redis 基于其多路复用机制, 不支持阻塞式命令, 故采用了 CSRedisCore, 该库强调了 API 与 Redis 官方命令一致, 很容易上手
生产者 Receiver:
------------------ 截取自 Startup.cs------------------------------
- public void ConfigureServices(IServiceCollection services)
- {
- var csredis = new CSRedisClient(Configuration.GetConnectionString("redis"));
- RedisHelper.Initialization(csredis);
- services.AddMvc();
- }
--------------------- 截取自数据接收 Controller-------------------
- [Route("batch")]
- [HttpPost]
- public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs)
- {
- if (!ModelState.IsValid)
- throw new ArgumentException("Http Body Payload Error.");
- var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";
- eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);
- if (eqidPairs != null && eqidPairs.Any())
- RedisHelper.LPush(redisKey, eqidPairs.ToArray());
- await Task.CompletedTask;
- }
消费者 webapp:
根据 RedisMQ 的事件推送方式, 需要轮询 Redis List 数据结构, 这里使用 AspNetCore 内置的 BackgroundService 实现了 后台轮询任务.
- public class BackgroundJob : BackgroundService
- {
- private readonly IEqidPairHandler _eqidPairHandler;
- private readonly ILogger _logger;
- public BackgroundJob(IEqidPairHandler eqidPairHandler, ILoggerFactory loggerFactory)
- {
- _eqidPairHandler = eqidPairHandler;
- _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));
- }
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- _logger.LogInformation("Service starting");
- while (!stoppingToken.IsCancellationRequested)
- {
- var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";
- var eqidpair = RedisHelper.BRPop(5, key);
- if (eqidpair != null)
- await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));
- else
- await Task.Delay(1000, stoppingToken);
- }
- _logger.LogInformation("Service stopping");
- }
- }
- var Redis = new CSRedisClient[16]; // 定义成单例
- for (var a = 0; a <Redis.Length; a++)
- Redis[a] = new CSRedisClient(Configuration.GetConnectionString("redis") + ",defualtDatabase=" + a);
- services.AddSingleton<CSRedisClient[]>(Redis);
- RedisHelper.Initialization(Redis[0]);
注册 CSRedisCore 服务
最后依照引言中的部署原理图, 将 Nginx,Receiver, WebApp dockerize, 并且让 webapp 依赖于 Nginx,Receiver
------------------- 截取自 docker-compose.YAML 文件 ----------------------
- App:
- build:
- context: ./App
- dockerfile: Dockerfile
- expose:
- - "80"
- extra_hosts:
- - "dockerhost:172.18.0.1"
- environment:
- TZ: Asia/Shanghai
- volumes:
- - type: bind
- source: /mnt/eqidmanager/eqidlogs
- target: /App/eqidlogs
- - type: bind
- source: /mnt/eqidmanager/applogs
- target: /App/logs
- - type: bind
- source: /home/huangjun/eqidmanager/EqidManager.db
- target: /App/EqidManager.db
- healthcheck:
- test: ['CMD','curl','-f','http://localhost/healthcheck']
- interval: 1m30s
- timeout: 10s
- retries: 3
- depends_on:
- - receiver
- - proxy
- logging:
- options:
- max-size: "200k"
- max-file: "10"
- privileged: true
1 根据 docker-compsoe up https://docs.docker.com/compose/reference/up/ 命令的用法, 若 Receiver 容器正在运行且服务配置并未改变, 该容器不会被停止.
2 根据官方文档对于 depends_on https://docs.docker.com/compose/compose-file/ 指令的说明, 该指定决定了容器启动和停止的顺序, 因此引言中需要 [暂存流量] 刚性需求可以得到满足
改造上线之后, 效果很明显, 现在可以放心安全的迭代 TPL DataFlow 数据处理程序.
作者: JulianHuang
码甲拙见, 如有问题请下方留言大胆斧正; 码字 + Visio 制图, 均为原创, 看官请不吝好评 + 关注, ~..~
来源: https://www.cnblogs.com/JulianHuang/p/11314789.html