目录
一, 大数据时代的现状
二, 面对挑战的方法
2.1 并行计算
2.2 改用 GPU 处理计算密集型程序
3.3 分布式计算
三, 用 python 写并行程序
3.1 进程与线程
3.2 全局解释器锁 GIL:
3.3 multiprocessing
四, multiprocessing 实战
总结
小子今天想来谈谈 "并行计算", 作为一个非科班人员, 我为什么去捣鼓这么一个在科班里也比较专业的问题了. 这就要说下我前几天做的一个作业了, 当时我用 python 写了个程序, 结果运行了一天, 这个速度可让我愁了, 我还怎么优化, 怎么交作业啊. 于是小子就去各大论坛寻丹问药了, 终于让我发现可以用并行计算来最大化压榨电脑的 CPU, 提升计算效率, 而且 python 里有 multiprocessing 这个库可以提供并行计算接口, 于是小子花 1 天时间改进程序, 终于在规定时间内做出了自己满意的结果, 上交了作业. 之后, 小子对并行计算充满了兴趣, 于是又重新在 Google 上游历了一番, 大致弄清了 GPU,CPU, 进程, 线程, 并行计算, 分布式计算等概念, 也把 python 的 multiprocessing 耍了一遍, 现在小子也算略有心得了, 所以来此立碑, 以示后来游客.
小子本文分为四部分, 一是大数据时代现状, 其二是面对挑战的方法, 然后是用 python 写并行程序, 最后是 multiprocessing 实战.
一, 大数据时代的现状
当前我们正处于大数据时代, 每天我们会通过手机, 电脑等设备不断的将自己的数据传到互联网上. 据统计, YouTube 上每分钟就会增加 500 多小时的视频, 面对如此海量的数据, 如何高效的存储与处理它们就成了当前最大的挑战.
但在这个对硬件要求越来越高的时代, CPU 却似乎并不这么给力了. 自 2013 年以来, 处理器频率的增长速度逐渐放缓了, 目前 CPU 的频率主要分布在 3~4GHz. 这个也是可以理解的, 毕竟摩尔定律都生效了 50 年了, 如果它老人家还如此给力, 那我们以后就只要静等处理器频率提升, 什么计算问题在未来那都不是话下了. 实际上 CPU 与频率是于能耗密切相关的, 我们之前可以通过加电压来提升频率, 但当能耗太大, 散热问题就无法解决了, 所以频率就逐渐稳定下来了, 而 Intel 与 AMD 等大制造商也将目标转向了多核芯片, 目前普通桌面 PC 也达到了 4~8 核.
二, 面对挑战的方法
咱们有了多核 CPU, 以及大量计算设备, 那我们怎么来用它们应对大数据时代的挑战了. 那就要提到下面的方法了.
2.1 并行计算
并行 (parallelism) 是指程序运行时的状态, 如果在同时刻有多个 "工作单位" 运行, 则所运行的程序处于并行状态. 图一是并行程序的示例, 开始并行后, 程序从主线程分出许多小的线程并同步执行, 此时每个线程在各个独立的 CPU 进行运行, 在所有线程都运行完成之后, 它们会重新合并为主线程, 而运行结果也会进行合并, 并交给主线程继续处理.
图一, 多线程并行
图二是一个多线程的任务 (沿线为线程时间), 但它不是并行任务. 这是因为 task1 与 task2 总是不在同一时刻执行, 这个情况下单核 CPU 完全可以同时执行 task1 与 task2. 方法是在 task1 不执行的时候立即将 CPU 资源给 task2 用, task2 空闲的时候 CPU 给 task1 用, 这样通过时间窗调整任务, 即可实现多线程程序, 但 task1 与 task2 并没有同时执行过, 所以不能称为并行. 我们可以称它为并发(concurrency) 程序, 这个程序一定意义上提升了单个 CPU 的使用率, 所以效率也相对较高.
图二, 多线程并发
并行编程模型:
数据并行 (Data Parallel) 模型: 将相同的操作同时作用于不同数据, 只需要简单地指明执行什么并行操作以及并行操作对象. 该模型反映在图一中即是, 并行同时在主线程中拿取数据进行处理, 并线程执行相同的操作, 然后计算完成后合并结果. 各个并行线程在执行时互不干扰.
消息传递 (Message Passing) 模型: 各个并行执行部分之间传递消息, 相互通讯. 消息传递模型的并行线程在执行时会传递数据, 可能一个线程运行到一半的时候, 它所占用的数据或处理结果就要交给另一个线程处理, 这样, 在设计并行程序时会给我们带来一定麻烦. 该模型一般是分布式内存并行计算机所采用方法, 但是也可以适用于共享式内存的并行计算机.
什么时候用并行计算:
多核 CPU-- 计算密集型任务. 尽量使用并行计算, 可以提高任务执行效率. 计算密集型任务会持续地将 CPU 占满, 此时有越多 CPU 来分担任务, 计算速度就会越快, 这种情况才是并行程序的用武之地.
单核 CPU-- 计算密集型任务. 此时的任务已经把 CPU 资源 100% 消耗了, 就没必要使用并行计算, 毕竟硬件障碍摆在那里.
单核 CPU--I/O 密集型任务. I/O 密集型任务在任务执行时需要经常调用磁盘, 屏幕, 键盘等外设, 由于调用外设时 CPU 会空闲, 所以 CPU 的利用率并不高, 此时使用多线程程序, 只是便于人机交互. 计算效率提升不大.
多核 CPU--I/O 密集型任务. 同单核 CPU--I/O 密集型任务.
2.2 改用 GPU 处理计算密集型程序
GPU 即图形处理器核心(Graphics Processing Unit), 它是显卡的心脏, 显卡上还有显存, GPU 与显存类似与 CPU 与内存.
GPU 与 CPU 有不同的设计目标, CPU 需要处理所有的计算指令, 所以它的单元设计得相当复杂; 而 GPU 主要为了图形 "渲染" 而设计, 渲染即进行数据的列处理, 所以 GPU 天生就会为了更快速地执行复杂算术运算和几何运算的.
GPU 相比与 CPU 有如下优势:
强大的浮点数计算速度.
大量的计算核心, 可以进行大型并行计算. 一个普通的 GPU 也有数千个计算核心.
强大的数据吞吐量, GPU 的吞吐量是 CPU 的数十倍, 这意味着 GPU 有适合的处理大数据.
GPU 目前在处理深度学习上用得十分多, 英伟达 (NVIDIA) 目前也花大精力去开发适合深度学习的 GPU. 现在上百层的神经网络已经很常见了, 面对如此庞大的计算量, CPU 可能需要运算几天, 而 GPU 却可以在几小时内算完, 这个差距已经足够别人比我们多打几个比赛, 多发几篇论文了.
3.3 分布式计算
说到分布式计算, 我们就先说下下 Google 的 3 篇论文, 原文可以直接点链接去下载:
GFS(The Google File System) https://ai.google/research/pubs/pub51 : 解决数据存储的问题. 采用 N 多台廉价的电脑, 使用冗余的方式, 来取得读写速度与数据安全并存的结果.
MapReduce(Simplified Data Processing on Large Clusters) https://ai.google/research/pubs/pub62 : 函数式编程, 把所有的操作都分成两类, map 与 reduce,map 用来将数据分成多份, 分开处理, reduce 将处理后的结果进行归并, 得到最终的结果.
BigTable(Bigtable: A Distributed Storage System for Structured Data) https://ai.google/research/pubs/pub27898 : 在分布式系统上存储结构化数据的一个解决方案, 解决了巨大的 Table 的管理, 负载均衡的问题.
Google 在 2003~2006 年发表了这三篇论文之后, 一时之间引起了轰动, 但是 Google 并没有将 MapReduce 开源. 在这种情况下 Hadoop 就出现了, Doug Cutting 在 Google 的 3 篇论文的理论基础上开发了 Hadoop, 此后 Hadoop 不断走向成熟, 目前 Facebook,IBM,ImageShack 等知名公司都在使用 Hadoop 运行他们的程序.
分布式计算的优势:
可以集成诸多低配的计算机 (成千上万台) 进行高并发的储存与计算, 从而达到与超级计算机媲美的处理能力.
三, 用 python 写并行程序
在介绍如何使用 python 写并行程序之前, 我们需要先补充几个概念, 分别是进程, 线程与全局解释器锁(Global Interpreter Lock, GIL).
3.1 进程与线程
进程(process):
在面向线程设计的系统 (如当代多数操作系统, Linux 2.6 及更新的版本) 中, 进程本身不是基本运行单位, 而是线程的容器.
进程拥有自己独立的内存空间, 所属线程可以访问进程的空间.
程序本身只是指令, 数据及其组织形式的描述, 进程才是程序的真正运行实例. 例如, Visual Studio 开发环境就是利用一个进程编辑源文件, 并利用另一个进程完成编译工作的应用程序.
线程(threading):
线程有自己的一组 CPU 指令, 寄存器与私有数据区, 线程的数据可以与同一进程的线程共享.
当前的操作系统是面向线程的, 即以线程为基本运行单位, 并按线程分配 CPU.
进程与线程有两个主要的不同点, 其一是进程包含线程, 线程使用进程的内存空间, 当然线程也有自己的私有空间, 但容量小; 其二是进程有各自独立的内存空间, 互不干扰, 而线程是共享内存空间.
图三展示了进程, 线程与 CPU 之间的关系. 在图三中, 进程一与进程二都含有 3 个线程, CPU 会按照线程来分配任务, 如图中 4 个 CPU 同时执行前 4 个线程, 后两个标红线程处于等待状态, 在 CPU 运行完当前线程时, 等待的线程会被唤醒并进入 CPU 执行. 通常, 进程含有的线程数越多, 则它占用 CPU 的时间会越长.
图三, 进程, 线程与 CPU 关系
3.2 全局解释器锁 GIL:
GIL 是计算机程序设计语言解释器用于同步线程的一种机制, 它使得任何时刻仅有一个线程在执行. 即便在多核心处理器上, 使用 GIL 的解释器也只允许同一时间执行一个线程. Python 的 Cpython 解释器 (普遍使用的解释器) 使用 GIL, 在一个 Python 解释器进程内可以执行多线程程序, 但每次一个线程执行时就会获得全局解释器锁, 使得别的线程只能等待, 由于 GIL 几乎释放的同时就会被原线程马上获得, 那些等待线程可能刚唤醒, 所以经常造成线程不平衡享受 CPU 资源, 此时多线程的效率比单线程还要低下. 在 python 的官方文档里, 它是这样解释 GIL 的:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
可以说它的初衷是很好的, 为了保证线程间的数据安全性; 但是随着时代的发展, GIL 却成为了 python 并行计算的最大障碍, 但这个时候 GIL 已经遍布 CPython 的各个角落, 修改它的工作量太大, 特别是对这种开源性的语音来说. 但幸好 GIL 只锁了线程, 我们可以再新建解释器进程来实现并行, 那这就是 multiprocessing 的工作了.
3.3 multiprocessing
multiprocessing 是 python 里的多进程包, 通过它, 我们可以在 python 程序里建立多进程来执行任务, 从而进行并行计算. 官方文档如下所述:
The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
我们接下来介绍下 multiprocessing 的各个接口:
3.3.1 进程 process
multiprocessing.Process(target=None, args=())
target: 可以被 run()调用的函数, 简单来说就是进程中运行的函数
args: 是 target 的参数
process 的方法:
start(): 开始启动进程, 在创建 process 之后执行
join([timeout]): 阻塞目前父进程, 直到调用 join 方法的进程执行完或超时(timeout), 才继续执行父进程
terminate(): 终止进程, 不论进程有没有执行完, 尽量少用.
示例 1
- from multiprocessing import Process
- def f(name):
- print 'hello', name
- if __name__ == '__main__':
- p = Process(target=f, args=('bob',)) # p 进程执行 f 函数, 参数为'bob', 注意后面的 ","
- p.start() # 进程开始
- p.join() # 阻塞主线程, 直至 p 进程执行结束
3.3.2 进程池 Process Pools
class multiprocessing.Pool([processes])
processes 是进程池中的进程数, 默认是本机的 CPU 数量
方法:
apply(func[, args[, kwds]])进程池中的进程进行 func 函数操作, 操作时会阻塞进程, 直至生成结果.
apply_async(func[, args[, kwds[, callback]]])与 apply 类似, 但是不会阻塞进程
map(func, iterable[, chunksize])进程池中的进程进行映射操作
map_async(func, iterable[, chunksize[, callback]])
imap(func, iterable[, chunksize]): 返回有序迭代器
imap_unordered(func, iterable[, chunsize]): 返回无序迭代器
close(): 禁止进程池再接收任务
terminate(): 强行终止进程池, 不论是否有任务在执行
join(): 在 close()或 terminate()之后进行, 等待进程退出
示例 2
- from multiprocessing import Pool
- def f(x):
- return x*x
- if __name__ == '__main__':
- p = Pool(5) # 创建有 5 个进程的进程池
- print(p.map(f, [1, 2, 3])) # 将 f 函数的操作给进程池
- 3.3.3 Pipes & Queues
- multiprocessing.Pipe([duplex])
返回两个连接对象(conn1, conn2), 两个连接对象分别访问 pipe 的头和尾, 进行读写操作
Duplex: True(default), 创建的 pipe 是双向的, 也即两端都可以进行读写; 若为 False, 则 pipe 是单向的, 仅可以在一端读, 另一端写, 此时与 Queue 类似.
multiprocessing.Queue([maxsize])
qsize(): 返回 queue 中 member 数量
empty(): 如果 queue 是空的, 则返回 true
full(): 如果 queue 中 member 数量达到 maxsize, 则返回 true
put(obj): 将一个 object 放入到 queue 中
get(): 从队列中取出一个 object 并将它从 queue 中移除, FIFO 原则
close(): 关闭队列, 并将缓存的 object 写入 pipe
示例
- from multiprocessing import Pool
- import time
- def f(x):
- return x*x
- if __name__ == '__main__':
- pool = Pool(processes=4) # start 4 worker processes
- result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
- print result.get(timeout=1) # prints "100" unless your computer is *very* slow
- print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
- it = pool.imap(f, range(10))
- print it.next() # prints "0"
- print it.next() # prints "1"
- print it.next(timeout=1) # prints "4" unless your computer is *very* slow
- result = pool.apply_async(time.sleep, (10,))
- print result.get(timeout=1) # raises multiprocessing.TimeoutError
3.3.4 进程锁 multiprocessing.Lock
当一个进程获得 (acquire) 锁之后, 其它进程在想获得锁就会被禁止, 可以保护数据, 进行同步处理.
acquire(block=True, timeout=None): 尝试获取一个锁, 如果 block 为 true, 则会在获得锁之后阻止其它进程再获取锁.
release(): 释放锁
3.3.5 共享内存 --Value, Array
共享内存通常需要配合进程锁来处理, 保证处理的顺序相同.
multiprocessing.Value(typecode_or_type, *args[, lock])
返回一个 ctype 对象,
创建 c = Value('d', 3.14), 调用 c.value()
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
返回一个 ctype 数组, 只能是一维的
Array('i', [1, 2, 3, 4])
Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' | signed char | int | 1 |
'B' | unsigned char | int | 1 |
'u' | Py_UNICODE | Unicode character | 2 |
'h' | signed short | int | 2 |
'H' | unsigned short | int | 2 |
'i' | signed int | int | 2 |
'I' | unsigned int | int | 2 |
'l' | signed long | int | 4 |
'L' | unsigned long | int | 4 |
'q' | signed long long | int | 8 |
'Q' | unsigned long long | int | 8 |
'f' | float | float | 4 |
'd' | double | float | 8 |
3.3.6 其它方法
multiprocessing.active_children(): 返回当前进程的所有子进程
multiprocessing.cpu_count(): 返回本计算机的 CPU 数量
multiprocessing.current_process(): 返回当前进程
3.3.7 注意事项:
尽量避免共享数据
所有对象都尽量是可以 pickle 的
避免使用 terminate 强行终止进程, 以造成不可预料的后果
有队列的进程在终止前队列中的数据需要清空, join 操作应放到 queue 清空后
明确给子进程传递资源, 参数
Windows 平台另需注意:
注意跨模块全局变量的使用, 可能被各个进程修改造成结果不统一
主模块需要加上 if name == 'main': 来提高它的安全性, 如果有交互界面, 需要加上 freeze_support()
四, multiprocessing 实战
process,lock 与 value 尝试:
- import multiprocessing as mp
- import time
- def job(v, num, l):
- l.acquire() # 锁住
- for _ in range(5):
- time.sleep(0.1)
- v.value += num # 获取共享内存
- print(v.value)
- l.release() # 释放
- def multicore():
- l = mp.Lock() # 定义一个进程锁
- #l = 1
- v = mp.Value('i', 0) # 定义共享内存
- p1 = mp.Process(target=job, args=(v,1,l)) # 需要将 lock 传入
- p2 = mp.Process(target=job, args=(v,3,l))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- if __name__=='__main__':
- multicore()
上述代码即对共享内存叠加 5 次, p1 进程每次叠加 1,p2 进程每次叠加 3, 为了避免 p1 与 p2 在运行时抢夺共享数据 v, 在进程执行时锁住了该进程, 从而保证了执行的顺序. 我测试了三个案例:
直接运行上述代码输出[1, 2, 3, 4, 5, 8, 11, 14, 17, 20], 运行时间为 1.037s
在 1 的基础上注释掉锁(上述注释了三行), 在没有锁的情况下, 输出[1, 4, 5, 8, 9, 12, 13, 15, 14, 16], 运行时间为 0.53s
在 2 的基础上将 p1.join()调到 p2.start()前面, 输出为[1, 2, 3, 4, 5, 8, 11, 14, 17, 20], 运行时间为 1.042s.
可以发现, 没锁的情况下调整 join 可以取得与加锁类似的结果, 这是因为 join 即是阻塞主进程, 直至当前进程结束才回到主进程, 若将 p1.join()放到 p1.start()后面, 则会马上阻塞主进程, 使得 p2 要稍后才开始, 这与锁的效果一样.
如果如上述代码所示, p1.join()在 p2.start()后面, 虽然是 p1 先 join(), 但这时只是阻塞了主进程, 而 p2 是兄弟进程, 它已经开始了, p1 就不能阻止它了, 所以这时如果没锁的话 p1 与 p2 就是并行了, 运行时间就是一半, 但因为它们争抢共享变量, 所以输出就变得不确定了.
- pool
- import multiprocessing as mp
- #import pdb
- def job(i):
- return i*i
- def multicore():
- pool = mp.Pool()
- #pdb.set_trace()
- res = pool.map(job, range(10))
- print(res)
- res = pool.apply_async(job, (2,))
- # 用 get 获得结果
- print(res.get())
- # 迭代器, i=0 时 apply 一次, i=1 时 apply 一次等等
- multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
- # 从迭代器中取出
- print([res.get() for res in multi_res])
- multicore()
pool 其实非常好用, 特别是 map 与 apply_async. 通过 pool 这个接口, 我们只有指定可以并行的函数与函数参数列表, 它就可以自动帮我们创建多进程池进行并行计算, 真的不要太方便. pool 特别适用于数据并行模型, 假如是消息传递模型那还是建议自己通过 process 来创立进程吧.
总结
小子这次主要是按自己的理解把并行计算理了下, 对进程, 线程, CPU 之间的关系做了下阐述, 并把 python 的 multiprocessing 这个包拎了拎, 个人感觉这个里面还大有学问, 上次我一个师兄用 python 的 process 来控制单次迭代的运行时间 (运行超时就跳过这次迭代, 进入下一次迭代) 也是让我涨了见识, 后面还要多多学习啊.
感谢您花费宝贵的时间阅读到这里, 希望能有所收获, 也欢迎在评论区进行交流.
推荐好文:
multiprocessing 官方文档
python 多进程的理解 multiprocessing Process join run(推荐好文)
多进程 Multiprocessing
来源: https://www.cnblogs.com/fugeny/p/9898971.html