目录
ASP.NET Core 3.0 使用 gRPC
ASP.NET Core 3.0 gRPC 认证授权
一. 前言
在前一文 《ASP.NET Core 3.0 使用 gRPC》中有提到 gRPC 支持双向流调用, 支持实时推送消息, 这也是 gRPC 的一大特点, 且 gRPC 在对双向流的控制支持上也是非常强大的.
二. 什么是 gRPC 流
gRPC 有四种服务类型, 分别是: 简单 RPC(Unary RPC), 服务端流式 RPC (Server streaming RPC), 客户端流式 RPC (Client streaming RPC), 双向流式 RPC(Bi-directional streaming RPC). 它们主要有以下特点:
服务类型 | 特点 |
---|---|
简单 RPC | 一般的 rpc 调用,传入一个请求对象,返回一个返回对象 |
服务端流式 RPC | 传入一个请求对象,服务端可以返回多个结果对象 |
客户端流式 RPC | 客户端传入多个请求对象,服务端返回一个结果对象 |
双向流式 RPC | 结合客户端流式 RPC 和服务端流式 RPC,可以传入多个请求对象,返回多个结果对象 |
三. 为什么 gRPC 支持流
gRPC 通信是基于 HTTP/2 实现的, 它的双向流映射到 HTTP/2 流. HTTP/2 具有流的概念, 流是为了实现 HTTP/2 的多路复用. 流是服务器和客户端在 HTTP/2 连接内用于交换帧数据的独立双向序列, 逻辑上可看做一个较为完整的交互处理单元, 即表达一次完整的资源请求, 响应数据交换流程; 一个业务处理单元, 在一个流内进行处理完毕, 这个流生命周期完结.
特点如下:
一个 HTTP/2 连接可同时保持多个打开的流, 任一端点交换帧
流可被客户端或服务器单独或共享创建和使用
流可被任一端关闭
在流内发送和接收数据都要按照顺序
流的标识符自然数表示, 1~2^31-1 区间, 有创建流的终端分配
流与流之间逻辑上是并行, 独立存在
摘自 HTTP/2 笔记之流和多路复用 by 聂永
四. gRPC 中使用双向流调用
我们在前文中编写的 RPC 属于简单 RPC, 没有包含流调用, 下面直接讲一下双向流, 根据第二小节列举的四种服务类型, 如果我们掌握了简单 RPC 和双向流 RPC, 那么服务端流式 RPC 和客户端流式 RPC 自然也就没有问题了.
这里我们继续使用前文的代码, 要实现的目标是一次给多个猫洗澡.
1 首先在 LuCat.proto 定义两个 rpc, 一个 Count 用于统计猫的数量, 一个 双向流 RPC BathTheCat 用于给猫洗澡
- syntax = "proto3";
- option csharp_namespace = "AspNetCoregRpcService";
- import "google/protobuf/empty.proto";
- package LuCat; // 定义包名
- // 定义服务
- service LuCat{
- // 定义给猫洗澡双向流 rpc
- rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);
- // 定义统计猫数量简单 rpc
- rpc Count(google.protobuf.Empty) returns (CountCatResult);
- }
- message SuckingCatResult{
- string message=1;
- }
- message BathTheCatReq{
- int32 id=1;
- }
- message BathTheCatResp{
- string message=1;
- }
- message CountCatResult{
- int32 Count=1;
- }
2 添加服务的实现
这里安利下 Resharper, 非常方便
- private readonly ILogger<LuCatService> _logger;
- private static readonly List<string> Cats=new List<string>(){"英短银渐层","英短金渐层","美短","蓝猫","狸花猫","橘猫"};
- private static readonly Random Rand=new Random(DateTime.Now.Millisecond);
- public LuCatService(ILogger<LuCatService> logger)
- {
- _logger = logger;
- }
- public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context)
- {
- var bathQueue=new Queue<int>();
- while (await requestStream.MoveNext())
- {
- // 将要洗澡的猫加入队列
- bathQueue.Enqueue(requestStream.Current.Id);
- _logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");
- }
- // 遍历队列开始洗澡
- while (bathQueue.TryDequeue(out var catId))
- {
- await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只 {Cats[catId]} 洗了澡!" });
- await Task.Delay(500);// 此处主要是为了方便客户端能看出流调用的效果
- }
- }
- public override Task<CountCatResult> Count(Empty request, ServerCallContext context)
- {
- return Task.FromResult(new CountCatResult()
- {
- Count = Cats.Count
- });
- }
BathTheCat 方法会接收多个客户端发来的 CatId, 然后将他们加入队列中, 等客户端发送完成后开始依次洗澡并返回给客户端.
3 客户端实现
随机发送 10 个猫 Id 给服务端, 然后接收 10 个洗澡结果.
- var channel = GrpcChannel.ForAddress("https://localhost:5001");
- var catClient = new LuCat.LuCatClient(channel);
- // 获取猫总数
- var catCount = await catClient.CountAsync(new Empty());
- Console.WriteLine($"一共 {catCount.Count} 只猫.");
- var rand = new Random(DateTime.Now.Millisecond);
- var bathCat = catClient.BathTheCat();
- // 定义接收吸猫响应逻辑
- var bathCatRespTask = Task.Run(async() =>
- {
- await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
- {
- Console.WriteLine(resp.Message);
- }
- });
- // 随机给 10 个猫洗澡
- for (int i = 0; i <10; i++)
- {
- await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)});
- }
- // 发送完毕
- await bathCat.RequestStream.CompleteAsync();
- Console.WriteLine("客户端已发送完 10 个需要洗澡的猫 id");
- Console.WriteLine("接收洗澡结果:");
- // 开始接收响应
- await bathCatRespTask;
- Console.WriteLine("洗澡完毕");
4 运行
可以看到双向流调用成功, 客户端发送了 10 个猫洗澡请求对象, 服务端返回了 10 个猫洗澡结果对象. 且是实时推送的, 这就是 gRPC 的双向流调用.
这里大家可以自行改进来演变成客户端流式或者服务端流式调用. 客户端发送一个猫 Id 列表, 然后服务端返回每个猫洗澡结果, 这就是服务端流式调用. 客户端依次发送猫 Id, 然后服务端一次性返回所有猫的洗澡结果(给所有猫洗澡看做是一个业务, 返回这个业务的结果), 就是客户端流式调用. 这里我就不再演示了.
五. 流控制
gRPC 的流式调用支持对流进行主动取消的控制, 进而可以衍生出流超时限制等控制.
在流式调用是, 可以传一个 CancellationToken 参数, 它就是我们用来对流进行取消控制的:
改造一下我们在第四小节的代码:
1 客户端
- var cts = new CancellationTokenSource();
- // 指定在 2.5s 后进行取消操作
- cts.CancelAfter(TimeSpan.FromSeconds(2.5));
- var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);
- // 定义接收吸猫响应逻辑
- var bathCatRespTask = Task.Run(async() =>
- {
- try
- {
- await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
- {
- Console.WriteLine(resp.Message);
- }
- }
- catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
- {
- Console.WriteLine("Stream cancelled.");
- }
- });
2 服务端
- // 遍历队列开始洗澡
- while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))
- {
- _logger.LogInformation($"Cat {catId} Dequeue.");
- await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只 {Cats[catId]} 洗了澡!" });
- await Task.Delay(500);// 此处主要是为了方便客户端能看出流调用的效果
- }
3 运行
设置的是双向流式调用 2.5s 后取消流, 从客户端调用结果看到, 并没有收到全部 10 个猫的洗澡返回结果, 流就已经被取消了, 这就是 gRPC 的流控制.
六. 结束
这里流式调用可以实现实时推送, 服务端到客户端或者客户端到服务端短实时推送消息, 但是这个和传统意义上的长连接主动推送, 广播消息不一样, 不管你是服务端流式, 客户端流式还是双向流式, 必须要由客户端进行发起, 通过客户端请求来建立流通信.
七. 参考资料
GRPC 的四种服务类型 by twtydgo
HTTP/2 笔记之流和多路复用 by 聂永
本文所用代码
来源: https://www.cnblogs.com/stulzq/p/11590088.html