Uwl.Admin.Core 中使用 RabbitMQ 消息队列:
本文负责讲解 RabbitMQ 的使用
Uwl.Admin.Core 使用的技术有:
*,Async 和 Await 异步编程
*,Repository + Service 仓储模式编程; 仓储模式支持工作单元
*,Swagger 前后端文档说明, 基于 RESTful 风格编写接口
*,Cors 简单的跨域解决方案
*,JWT 自定义策略授权权限验证
*, 依赖注入选择的是官方自带的 DI 注入, 没有使用第三方框架, ORM 使用 EF Core, 数据库使用的是 Sql server,(后期会扩展 MySQL 版本);
*,AutoMapper 自动对象映射,
*,Linq To Sql \ lambda 表达式树查询;(表达式树查询是个人扩展的, 表达式树的使用方法请参考 Uwl.Data.Server.MenuServer 的多条件查询)
*, 登录认证方式使用 JWT 认证方式, 后台接口使用 SwaggerUI 展示, 角色权限使用 自定义权限处理器 PermissionHandler 继承与微软官方 IAuthorizationRequirement;
*,Excel 导入导出使用的是 Epplus 第三方框架, 导入导出只需要配置 Attribute 特性就好, 不需要在自己写列名; 导出只支持 List 导出, 暂时不支持 Datatable;(Excel 使用方法请参考 UserController 控制器)
*,Rabbit MQ 消息队列(目前暂无业务使用场景后期准备用来记录日志)
*,Redis 轻量级分布式缓存;(Redis 使用方法请参考 Uwl.Data.Server.MenuServer 类)
*,QuartzNet 第三方任务框架;(使用方法请参考类库 Uwl.ScheduledTask.Job.TestJobOne 类)
*,IdentityServer4 授权模式已开发完成, 未发布演示服务器代码在 GitHub;(Identityserver4Auth 分支)
RabbitMQ 简介
AMQP, 即 Advanced Message Queuing Protocol, 高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计. 消息中间件主要用于组件之间的解耦, 消息的发送者无需知道消息使用者的存在, 反之亦然.
AMQP 的主要特征是面向消息, 队列, 路由(包括点对点和发布 / 订阅), 可靠性, 安全.
RabbitMQ 是一个开源的 AMQP 实现, 服务器端用 Erlang 语言编写, 支持多种客户端, 如: Python,Ruby,.NET,Java,JMS,C,PHP,ActionScript,XMPP,STOMP 等, 支持 Ajax. 用于在分布式系统中存储转发消息, 在易用性, 扩展性, 高可用性等方面表现不俗.
RabbitMQ 提供了可靠的消息机制, 跟踪机制和灵活的消息路由, 支持消息集群和分布式部署. 适用于排队算法, 秒杀活动, 消息分发, 异步处理, 数据同步, 处理耗时任务, CQRS 等应用场景.
RabbitMQ 安装
请参考我的第一篇博客:
安装完成之后访问 web 控制台
http:// 服务器 ip:15672/ 注意配置防火墙, 默认用户名密码都是 guest, 若新建用户一定要记得配置权限. guest 仅限 localhost 访问, 外网无法使用此账号!
.NET Core 使用 RabbitMQ
通过 nuget 安装: https://www.nuget.org/packages/RabbitMQ.Client/
定义生产者.
本文的代码生产者是基础的消息队列生产者, 源代码请看我的开源项目 UWl.Admin.Core https://github.com/GeorGeWzw/Uwl.Admin.Core
- public class RabbitServer: IRabbitMQ
- {
- private IConnection connection;
- private ConnectionFactory connectionFactory;
- public RabbitServer()
- {
- try
- {
- connectionFactory = new ConnectionFactory()
- {
- UserName = Appsettings.App(new string[] { "RabbitMQConfig", "UserName" }),
- Password = Appsettings.App(new string[] { "RabbitMQConfig", "Password" }),
- HostName = Appsettings.App(new string[] { "RabbitMQConfig", "HostName" }),
- AutomaticRecoveryEnabled= Convert.ToBoolean(Appsettings.App(new string[] { "RabbitMQConfig", "AutomaticRecoveryEnabled" })),
- TopologyRecoveryEnabled= Convert.ToBoolean(Appsettings.App(new string[] { "RabbitMQConfig", "TopologyRecoveryEnabled" })),
- };
- }
- catch (Exception)
- {
- throw;
- }
- }
- public IConnection GetConnection()
- {
- return this.connectionFactory.CreateConnection();
- }
- /// <summary>
- /// RabbitMQ 指定队列名称模式发送消息
- /// </summary>
- /// <param name="queuename">队列名字</param>
- /// <param name="obj">传输数据</param>
- public void SendData(string queuename, object obj)
- {
- connection = GetConnection();
- if (obj == null)
- return;
- if (connection == null)
- return;
- if (queuename.IsNullOrEmpty())
- return;
- using (connection)
- {
- using (var channel= connection.CreateModel())
- {
- // 声明一个队列 // 队列模式 一共有四种
- channel.QueueDeclare(queuename, false, false, false, null);
- // 第一个参数: 预计大小, 第二个参数每次读取几个, 第三个参数是否本地
- channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
- // 交付模式
- var prop = channel.CreateBasicProperties();
- // 非持久性 (1) 或持久性(2).
- prop.DeliveryMode = 2;
- // 将对象转化为 JSON 字符串
- var JSON = JsonConvert.SerializeObject(obj);
- // 将字符串转换为二进制
- var bytes= Encoding.UTF8.GetBytes(JSON);
- // 开始传送
- channel.BasicPublish("", queuename, prop,bytes);
- }
- }
- }
- }
定义消费者.
消费者我是使用. Net Core 控制台程序来写的源代码放到了百度网盘请自行下载 RebbitMQDemo 链接: https://pan.baidu.com/s/1n9CaSiAuB9t63Fh_YIU78A 提取码: 3939
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory
- {
- UserName = "wzw",
- Password = "wzw",
- HostName = "localhost"
- };
- // 创建连接
- var connection = factory.CreateConnection();
- // 创建通道
- var channel = connection.CreateModel();
- // 接收到的消息处理事件
- EventingBasicConsumer Recipient = new EventingBasicConsumer(channel);
- Recipient.Received += (ch, ea) =>
- {
- var RecipientMsg = Encoding.UTF8.GetString(ea.Body);
- Console.WriteLine($"后台处理方法收到消息:{RecipientMsg}");
- // 确认该消息已被处理
- channel.BasicAck(ea.DeliveryTag, false);
- Console.WriteLine($"消息已经处理[{ea.DeliveryTag}]");
- };
- channel.BasicConsume("hello", false, Recipient);
- Console.WriteLine("后台处理方法已启动");
- Console.ReadKey();
- channel.Dispose();
- connection.Close();
RabbitMQ 消费失败的处理
RabbitMQ 采用消息应答机制, 即消费者收到一个消息之后, 需要发送一个应答, 然后 RabbitMQ 才会将这个消息从队列中删除, 如果消费者在消费过程中出现异常, 断开连接切没有发送应答, 那么 RabbitMQ 会将这个消息重新投递.
使用 RabbitMQ 的 Exchange
前面我们可以看到生产者将消息投递到 Queue 中, 实际上这在 RabbitMQ 中这种事情永远都不会发生. 实际的情况是, 生产者将消息发送到 Exchange(交换器), 由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃)
AMQP 协议中的核心思想就是生产者和消费者隔离, 生产者从不直接将消息发送给队列. 生产者通常不知道是否一个消息会被发送到队列中, 只是将消息发送到一个交换机. 先由 Exchange 来接收, 然后 Exchange 按照特定的策略转发到 Queue 进行存储. 同理, 消费者也是如此. Exchange 就类似于一个交换机, 转发各个消息分发到相应的队列中.
来源: https://www.cnblogs.com/pual13/p/12313685.html