问题
正在 await 一批任务, 希望在每个任务完成时对它做一些处理. 另外, 希望在任务一完成就立即进行处理, 而不需要等待其他任务.
问题的重点在于希望任务完成之后立即进行处理, 而不去等待其他任务.
这里还沿用文中的例子.
等待几秒钟之后返回等待的秒数, 之后立即打印任务等待的秒数.
等待的函数如下
- static async Task<int> DelayAndReturnAsync(int val)
- {
- await Task.Delay(TimeSpan.FromSeconds(val));
- return val;
- }
以下方法执行之后的打印结果是 "2", "3", "1". 想得到结果 "1", "2", "3" 应该如何实现.
- static async Task ProcessTasksAsync()
- {
- // 创建任务队列.
- Task<int> taskA = DelayAndReturnAsync(2);
- Task<int> taskB = DelayAndReturnAsync(3);
- Task<int> taskC = DelayAndReturnAsync(1);
- var tasks = new[] { taskA, taskB, taskC };
- // 按顺序 await 每个任务.
- foreach (var task in tasks)
- {
- var result = await task;
- Trace.WriteLine(result);
- }
- }
文中给了两种解决方案. 一种是抽出更高级的 async 方法, 一种是借助作者的 nuget 拓展. 作者还推荐了另外两个博客文章.
- Processing tasks as they complete
- ORDERING BY COMPLETION, AHEAD OF TIME
这两篇文章介绍了更多处理方法.
抽象方法, 并发执行
- static async Task AwaitAndProcessAsync(Task<int> task)
- {
- var result = await task;
- Trace.WriteLine(result);
- }
将执行和处理抽象出来, 借助 Task.WhenAll 和 LINQ 并发执行.
- var processingTasks = (from t in tasks
- select AwaitAndProcessAsync(t)).ToArray();
- // 等待全部处理过程的完成.
- await Task.WhenAll(processingTasks);
或者
- var processingTasks = tasks.Select(async t =>
- {
- var result = await t;
- Trace.WriteLine(result);
- }).ToArray();
- // 等待全部处理过程的完成.
- await Task.WhenAll(processingTasks);
借助 nuget 拓展: Nito.AsyncEx
推荐预发布版本:
需要添加引用 using Nito.AsyncEx;
- static async Task UseOrderByCompletionAsync()
- {
- // 创建任务队列.
- Task<int> taskA = DelayAndReturnAsync(2);
- Task<int> taskB = DelayAndReturnAsync(3);
- Task<int> taskC = DelayAndReturnAsync(1);
- var tasks = new[] { taskA, taskB, taskC };
- // 等待每一个任务完成.
- foreach (var task in tasks.OrderByCompletion())
- {
- var result = await task;
- Trace.WriteLine(result);
- }
- }
串行执行
使用 ConcurrentExclusiveSchedulerPair, 使任务串行执行, 结果是 "2", "3", "1".
- var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
- foreach (var t in tasks)
- {
- await t.ContinueWith(completed =>
- {
- switch (completed.Status)
- {
- case TaskStatus.RanToCompletion:
- Trace.WriteLine(completed.Result);
- //Process(completed.Result);
- break;
- case TaskStatus.Faulted:
- //Handle(completed.Exception.InnerException);
- break;
- }
- }, scheduler);
- }
上篇文章中提到了使用 Task.WhenAny 处理已完成的任务: https://www.cnblogs.com/AlienXu/p/10609253.html#idx_2
文中的例子从算法层面是不推荐使用的, 作者推荐了他自己的拓展 Nito.AsyncEx, 源码地址:.
另外两种实现的实现方法差不多, 都是借助 TaskCompletionSource<T > 和 Interlocked.Incrementa 处理 Task.
这里只列出 ORDERING BY COMPLETION, AHEAD OF TIME 的解决方案.
- /// <summary>
- /// 返回一系列任务, 这些任务的输入类型相同和返回结果类型一致
- /// 返回的任务将以完成顺序返回
- /// </summary>
- private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks)
- {
- // 复制输入, 以下的处理将不需要考虑是否会对输入有影响
- var inputTaskList = inputTasks.ToList();
- var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count);
- for (int i = 0; i <inputTaskList.Count; i++)
- {
- completionSourceList.Add(new TaskCompletionSource<T>());
- }
- // 索引
- // 索引最好是从 0 开始, 但是 Interlocked.Increment 返回的是递增之后的值, 所以这里应该赋值 - 1
- int prevIndex = -1;
- // 可以不用再循环之外处理 Action, 这样会让代码更清晰. 现在有 C#7.0 的新特性本地方法可以使用
- /*// 本地方法
- void continuation(Task<T> completedTask)
- {
- int index = Interlocked.Increment(ref prevIndex);
- var source = completionSourceList[index];
- PropagateResult(completedTask, source);
- }*/
- Action<Task<T>> continuation = completedTask =>
- {
- int index = Interlocked.Increment(ref prevIndex);
- var source = completionSourceList[index];
- PropagateResult(completedTask, source);
- };
- foreach (var inputTask in inputTaskList)
- {
- inputTask.ContinueWith(continuation,
- CancellationToken.None,
- TaskContinuationOptions.ExecuteSynchronously,
- TaskScheduler.Default);
- }
- return completionSourceList.Select(source => source.Task);
- }
- /// <summary>
- /// 对 TaskCompletionSource 进行处理
- /// </summary>
- private static void PropagateResult<T>(Task<T> completedTask,
- TaskCompletionSource<T> completionSource)
- {
- switch (completedTask.Status)
- {
- case TaskStatus.Canceled:
- completionSource.TrySetCanceled();
- break;
- case TaskStatus.Faulted:
- completionSource.TrySetException(completedTask.Exception.InnerExceptions);
- break;
- case TaskStatus.RanToCompletion:
- completionSource.TrySetResult(completedTask.Result);
- break;
- default:
- throw new ArgumentException("Task was not completed");
- }
- }
来源: https://www.cnblogs.com/AlienXu/p/10641903.html