Actor 是一种高并发处理模型, 每个 Actor 都有着自己的状态有序消息处理机制, 所以在业务处理的情况并不需要制定锁的机制, 从而达到更高效的处理能性. XRPC 是一个基于远程接口调用的 RPC 组件, 它可以简单地实现高性能的远程接口调用; XRPC 在创建远程接口时是支持针对接口创建对应的 Actor 实例. 当创建接口 Actor 后, 所有 Client 针对这一例实例 Actor 的所有方法调用都是有序处理. 以下介绍如何在 XRPC 创建并使用 Actor
什么场景适用于 Actor
既然每个 Actor 都有着自己的状态和顺序处理机制, 那可以针对这优点进行相关业务的展开; 在棋牌游戏中的桌子可以是一个 Actor, 车票里的每一辆车可以是一个 Actor, 秒杀里的每一种商品是一个 Actor. 在这些 Actor 所有的操作都是有序进行, 不存在锁, 也不需要事务 (通过 EventSourcing 来保障) 和不会产生死锁; 数据变更全内存操作, 通过这样可以大提高业务的处理性能.
引用 XRPC
Install-Package BeetleX.XRPC
定义 Actor 的 RPC 服务
XRPC 支持的 Actor 服务功能是建立在 EventNext 之上, 它的好处是直接接口行为的 Actor 创建, 而不是传统的消息加接收方法, 这样在应用设计和调用上也是非常方便灵活. 接下来定义一个简单的 Actor 服务
接口定义
- public interface IAmountService
- {
- Task<int> Income(int amount);
- Task<int> Payout(int amount);
- Task<int> Get();
- }
以上是一个简单的帐记变更行为接口
实现接口
- [Service(typeof(IAmountService))]
- public class AmountService : ActorState, IAmountService
- {
- private int mAmount;
- public override Task ActorInit(string id)
- {
- return base.ActorInit(id);
- }
- public Task<int> Get()
- {
- return mAmount.ToTask();
- }
- public Task<int> Income(int amount)
- {
- mAmount += amount;
- return mAmount.ToTask();
- }
- public Task<int> Payout(int amount)
- {
- mAmount -= amount;
- return mAmount.ToTask();
- }
- }
启动对应的 RPC 服务
- private static XRPCServer mXRPCServer;
- static void Main(string[] args)
- {
- mXRPCServer = new XRPCServer();
- mXRPCServer.ServerOptions.LogLevel = LogType.Error;
- mXRPCServer.Register(typeof(Program).Assembly);
- mXRPCServer.Open();
- Console.Read();
- }
以上代码是在默认端口 9090 上绑定 RPC 服务, 可以通过运行日志查看服务启动情况
创建远程 Actor 调用
创建 RPC Client
- client = new XRPCClient("192.168.2.18", 9090);
- client.Connect();
以上代码是创建一个 RPC 客户端, 通过它的 Create 可以创建接口代理
创建接口 Actor 实例
- IAmountService henry = client.Create<IAmountService>("henry");
- IAmountService ken = client.Create<IAmountService>("ken");
以上是针对 IAmountService 创建了两个 Actor 对象, 这两个对象的操作都是相互隔离互不干扰; 每个 Actor 对象中的方法在并发下都是有序执行, 并不会产生线程安全问题, 所以在不同方法中操作对像的数据成员都不需要锁来保证数据安全性.
测试
为了更好地验证 Actor 的隔离和并发安全性, 简单地并发测试一下
- for (int i = 0; i <concurrent; i++)
- {
- var task = Task.Run(async () =>
- {
- for (int k = 0; k <requests; k++)
- {
- await henry.Income(10);
- System.Threading.Interlocked.Increment(ref mCount);
- }
- });
- tasks.Add(task);
- task = Task.Run(async () =>
- {
- for (int k = 0; k <requests; k++)
- {
- await henry.Payout(10);
- System.Threading.Interlocked.Increment(ref mCount);
- }
- });
- tasks.Add(task);
- task = Task.Run(async () =>
- {
- for (int k = 0; k <requests; k++)
- {
- await ken.Income(10);
- System.Threading.Interlocked.Increment(ref mCount);
- }
- });
- tasks.Add(task);
- task = Task.Run(async () =>
- {
- for (int k = 0; k < requests; k++)
- {
- await ken.Payout(10);
- System.Threading.Interlocked.Increment(ref mCount);
- }
- });
- tasks.Add(task);
- }
- await Task.WhenAll(tasks.ToArray());
- double useTime = EventCenter.Watch.ElapsedMilliseconds - start;
- Console.WriteLine($"Completed count:{mCount}|use time:{useTime}|rps:{(mCount / useTime * 1000d):###.00} |henry:{await henry.Get()},ken:{await ken.Get()}");
两个程序同时在本机跑了一下, 在 50 并发的情况大概是 11 万 RPS
服务中的 Actor 隔离性
服务是通过名称来实例化接口的不同 Actor, 同一服务即使多个 Client 同时对一名称的 Actor 进行创建服务也可以保证它的唯一性.
完整示例代码
来源: https://www.cnblogs.com/smark/p/10941819.html