C# 中的任务 Task
在 C# 编程中, 实现并行可以直接使用线程, 但使用起来很繁琐; 也可以使用线程池, 线程池很大程度上简化了线程的使用, 但是也有着一些局限, 比如我们不知道作业什么时候完成, 也取不到作业的返回值; 解决线程池局限性的方案是使用任务. 本文将总结 C# 中 Task 的使用.
类似于线程池工作项对异步操作的封装, 任务是对异步操作的另一种形式的封装, 这种封装抽象层次更高, 让我们能够对异步操作进行更多的控制.
任务启动后, 通过任务调度器 TaskScheduler 来调度..NET 中提供两种任务调度器, 一种是线程池任务调度器, 也是默认调度器, 它会将任务派发给线程池工作者线程; 另一种是上下文同步任务调度器, 它会将任务派发给当前上下文线程, 例如 GUI 线程. 此外, 我们也能自定义任务调度器, 例如可以将异步 IO 任务派发给线程池 IO 线程.
Task 的使用方法
隐式使用
Parallel 静态类除了提供并行循环的各种重载, 还提供了一个方法 Parallel.Invoke. 这个方法可以创建并执行一个或多个异步任务, 使用方法如下:
- /// <summary>
- /// 任务模拟
- /// </summary>
- private static void DoWork(int workId = 0)
- {
- Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started work[{workId}].");
- Thread.Sleep(3000);
- Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] done work[{workId}].");
- }
- /// <summary>
- /// 任务的隐式使用
- /// </summary>
- public static void ImplicitUsingOfTask()
- {
- Parallel.Invoke(()=>DoWork(1),()=>DoWork(2),() => DoWork(3));
- }
上例的运行结果如下:
- 2019/3/27 20:40:18=> Thread[9] started work[1].
- 2019/3/27 20:40:18=> Thread[12] started work[3].
- 2019/3/27 20:40:18=> Thread[10] started work[2].
- 2019/3/27 20:40:21=> Thread[9] done work[1].
- 2019/3/27 20:40:21=> Thread[12] done work[3].
- 2019/3/27 20:40:21=> Thread[10] done work[2].
对于简单的多任务并行, 使用上述的方式很方便, 但是这种方式与线程池一样, 我们不能控制任务的执行或者获取任务返回值.
显式使用
相对于使用 Parallel.Invoke 执行并行操作, 更常用的是使用 Task 和 Task<T > 提供的方法进行异步和并行处理. 下面是任务最基本的使用:
- Task.Run(() =>
- {
- //TODO
- });
- Task.Factory.StartNew(() =>
- {
- //TODO
- });
任务的常用操作
获取任务的返回值
具有返回值的任务使用 Task<T>,T 可根据我们的需求指定, 下面是获取任务返回值的方法.
- Task<int> task = Task<int>.Factory.StartNew(() =>
- {
- Thread.Sleep(3000);// 模拟操作用时
- return DateTime.Now.Day;
- });
- int day = task.Result;
需要说明的是, 获取任务的结果会阻塞当前线程.
等待任务完成
有时候, 我们需要等待一些任务全部完成后才能执行后续操作, 有时候只要多个任务中的一个完成了, 就可以执行后续操作. Task 提供了 Wait,WaitAll 和 WaitAny 等方法满足我们的需求. 下面的例子展示了各种等待方法的使用.
- /// <summary>
- /// 任务等待测试
- /// </summary>
- public static void TaskWait()
- {
- Stopwatch watch = new Stopwatch();
- #region 场景 1: 等待一个任务完成
- Task task = Task.Run(() => DoWorkOfTask(1000));
- Console.WriteLine("start wait. work duration: 1000");
- watch.Start();
- task.Wait();// 等待 1 秒左右
- watch.Stop();
- Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
- #endregion
- #region 场景 2: 等待多个任务完成
- Task[] tasks = new Task[3]
- {
- Task.Run(() => DoWorkOfTask(1000)),
- Task.Run(() => DoWorkOfTask(2000)),
- Task.Run(() => DoWorkOfTask(3000)),
- };
- Console.WriteLine("start wait all. work duration: min 1000 max 3000.");
- watch.Restart();
- Task.WaitAll(tasks);// 等待 3 秒左右
- watch.Stop();
- Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
- #endregion
- #region 场景 3: 等待某个任务完成
- tasks = new Task[3]
- {
- Task.Run(() => DoWorkOfTask(1000)),
- Task.Run(() => DoWorkOfTask(2000)),
- Task.Run(() => DoWorkOfTask(3000)),
- };
- Console.WriteLine("start wait any. work duration: min 1000 max 3000.");
- watch.Restart();
- Task.WaitAny(tasks);// 等待 1 秒左右
- watch.Stop();
- Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
- #endregion
- }
- /// <summary>
- /// 做任务
- /// </summary>
- /// <param name="workDuration"> 任务时长 </param>
- private static void DoWorkOfTask(int workDuration)
- {
- Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task[{Task.CurrentId}].");
- Thread.Sleep(workDuration);
- Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task[{Task.CurrentId}].");
- }
使用 Wait,WaitAll 和 WaitAny 方法时, 我们可以设置超时时间或者传入取消 Token, 以控制等待时间. 但这些方法返回布尔值, 只能表明是否等待成功; 假如我们需要知道所等待的任务返回值, 则可以使用 WhenAll 或 WhenAny 方法, 这两个方法不能控制等待时间, 但会返回一个完成的任务. 如下例:
- Task<int>[] tasks = new Task<int>[3]
- {
- Task<int>.Factory.StartNew(() =>
- {
- Console.WriteLine($"task #{Task.CurrentId} run");
- Thread.Sleep(100);
- Console.WriteLine($"task #{Task.CurrentId} done");
- return 100;
- }),
- Task<int>.Factory.StartNew(() =>
- {
- Console.WriteLine($"task #{Task.CurrentId} run");
- Thread.Sleep(500);
- Console.WriteLine($"task #{Task.CurrentId} done");
- return 1000;
- }),
- Task<int>.Factory.StartNew(() =>
- {
- Console.WriteLine($"task #{Task.CurrentId} run");
- Thread.Sleep(1000);
- Console.WriteLine($"task #{Task.CurrentId} done");
- return 10000;
- }),
- };
- //int[] results = Task.WhenAll(tasks).Result;
- //Console.WriteLine($"[{string.Join(",",results)}]");
- Task<int> task = Task.WhenAny(tasks).Result;
- Console.WriteLine($"task #{task.Id}. result {task.Result}");
Task.WhenAll 和 Task.WhenAny 在等待结束时, 都会创建一个完成状态的任务, WhenAll 将等待的所有已完成任务的结果放入创建任务的结果中, WhenAny 则将等待的已完成任务放到创建任务的结果中.
任务延续
有时候, 我们需要在一个任务完成时开始另一个任务. 对于这种需求, 我们可以使用 Task 的 ContinueWith 等方法来处理.
- Task task = Task.Run(() => DoWorkOfTask(3000));
- task.ContinueWith(t => DoWorkOfTask(1000));
运行结果:
- 2019/3/27 21:25:09=> Thread[10] started task[1].
- 2019/3/27 21:25:12=> Thread[10] completed task[1].
- 2019/3/27 21:25:12=> Thread[11] started task[2].
- 2019/3/27 21:25:13=> Thread[11] completed task[2].
我们还可以通过 TaskContinuationOptions 指定延续任务的执行条件, 如任务取消时或者任务出现异常时才执行, 等.
子任务的使用
有时候, 我们要在一个任务里面创建一些其他任务, 并且还要在任务里面等待创建的任务完成, 此时我们可以使用子任务.
- Task parent = Task.Factory.StartNew(() =>
- {
- Console.WriteLine($"parent task #{Task.CurrentId} run.");
- for (int i = 0; i <10; i++)
- {
- Task.Factory.StartNew(() =>
- {
- Console.WriteLine($"child task #{Task.CurrentId} run.");
- Thread.Sleep(1000);
- Console.WriteLine($"child task #{Task.CurrentId} done.");
- }, TaskCreationOptions.AttachedToParent);
- }
- });
- parent.Wait();
- Console.WriteLine($"parent task #{parent.Id} done.");
在一个任务中创建的新任务, 默认情况下与父级任务是分离的, 各自的运行不受影响, 除非在创建任务时显式附加到父级任务中. 例如, 上例中如果不指定 TaskCreationOptions.AttachedToParent,parent.Wait() 就不会持续到所有子任务都执行完成.
任务的取消
我们在启动任务时, 传入取消令牌 CancellationToken, 当收到取消请求时, 抛出取消异常并在等待任务完成时捕获异常 TaskCanceledException. 我们通过这种方式控制任务的取消.
- /// <summary>
- /// 任务取消
- /// </summary>
- public static void TaskCancle()
- {
- Console.WriteLine("Press any key to begin. Press'c'to cancel.");
- Console.ReadKey(true);
- Console.WriteLine();
- CancellationTokenSource tokenSource = new CancellationTokenSource();
- ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();
- // 单任务取消
- Task task1 = Task.Factory.StartNew(() => DoWorkOfTask(5000, tokenSource.Token), tokenSource.Token);
- tasks.Add(task1);
- // 嵌套任务取消
- Task task2 = Task.Factory.StartNew(() =>
- {
- for (int i = 0; i <10; i++)
- {
- int duration = 1000 * i;
- tasks.Add(Task.Factory.StartNew(()=>DoWorkOfTask(duration, tokenSource.Token), tokenSource.Token));
- }
- DoWorkOfTask(5000,tokenSource.Token);
- }, tokenSource.Token);
- tasks.Add(task2);
- char ch = Console.ReadKey().KeyChar;
- if (ch == 'c' || ch == 'C')
- {
- tokenSource.Cancel();
- Console.WriteLine($"{DateTime.Now}=> Task cancellation requested.");
- }
- try
- {
- Task.WaitAll(tasks.ToArray());
- }
- catch (AggregateException ae)
- {
- foreach (Exception ex in ae.InnerExceptions)
- {// 任务取消通过抛出 TaskCanceledException 实现
- TaskCanceledException tce = ex as TaskCanceledException;
- string cancelledTask = tce == null ? string.Empty : $"Task #{tce.Task.Id}";
- Console.WriteLine($"Exception: {ex.GetType().Name}. {cancelledTask}");
- }
- }
- finally
- {
- tokenSource.Dispose();
- }
- Console.WriteLine();
- // 显示任务状态
- foreach (Task task in tasks)
- {
- Console.WriteLine($"Task: #{task.Id} now is {task.Status}");
- }
- }
- /// <summary>
- /// 带取消令牌的作业
- /// </summary>
- /// <param name="workDuration"> 作业时长 </param>
- /// <param name="cancleToken"> 取消令牌 </param>
- private static void DoWorkOfTask(int workDuration, CancellationToken cancleToken)
- {
- if (cancleToken.IsCancellationRequested)
- {// 开始之前取消
- Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled before it got started.");
- cancleToken.ThrowIfCancellationRequested();
- }
- Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task #{Task.CurrentId}.");
- Thread.Sleep(workDuration);
- if (cancleToken.IsCancellationRequested)
- {// 开始之后取消
- Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled.");
- cancleToken.ThrowIfCancellationRequested();
- }
- Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task #{Task.CurrentId}.");
- }
任务的异常处理
上面提到通过取消令牌抛出 TaskCanceledException 的方式控制任务的取消, 实际上, Task 会把自身执行过程中的所有异常都包装到一个 AggregateException 中, 并传回调用线程. 我们在主线程中通过捕获 AggregateException 来进行异常处理.
简单的处理方式
我们可以在任务的调用线程捕获并遍历 AggregateException 的内部异常, 或者使用 AggregateException 提供的 Handle 方法进行处理, 如下:
- Task task = Task.Run(() =>
- {
- throw new Exception($"Task #{Task.CurrentId} thrown an exception");
- });
- try
- {
- task.Wait();
- }
- catch (AggregateException ae)
- {
- // 处理方式 1: 遍历内部异常进行处理
- foreach (Exception ex in ae.InnerExceptions)
- {
- Console.WriteLine($"foreach: {ex.Message}");
- }
- // 处理方式 2: 使用 AggregateException 的 Handle 方法
- ae.Handle(ex=>
- {
- Console.WriteLine($"handle: {ex.Message}");
- return true ;
- });
- }
使用延续任务处理任务的异常
有时候, 我们可以给任务附加一个任务异常时才会执行的延续任务, 并在延续任务中进行异常处理.
- Task.Run(() => { throw new Exception($"Task #{Task.CurrentId} thrown an exception"); })
- .ContinueWith(t =>
- {
- Console.WriteLine($"{t.Exception?.InnerException?.Message}");
- }, TaskContinuationOptions.OnlyOnFaulted);
嵌套任务的异常处理
下面是一个 3 层嵌套的任务.
- Task parent = Task.Factory.StartNew(() =>
- {// 父级任务
- for (int i = 0; i <10; i++)
- {
- Task.Factory.StartNew(() =>
- {//1 代子任务
- for (int j = 0; j <10; j++)
- {
- Task.Factory.StartNew(() =>
- {//2 代子任务
- throw new Exception($"Task #{Task.CurrentId} thrown an exception.");
- }/*, TaskCreationOptions.AttachedToParent*/);
- }
- throw new Exception($"Task #{Task.CurrentId} thrown an exception.");
- }/*, TaskCreationOptions.AttachedToParent*/);
- }
- throw new Exception($"Task #{Task.CurrentId} thrown an exception.");
- });
- try
- {
- parent.Wait();
- }
- catch (AggregateException ae)
- {
- ae.Flatten().Handle(ex =>
- {
- Console.WriteLine(ex.Message);
- return true;
- });
- }
运行上面的代码只会得到一行输出:
Task #1 thrown an exception.
看起来有点奇怪, 为什么只捕获到一个异常呢? 其实也是在情理之中的: 任务默认只会把自身异常传递到它自己的调用线程, 子任务是在父任务中调用的, 其异常只会传递到父任务的执行线程, 所以我们在父任务的调用线程, 也就是我们的主线程中是捕获不到子任务的异常的.
取消上面代码的两处 /*, TaskCreationOptions.AttachedToParent*/, 就会捕获到所有异常.
任务调度器
.NET 提供的任务调度器
任务是由 TaskScheduler 调度的, 启动任务时, 默认使用线程池任务调度器, 任务将会被派发到线程池工作线程. 线程池的调度前面已经总结过, 这里不再展开..NET 提供的另一种任务调度器是同步上下文调度器, 用 TaskScheduler.FromCurrentSynchronizationContext() 获取, 这个调度器会把任务派发给当前的上下文线程, 常用在 GUI 应用程序中.
例如, 我们在一个窗体中新建一个 ListBox, 新建几个任务向其中添加项, 代码如下:
- this.lbxMsg.Items.Add($"{DateTime.Now:O}=>Current thread is thread #{Thread.CurrentThread.ManagedThreadId} .");
- for (int i = 0; i <10; i++)
- {
- new Task(() =>
- {
- for (int j = 0; j <3; j++)
- {
- this.lbxMsg.Items.Add($"{DateTime.Now:O}=> Task #{Task.CurrentId} add an item with thread #{Thread.CurrentThread.ManagedThreadId}.");
- }
- }).Start(TaskScheduler.FromCurrentSynchronizationContext());
- }
运行上面的代码可以发现创建的任务都是由界面线程执行的. 这里如果使用默认的任务调度器将产生 "线程间操作无效" 的异常.
实际使用时, 可以给一个异步任务添加延续任务, 来处理异步任务的结果或者异常等. 如下:
- Task.Run(() =>
- {
- Thread.Sleep(3000); // 模拟操作过程
- return 1000; // 模拟结果
- }).ContinueWith(t =>
- {
- this.lbxMsg.Items.Add(t.Result); // 在界面呈现结果或做其他处理
- }, TaskScheduler.FromCurrentSynchronizationContext());
自定义任务调度器
除了使用. NET 提供的调度器外, 我们能够继承类 TaskScheduler 来实现自己的任务调度器. 这里不再展开, 需要了解的可以参考 Samples for Parallel Programming with the .NET Framework.
来源: https://www.cnblogs.com/chenbaoshun/p/10621819.html