在 ASP.NET Core 上利用 MassTransit 来集成使用 RabbitMQ 真的很简单, 代码也很简洁. 近期因为项目需要, 我便在这基础上再次进行了封装, 抽成了公共方法, 使得使用 RabbitMQ 的调用变得更方便简洁. 那么, 就让咱们来瞧瞧其魅力所在吧.
MassTransit
先看看 MassTransit 是个什么宝贝 (MassTransit 官网的简介):
MassTransit 是一个免费的开源轻量级消息总线, 用于使用. NET 框架创建分布式应用程序. MassTransit 在现有的顶级消息传输上提供了一系列广泛的功能, 从而以开发人员友好的方式使用基于消息的会话模式异步连接服务. 基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式.
通俗描述:
MassTransit 就是一套基于消息服务的高级封装类库, 下游可联接 RabbitMQ,Redis,MongoDb 等服务.
github 官网: https://github.com/MassTransit/MassTransit
RabbitMQ
RabbitMQ 是成熟的 MQ 队列服务, 是由 Erlang 语言开发的 AMQP 的开源实现. 关于介绍 RabbitMQ 的中文资料也很多, 有需要可以自行查找. 我这里贴出其官网与下载安装的链接, 如下:
官网: http://www.rabbitmq.com/
下载与安装: http://www.rabbitmq.com/download.html
实现代码
通过上面的介绍, 咱们已对 MassTransit 与 RabbitMQ 有了初步了解, 那么现在来看看如何在 ASP.NET Core 上优雅的使用 RabbitMQ 吧.
1, 创建一个名为 "RabbitMQHelp.cs" 公共类, 用于封装操作 RabbitMQ 的公共方法, 并通过 Nuget 来管理并引用 "MassTransit" 与 "MassTransit.RabbitMQ" 类库.
2,"RabbitMQHelp.cs" 公共类主要对外封装两个静态方法, 其代码如下:
- using MassTransit;
- using MassTransit.RabbitMqTransport;
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading.Tasks;
- namespace Lezhima.Comm
- {
- /// <summary>
- /// RabbitMQ 公共操作类, 基于 MassTransit 库
- /// </summary>
- public class RabbitMQHelp
- {
- #region 交换器
- /// <summary>
- /// 操作日志交换器
- /// 同时需在 RabbitMQ 的管理后台创建同名交换器
- /// </summary>
- public static readonly string actionLogExchange = "Lezhima.ActionLogExchange";
- #endregion
- #region 声明变量
- /// <summary>
- /// MQ 联接地址, 建议放到配置文件
- /// </summary>
- private static readonly string mqUrl = "rabbitmq://192.168.6.181/";
- /// <summary>
- /// MQ 联接账号, 建议放到配置文件
- /// </summary>
- private static readonly string mqUser = "admin";
- /// <summary>
- /// MQ 联接密码, 建议放到配置文件
- /// </summary>
- private static readonly string mqPwd = "admin";
- #endregion
- /// <summary>
- /// 创建连接对象
- /// 不对外公开
- /// </summary>
- private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null)
- {
- // 通过 MassTransit 创建 MQ 联接工厂
- return Bus.Factory.CreateUsingRabbitMq(cfg =>
- {
- var host = cfg.Host(new Uri(mqUrl), hst =>
- {
- hst.Username(mqUser);
- hst.Password(mqPwd);
- });
- registrationAction?.Invoke(cfg, host);
- });
- }
- /// <summary>
- /// MQ 生产者
- /// 这里使用 fanout 的交换类型
- /// </summary>
- /// <param name="obj"></param>
- public async static Task PushMessage(string exchange, object obj)
- {
- var bus = CreateBus();
- var sendToUri = new Uri($"{mqUrl}{exchange}");
- var endPoint = await bus.GetSendEndpoint(sendToUri);
- await endPoint.Send(obj);
- }
- /// <summary>
- /// MQ 消费者
- /// 这里使用 fanout 的交换类型
- /// consumer 必需是实现 IConsumer 接口的类实例
- /// </summary>
- /// <param name="obj"></param>
- public static void ReceiveMessage(string exchange, object consumer)
- {
- var bus = CreateBus((cfg, host) =>
- {
- // 从指定的消息队列获取消息 通过 consumer 来实现消息接收
- cfg.ReceiveEndpoint(host, exchange, e =>
- {
- e.Instance(consumer);
- });
- });
- bus.Start();
- }
- }
- }
3,"RabbitMQHelp.cs" 公共类已经有了 MQ"生产者" 与 "消费者" 两个对外的静态公共方法, 其中 "生产者" 方法可以在业务代码中直接调用, 可传递 JSON, 对象等类型的参数向指定的交换器发送数据. 而 "消费者" 方法是从指定交换器中进行接收绑定, 但接收到的数据处理功能则交给了 "consumer" 类 (因为在实际项目中, 不同的数据有不同的业务处理逻辑, 所以这里我们直接就通过 IConsumer 接口交给具体的实现类去做了). 那么, 下面我们再来看看消费者里传递进来的 "consumer" 类的代码吧:
- using MassTransit;
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading.Tasks;
- namespace Lezhima.Storage.Consumer
- {
- /// <summary>
- /// 从 MQ 接收并处理数据
- /// 实现 MassTransit 的 IConsumer 接口
- /// </summary>
- public class LogConsumer : IConsumer<ActionLog>
- {
- /// <summary>
- /// 重写 Consume 方法
- /// 接收并处理数据
- /// </summary>
- /// <param name="context"></param>
- /// <returns></returns>
- public Task Consume(ConsumeContext<ActionLog> context)
- {
- return Task.Run(async () =>
- {
- // 获取接收到的对象
- var amsg = context.Message;
- Console.WriteLine($"Recevied By Consumer:{amsg}");
- Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}");
- });
- }
- }
- }
调用代码
1, 生产者调用代码如下:
- /// <summary>
- /// 测试 MQ 生产者
- /// </summary>
- /// <returns></returns>
- [HttpGet]
- public async Task<MobiResult> AddMessageTest()
- {
- // 声明一个实体对象
- var model = new ActionLog();
- model.ActionLogId = Guid.NewGuid();
- model.CreateTime = DateTime.Now;
- model.UpdateTime = DateTime.Now;
- // 调用 MQ
- await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model);
- return new MobiResult(1000, "操作成功");
- }
2, 消费者调用代码如下:
- using Lezhima.Storage.Consumer;
- using Microsoft.Extensions.Configuration;
- using System;
- using System.IO;
- namespace Lezhima.Storage
- {
- class Program
- {
- static void Main(string[] args)
- {
- var conf = new ConfigurationBuilder()
- .SetBasePath(Directory.GetCurrentDirectory())
- .AddJsonFile("appsettings.json", true, true)
- .Build();
- // 调用接收者
- RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange,
- new LogConsumer()
- );
- Console.ReadLine();
- }
- }
- }
总结
1, 基于 MassTransit 库使得我们使用 RabbitMQ 变得更简洁, 方便. 而基于再次封装后, 生产者与消费者将不需要关注具体的业务, 也跟业务代码解耦了, 更能适应项目的需要.
2,RabbitMQ 的交换器需在其管理后台自行创建, 而这里使用的 fanout 类型是因为其发送速度最快, 且能满足我的项目需要, 各位可视自身情况选用不同的类型. fanout 类型不会存储消息, 必需要消费者绑定交换器后才会发送给消费者.
来源: https://www.cnblogs.com/Andre/p/9579764.html