初识并行循环
并行循环主要用来处理数据并行的, 如, 同时对数组或列表中的多个数据执行相同的操作.
在 C# 编程中, 我们使用并行类 System.Threading.Tasks.Parallel 提供的静态方法 Parallel.For 和 Parallel.ForEach 来实现并行循环. 从方法名可以看出, 这两个方法是对常规循环 for 和 foreach 的并行化.
简单用法
使用并行循环时需要传入循环范围 (集合) 和操作数据的委托 Action<T>:
- Parallel.For(0, 100, i => {
- Console.WriteLine(i);
- });
- Parallel.ForEach(Enumerable.Range(0, 100), i => {
- Console.WriteLine(i);
- });
使用场景
对于数据的处理需要耗费较长时间的循环适宜使用并行循环, 利用多线程加快执行速度.
对于简单的迭代操作, 且迭代范围较小, 使用常规循环更好好, 因为并行循环涉及到线程的创建, 上下文切换和销毁, 使用并行循环反而影响执行效率.
对于迭代操作简单但迭代范围很大的情况, 我们可以对数据进行分区, 再执行并行循环, 减少线程数量.
循环结果
Parallel.For 和 Parallel.ForEach 方法的所有重载有着同样的返回值类型 ParallelLoopResult, 并行循环结果包含循环是否完成以及最低迭代次数两项信息.
下面的例子使用 Parallel.ForEach 展示了并行循环的结果.
- ParallelLoopResult result = Parallel.ForEach(Enumerable.Range(0, 100), (i,loop) =>
- {// 委托传入 ParallelLoopState, 用来控制循环执行
- Console.WriteLine(i + 1);
- Thread.Sleep(100);
- if (i == 30) // 此处设置循环停止的确切条件
- {
- loop.Break();
- //loop.Stop();
- }
- });
- Console.WriteLine($"{result.IsCompleted}-{result.LowestBreakIteration}");
值得一提的是, 循环的 Break()和 Stop()只能尽早地跳出或者停止循环, 而不能立即停止.
取消循环操作
有时候, 我们需要在中途取消循环操作, 但又不知道确切条件是什么, 比如用户触发的取消. 这时候, 可以利用循环的 ParallelOptions 传入一个 CancellationToken, 同时使用异常处理捕获 OperationCanceledException 以进行取消后的处理. 下面是一个简单的例子.
- /// <summary>
- /// 取消通知者
- /// </summary>
- public static CancellationTokenSource CTSource { get; set; } = new CancellationTokenSource();
- /// <summary>
- /// 取消并行循环
- /// </summary>
- public static void CancelParallelLoop()
- {
- Task.Factory.StartNew(() =>
- {
- try
- {
- Parallel.ForEach(Enumerable.Range(0, 100), new ParallelOptions { CancellationToken = CTSource.Token },
- i =>
- {
- Console.WriteLine(i + 1);
- Thread.Sleep(1000);
- });
- }
- catch (OperationCanceledException oce)
- {
- Console.WriteLine(oce.Message);
- }
- });
- }
- static void Main(string[] args)
- {
- ParallelDemo.CancelParallelLoop();
- Thread.Sleep(3000);
- ParallelDemo.CTSource.Cancel();
- Console.ReadKey();
- }
循环异常收集
并行循环执行过程中, 可以捕获并收集迭代操作引发的异常, 循环结束时抛出一个 AggregateException 异常, 并将收集到的异常赋给它的内部异常集合 InnerExceptions. 外部使用时, 捕获 AggregateException, 即可进行并行循环的异常处理.
下面的例子模拟了并行循环的异常抛出, 收集及处理的过程.
- /// <summary>
- /// 捕获循环异常
- /// </summary>
- public static void CaptureTheLoopExceptions()
- {
- ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
- Parallel.ForEach(Enumerable.Range(0, 100), i =>
- {
- try
- {
- if (i % 10 == 0)
- {// 模拟抛出异常
- throw new Exception($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] had thrown a exception. [{i}]");
- }
- Console.WriteLine(i + 1);
- Thread.Sleep(100);
- }
- catch (Exception ex)
- {// 捕获并收集异常
- exceptions.Enqueue(ex);
- }
- });
- if (!exceptions.IsEmpty)
- {// 方法内部可直接进行异常处理, 若需外部处理, 将收集到的循环异常抛出
- throw new AggregateException(exceptions);
- }
- }
外部处理方式
- try
- {
- ParallelDemo.CaptureTheLoopExceptions();
- }
- catch (AggregateException aex)
- {
- foreach (Exception ex in aex.InnerExceptions)
- {// 模拟异常处理
- Console.WriteLine(ex.Message);
- }
- }
分区并行处理
当循环操作很简单, 迭代范围很大的时候, ParallelLoop 提供一种分区的方式来优化循环性能. 下面的例子展示了分区循环的使用, 同时也能比较几种循环方式的执行效率.
- /// <summary>
- /// 分区并行处理, 顺便比较各种循环的效率
- /// </summary>
- /// <param name="rangeSize">迭代范围</param>
- /// <param name="opDuration">操作耗时</param>
- public static void PartationParallelLoop(int rangeSize = 10000, int opDuration = 1)
- {
- //PartationParallelLoopWithBuffer
- Stopwatch watch0 = Stopwatch.StartNew();
- Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize), EnumerablePartitionerOptions.None),
- i =>
- {// 模拟操作
- Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
- Thread.Sleep(opDuration);
- });
- watch0.Stop();
- //PartationParallelLoopWithoutBuffer
- Stopwatch watch1 = Stopwatch.StartNew();
- Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize),EnumerablePartitionerOptions.NoBuffering),
- i =>
- {// 模拟操作
- Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
- Thread.Sleep(opDuration);
- });
- watch1.Stop();
- //NormalParallelLoop
- Stopwatch watch2 = Stopwatch.StartNew();
- Parallel.ForEach(Enumerable.Range(0, rangeSize),
- i =>
- {// 模拟操作
- Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
- Thread.Sleep(opDuration);
- });
- watch2.Stop();
- //NormalLoop
- Stopwatch watch3 = Stopwatch.StartNew();
- foreach (int i in Enumerable.Range(0, rangeSize))
- {// 模拟操作
- Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
- Thread.Sleep(opDuration);
- }
- watch2.Stop();
- Console.WriteLine();
- Console.WriteLine($"PartationParallelLoopWithBuffer => {watch0.ElapsedMilliseconds}ms");
- Console.WriteLine($"PartationParallelLoopWithoutBuffer => {watch1.ElapsedMilliseconds}ms");
- Console.WriteLine($"NormalParallelLoop => {watch2.ElapsedMilliseconds}ms");
- Console.WriteLine($"NormalLoop => {watch3.ElapsedMilliseconds}ms");
- }
在 I7-7700HQ + 16GB 配置 VS 调试模式下得到下面一组测试结果.
Loop Condition | PartationParallelLoop WithBuffer | PartationParallelLoop WithoutBuffer | Normal ParallelLoop | Normal Loop |
---|---|---|---|---|
10000,1 | 10527 | 11799 | 11155 | 19434 |
10000,1 | 9513 | 11442 | 11048 | 19354 |
10000,1 | 9871 | 11391 | 14782 | 19154 |
100,1000 | 9107 | 5951 | 5081 | 100363 |
100,1000 | 9086 | 5974 | 5187 | 100162 |
100,1000 | 9208 | 5125 | 5255 | 100239 |
100,1 | 350 | 439 | 243 | 200 |
100,1 | 390 | 227 | 166 | 198 |
100,1 | 466 | 225 | 84 | 197 |
应该根据不同的应用场景选择合适的循环策略, 具体如何选择, 朋友们可自行体会~
来源: https://www.cnblogs.com/chenbaoshun/p/10572639.html