C# 8.0 中, 提供了一种新的 IAsyncEnumerable<T > 接口, 在对集合进行迭代时, 支持异步操作. 比如在读取文本中的多行字符串时, 如果读取每行字符串的时候使用同步方法, 那么会导致线程堵塞. IAsyncEnumerable<T > 可以解决这种情况, 在迭代的时候支持使用异步方法. 也就是说, 之前我们使用 foreach 来对 IEnumerable 进行迭代, 现在可以使用 await foreach 来对 IAsyncEnumerable<T > 进行迭代, 每个项都是可等待的. 这种新的接口称为 async-streams, 将会随. NET Core 3 发布. 我们来看一下如何在 LINQ 中实现异步的迭代.
使用常规的 IEnumerable<T>
首先我们创建一个新的 Console 项目, 基于. NET Core 3:
- namespace AsyncLinqDemo
- {
- class Program
- {
- static void Main(string[] args)
- {
- Console.WriteLine("Input the file path:");
- var file = Console.ReadLine();
- var lines = ReadAllLines(file);
- foreach (var line in lines)
- {
- Console.WriteLine(line);
- }
- }
- static IEnumerable<string> ReadAllLines(string file)
- {
- using (var fs = File.OpenRead(file))
- {
- using (var sr = new StreamReader(fs))
- {
- while (true)
- {
- string line = sr.ReadLine();
- if(line == null)
- {
- break;
- }
- yield return line;
- }
- }
- }
- }
- }
- }
这是一个很简单的 Console 程序, 实现了一个简单的返回类型为 IEnumerable<string > 的 ReadAllLines(string file) 方法, 从文本文件中逐行读取文本, 并逐行输出. 如果文本内容较少的话, 没什么问题. 但如果我们使用过 aync/await, 就会了解, 在 IO 操作如读取或写入文件的时候, 最好使用异步方法以避免线程阻塞. 让我们来改进一下.
使用异步的 IAsyncEnumerable<T>
可以优化的是下面这句:
string line = sr.ReadLine();
对于 IO 操作, 最好使用异步方式. 这里可使用相应的异步方法:
string line = await sr.ReadLineAsync();
我们说 "异步是传染的", 如果这里使用异步, 那么相应的该方法的返回值也要使用异步, 所以需要将返回值改为 static async Task<IEnumerable<string>>, 但这样会得到一个错误:
ErrorCS1624The body of 'Program.ReadAllLines(string)' cannot be an iterator block because 'Task<IEnumerable<string>>' is not an iterator interface typeAsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs23Active
因为 Task<IEnumerable<string>> 并不是一个可以迭代的接口类型, 所以我们无法在方法内部使用 yield 关键字. 解决问题的办法是使用新的 IAsyncEnumerable 接口:
- static async IAsyncEnumerable<string> ReadAllLines(string file)
- {
- using (var fs = File.OpenRead(file))
- {
- using (var sr = new StreamReader(fs))
- {
- while (true)
- {
- string line = await sr.ReadLineAsync();
- if(line == null)
- {
- break;
- }
- yield return line;
- }
- }
- }
- }
按 F12 查看该接口的定义:
- namespace System.Collections.Generic
- {
- public interface IAsyncEnumerable<out T>
- {
- IAsyncEnumerator<T> GetAsyncEnumerator(CancellationTokencancellationToken = default);
- }
- }
这是一个异步的迭代器, 并提供了 CancellationToken. 再按 F12 查看 IAsyncEnumerator<T > 的定义, 可发现里面是这样的:
- namespace System.Collections.Generic
- {
- public interface IAsyncEnumerator<out T> : IAsyncDisposable
- {
- T Current { get; }
- ValueTask<bool> MoveNextAsync();
- }
- }
这里 MoveNextAsync() 方法实际是返回了一个结果类型为 bool 的 Task, 每次迭代都是可等待的, 从而实现了迭代器的异步.
使用 await foreach 消费 IAsyncEnumerable<T>
当我们做了以上改动之后, ReadAllLines() 方法返回的是一个支持异步的 IAsyncEnumerable, 那么在使用的时候, 也不能简单的使用 foreach 了. 修改 Main 方法如下:
- static async Task Main(string[] args)
- {
- Console.WriteLine("Input the file path:");
- var file = Console.ReadLine();
- var lines = ReadAllLines(file);
- await foreach (var line in lines)
- {
- Console.WriteLine(line);
- }
- }
首先在 foreach 之前添加 await 关键字, 还要需要将 Main 方法由 void 改为 async Task. 这样整个程序都是异步执行了, 不会再导致堵塞了. 这个例子只是一个简单的 demo, 是否使用异步并不会感觉到明显的区别. 如果在迭代内部需要比较重的操作, 如从网络获取大量数据或读取大量磁盘文件, 异步的优势还是会比较明显的.
使用 LINQ 消费 IAsyncEnumerable<T>
使用 LINQ 来操作集合是常用的功能. 如果使用 IEnumberable, 在 Main 方法中可以做如下改动:
- var lines = ReadAllLines(file);
- var res = from line in lines where line.StartsWith("ERROR:") selectline.Substring("ERROR:".Length);
- foreach (var line in res)
- {
- Console.WriteLine(line);
- }
或:
var res = lines.Where(x => x.StartsWith("ERROR:")).Select(x => x.Substring("ERROR:".Length));
如果使用了新的 IAsyncEnumerable, 你会发现无法使用 Where 等操作符了:
ErrorCS1936Could not find an implementation of the query pattern for source type 'IAsyncEnumerable<string>'. 'Where' not found.AsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs16Active
目前 LINQ 还没有提供对 IAsyncEnumerable 的原生支持, 不过微软提供了一个 Nuget 包来实现此功能. 在项目中打开 Nuget Package Manger 搜索安装 System.Linq.Async, 注意该包目前还是预览版, 所以要勾选 include prerelease 才能看到. 安装该 Nuget 包后, Linq 查询语句中的错误就消失了.
在 System.Linq.Async 这个包中, 对每个同步的 LINQ 方法都做了相应的扩展. 所以基本上代码无需什么改动即可正常编译.
对于 LINQ 中的条件语句, 也可以使用 WhereAwait() 方法来支持 await:
public static IAsyncEnumerable<TSource> WhereAwait<TSource>(thisIAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>>predicate);
如需要在条件语句中进行 IO 或网络请求等异步操作, 可以这样用:
var res = lines.WhereAwait(async x => await DoSomeHeavyOperationsAsync(x));
DoSomeHeavyOperationsAsync 方法的签名如下:
- private static ValueTask<bool> DoSomeHeavyOperationsAsync(string x)
- {
- //Do some works...
- }
小结
通过以上的示例, 我们简要了解了如何使用 IAsyncEnumerable 接口以及如何在 LINQ 中实现异步查询. 在使用该接口时, 我们需要创建一个自定义方法返回 IAsyncEnumerable<T > 来代替 IEnumberable<T>, 这个方法可称为 async-iterator 方法, 需要注意以下几点:
该方法应该被声明为 async.
返回 IAsyncEnumerable<T>.
同时使用 await 及 yield. 如 await foreach,yield return 或 yield break 等.
例如:
- async IAsyncEnumerable<int> GetValuesFromServer()
- {
- while (true)
- {
- IEnumerable<int> batch = await GetNextBatch();
- if (batch == null) yield break;
- foreach (int item in batch)
- {
- yield return item;
- }
- }
- }
此外还有一些限制:
无法在 try 的 finally 块中使用任何形式的 yield 语句.
无法在包含任何 catch 语句的 try 语句中使用 yield return 语句.
期待. NET Core 3 的正式发布!
了解新西兰 IT 行业真实码农生活
请长按上方二维码关注 "程序员在新西兰"
来源: https://www.cnblogs.com/yanxiaodi/p/Support-IAsyncEnumerable-with-LINQ.html