1. 简介
C# 并发编程经典实例 https://book.douban.com/subject/26274181/ 是一本关于使用 C# 进行并发编程的入门参考书, 使用 "问题 - 解决方案 - 讨论" 的模式讲解了以下这些概念:
面向异步编程的 async 和 await
使用 TPL(任务并行库)
创建数据流管道的 TPL Dataflow 库
基于 LINQ 的 Reactive Extensions
为并发代码编写单元测试
并发方法之间的互操作
不可变, 线程安全和生产者 / 消费者集合
并发代码中的取消功能支持
支持异步的面向对象编程
线程同步访问数据
我还挺喜欢这本书的, 只有短短的 170 页却提供了大量的最佳实践, 介绍了当时最新的 C# 平台并发开发技术, 作为参考书时至今日依然很有推荐价值. 不过篇幅所限, 从入门知识到最佳实践之间往往缺乏过渡. 例如第四章数据流基础, 前一页还在介绍要安装哪个 Nuget 包才可以使用数据流, 下一页突然讨论链接数据流块,传递出错信息, 至于数据流有哪些类型各自的使用场景都没介绍到, 于是我只好配合博客园上的这篇文章 TPL DataFlow 初探 来学习数据流的知识.
2. 实现一个下载工具的 UI
为什么这篇文章放在 UWP 板块下面?
这本书 2015 年在国内出版, 读了这本书后感觉很有用. 最近重读了这本书, 试着用 UWP 复习一下书上的知识, 除了有些 Nuget 包的名字变了其它内容都适用于 UWP 开发, 最终成果是一个 (十分阳春的) 下载工具 UI, 所以就放在 UWP 板块下了.
2.1 基础的 async/await
- private async void OnAddLinks(object sender, RoutedEventArgs e)
- {
- var dialog = new AddDownloadDialog();
- await dialog.ShowAsync();
- if (dialog.Downloads == null)
- return;
- ...
- ...
- }
基础的用法没什么好说的.
微软的文档提到 "应将"'Async'作为后缀添加到所编写的每个异步方法名称中.", 但即使没这样做 VS 和 R# 也没有提示.
2.2 同时开始一组任务并等待它们完成
- private async Task<IEnumerable<Downloader>> AddNewDownloadAsync(IEnumerable<Uri> links, CancellationToken cancellationToken)
- {
- var downlodTasks = links.Select(Downloader.CreateAsync);
- var downlodTasksArray = downlodTasks.ToArray();
- var downloads = await Task.WhenAll(downlodTasksArray);
- return downloads;
- }
反正就是使用
- Task<TResult[]> WhenAll(params Task[] tasks)
- .
2.3 一组任务中任一任务完成时的处理
- Task<Downloader> Selector(Uri link) => Downloader.CreateAsync(link, cancellationToken);
- var downlodTasks = links.Select(Selector);
- var progressTasks = downlodTasks.Select(async t =>
- {
- var result = await t.ToObservable().Timeout(TimeSpan.FromSeconds(6));
- await _mutex.WaitAsync(cancellationToken);
- try
- {
- if (cancellationToken.IsCancellationRequested == false)
- {
- FinishedTasks++;
- _downloads.Add(t.Result);
- }
- }
- finally
- {
- _mutex.Release();
- }
- return result;
- }).ToArray();
- var downloads = await Task.WhenAll(progressTasks);
2.4 发出取消请求
由 CancellationTokenSource 发出取消请求, CancellationToken 则让代码能够响应取消请求.
- try
- {
- _cancellationTokenSource = new CancellationTokenSource();
- await AddNewDownloadAsync(_cancellationTokenSource.Token);
- }
- catch (OperationCanceledException ex)
- {
- InAppNotification.Show("Task Paused:" + ex.Message, 5000);
- }
- catch (Exception ex)
- {
- ProgressControl.State = ProgressState.Faulted;
- InAppNotification.Show("Task Error:" + ex.Message, 5000);
- }
- _cancellationTokenSource.Cancel();
上面代码演示了如何通过 CancellationTokenSource 发出取消请求, 被取消的代码应该会抛出 OperationCanceledException. 也有可能被取消的代码还来不及响应取消就完成或报错了.
2.5 通过轮询响应取消请求
- while (ReceivedBytes <TotalBytes)
- {
- await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
- var bytesReceived = random.Next(1024 * 1024);
- ReceivedBytes += bytesReceived;
- cancellationToken.ThrowIfCancellationRequested();
- }
被取消的代码可以通过
ThrowIfCancellationRequested()
抛出 OperationCanceledException. 也可以通过检查 IsCancellationRequested 再做其它处理, 但抛出 OperationCanceledException 是标准处理方式.
如果再下一层代码里支持取消, 则应该将 CancellationToken 传递给它, 例如这里的 Task.Delay.
2.6 超时后取消
- var downlodTasks = links.Select(link =>
- {
- var cts = new CancellationTokenSource();
- var token = cts.Token;
- cts.CancelAfter(TimeSpan.FromSeconds(5));
- return Downloader.CreateAsync(link, token);
- });
- var downlodTasksArray = downlodTasks.ToArray();
- var downloads = await Task.WhenAll(downlodTasksArray);
CancellationTokenSource 调用
CancelAfter(TimeSpan delay)
或者使用构造函数
CancellationTokenSource(TimeSpan delay)
设置取消前等待的时间间隔都可以实现超时后取消.
2.7 使用 Rx 实现超时
上面的方法实现超时其实相当于发出了一个取消请求, 最终会抛出一个 OperationCanceledException, 有时会难以区分用户的取消操作和超时后被取消. 我有时会用 Rx 来实现超时.
var result = await t.ToObservable().Timeout(TimeSpan.FromSeconds(6));
这段代码会抛出 TimeoutException, 更加有超时的感觉. 但是 CancellationTokenSource 没有被取消, 所以原本以为被取消的代码仍会继续偷偷摸摸地执行下去.
2.8 报告进度
- public async Task StartDownloadAsync(IProgress<int> progress, CancellationToken cancellationToken)
- {
- _cancellationToken = cancellationToken;
- var random = new Random();
- using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
- {
- while (ReceivedBytes <TotalBytes)
- {
- await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
- var bytesReceived = random.Next(1024 * 1024);
- ReceivedBytes += bytesReceived;
- progress?.Report(bytesReceived);
- cancellationToken.ThrowIfCancellationRequested();
- }
- }
- }
- var progress = new Progress<int>();
- progress.ProgressChanged += (s, e) =>
- {
- DownloadedData?.Invoke(this, e);
- OnPropertyChanged(nameof(Downloader));
- };
- _cancellationTokenSource = new CancellationTokenSource();
- await Downloader.StartDownloadAsync(progress, _cancellationTokenSource.Token);
使用 IProgress 报告进度, 使用 Progress 的
event EventHandler ProgressChanged
接收进度.
IProgress.Report(T value)
可以是异步的, 所以 T 最好定义为一个不可变类型或者至少是值类型.
2.9 限制每次只开始 5 个下载
- _semaphore = new SemaphoreSlim(5);
- var tasks = dialog.Downloads.Select(async item =>
- {
- var model = new DownloaderModel { Downloader = item };
- Downloads.Add(model);
- model.DownloadedData += OnDownloadData;
- await _semaphore.WaitAsync();
- try
- {
- await model.StartDownloadAsync();
- }
- catch (OperationCanceledException)
- {
- //do nothing
- }
- finally
- {
- _semaphore.Release();
- }
- }).ToArray();
- await Task.WhenAll(tasks);
虽然有几种方法实现, 但 SemaphoreSlim 看着挺好理解的.
3.0 使用 Rx 的缓冲统计下载速度
- private void OnDownloadData(object sender, int e)
- {
- _progress.Report(e);
- }
当下载进度更新时使用 IProgress 报告进度.
- var progress = new Progress<int>();
- _progress = progress;
- var reports = Observable.FromEventPattern<int>(handler => progress.ProgressChanged += handler, handler => progress.ProgressChanged -= handler);
- reports.Buffer(TimeSpan.FromSeconds(1)).Subscribe(async x =>
- {
- await CoreApplication.MainView.CoreWindow.Dispatcher.RunAsync(Windows.UI.Core.CoreDispatcherPriority.Normal, () =>
- {
- SpeedElement.Text = string.Format("{0} Bytes/S", x.Sum(s => s.EventArgs).ToString("N0"));
- });
- });
这段代码收集 ProgressChanged 事件, 并每一秒钟把收集到的事件作为一个集合发布.
3. 书中的其它建议
一旦你输入 new Thread(), 那就糟糕了, 说明项目中的代码太过时了.
比起老式的多线程机制, 采用高级的抽象机制会让程序功能更加强大, 效率更高. 事实上 UWP 好像只能使用线程池, 不能直接访问及控制线程(因为习惯用 Task 没关心线程, 也许有我不知道的方式), 看起来微软希望开发者使用 Task 这个更合理的抽象而不是直接使用线程.
在编写任务并行程序时, 要格外留意下闭包 (closure) 捕获的变量.
这是个常见的错误, 幸好很多情况下 R# 都会提示这个错误.
基本的 lock 语句就可以很好地处理 99% 的情况了.
经常在 Code Review 时看到 Monitor 或 ReaderWriterLockSlim 之类的. 但是, 我明白的, 比起直接用 lock 这样写比较帅气(但我还是会要求改过来).
应该把 lock 语句使用的对象设为私有变量, 并且永远不要暴露给非本类的方法.
lock 一个属性, 或者直接 lock(this)都十分危险. 我真的 CodeReview 过因为习惯性地 lock(this)而产生死锁的代码.
另外锁对象的使用范围尽量小, 不要在多个语句中使用同一个锁对象.
在 UI 线程上执行代码时, 永远不要使用针对特定平台的类型. WPF,Silverlight,iOS,Android 都有 Dispatcher 类, Windows 应用商店平台使用 CoreDispatcher,Windows Forms 有 ISynchronizeInvoke 接口. 不要在新写的代码中使用这些类型, 就当它们不存在吧. 使用这些类型会使代码无所谓绑定到某个特定平台上. SynchronizationContext 是通用的, 基于上述类型的抽象类.
在 UWP 中, 在线程中调用 UI 元素通常如下:
- await Task.Run(async () =>
- {
- await CoreApplication.MainView.CoreWindow.Dispatcher.RunAsync(Windows.UI.Core.CoreDispatcherPriority.Normal, () =>
- {
- Header.Text = "some message";
- });
- });
如果使用 SynchronizationContext, 则代码如下:
- var synchronizationContext = SynchronizationContext.Current;
- await Task.Run(() =>
- {
- synchronizationContext.Post(a =>
- {
- Header.Text = "some message";
- }, null);
- });
看起来 SynchronizationContext 确实更通用一些.
来源: https://www.cnblogs.com/dino623/p/async.html