1, 基本概念
多进程库提供了 Pool 类来实现简单的多进程任务. Pool 类有以下方法:
- apply(): 直到得到结果之前一直阻塞.
- apply_async(): 这是 apply()方法的一个变体, 返回的是一个 result 对象. 这是一个异步的操作, 在所有的子类执行之前不会锁住主进程.
- map(): 这是内置的 map 函数的并行版本, 在得到结果之前一直阻塞, 此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行.
- map_async(): 这是 map 的一个变体, 返回一个 result 对象. 如果指定了回调函数, 回调函数应该是 callable 的, 并且只接受一个参数. 当 result 准备好时, 会自动调用回调函数, 除非调用失败. 回调函数应该立即完成, 否则, 持有 result 的进程将被阻塞.
2, 测试用例
创建四个进程池, 然后使用 map 方法进行一个简单的计算.
- import multiprocessing
- def function_square(data):
- result = data * data
- return result
- if __name__ == "__main__":
- inputs = list(range(100))
- pool = multiprocessing.Pool(processes=4)
- pool_outputs = pool.map(function_square, inputs)
- pool.close()
- pool.join()
- print("pool:", pool_outputs)
pool.map 方法将一些独立的任务提交给进程池. pool.map 和内置 map 的执行结果相同, 但 pool.map 是通过多个并行进程计算的.
3,mpi4py 模块
Python 提供了很多 MPI 模块写并行程序. 其中 mpi4py 在 MPI-1/2 顶层构建, 提供了面向对象的接口, 紧跟 C++ 绑定的 MPI-2.MPI 是 C 语言用户可以无需学习新的接口就可以使用这个库.
此模块包含的主要的应用:
- 点对点通讯
- 集体通讯
- 拓扑
4, 安装 mpi4py
安装 mpich:https://www.microsoft.com/en-us/download/confirmation.aspx?id=56727
下载并安装 msmpisetup.exe
安装完成后安装目录如下:
将 bin 目录添加到系统环境中:
用 cmd 输入并显示如下即为安装成功
安装 mpi4py
pip install mpi4py
MPI 测试用例
- from mpi4py import MPI
- def mpi_test(rank):
- print("I am rank %s" %rank)
- if __name__ == "__main__":
- comm = MPI.COMM_WORLD
- rank = comm.Get_rank()
- mpi_test(rank)
- print("Hello world from process", rank)
使用 mpi 运行文件
在 MPI 中, 并行程序中不同进程用一个非负整数来区别, 如果我们有 P 个进程, 那么 rank 会从 0 到 P-1 分配.
MPI 拿到 rank 的函数如下: rank = comm.Get_rank()
这个函数返回调用它的进程的 rank,comm 叫做交流者, 用于区别不同的进程集合: comm = MPI.COMM_WORLD
5,MPI 点对点通讯
MPI 提供的最实用的一个特性是点对点通讯. 两个不同的进程之间可以通过点对点通讯交换数据: 一个进程是接收者, 一个进程是发送者.
Python 的 mpi4py 通过下面两个函数提供了点对点通讯功能:
- Comm.Send(data, process_destination): 通过它在交流组中的排名来区分发送给不同进程的数据.
- Comm.Recv(process_source): 接收来自源进程的数据, 也是通过在交流组中的排名来分分的.
Comm 变量表示交流着, 定义了可以互相通讯的进程组:
comm = MKPI.COMM_WORLD
交换信息测试用例:
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- rank = comm.rank
- print("My rank is :",rank)
- if rank == 0:
- data = 10000000
- destination_process = 4
- comm.send(data, dest=destination_process)
- print("sending data %s to process %d" %(data, destination_process))
- if rank == 1:
- destination_process = 8
- data = "hello,I am rank 1"
- comm.send(data, dest=destination_process)
- print("sending data %s to process %d" %(data, destination_process))
- if rank == 4:
- data = comm.recv(source=0)
- print("data received is = %s" %data)
- if rank == 8:
- data1 = comm.recv(source=1)
- print("data received is = %s" %data1)
运行结果:
通过 mpiexec -n 9 运行 9 个互相通讯的进程, 使用 rank 的值来区分每个进程.
整个过程分为两部分, 发送者发送数据, 接收者接收数据, 二者必须都指定发送方 / 接收方, source = 为指定发送者. 如果有发送的数据没有被接收, 程序会阻塞.
comm.send()和 comm.recv()函数都是阻塞的函数, 他们会一直阻塞调用者, 直到数据使用完成, 同时在 MPI 中, 有两种方式发送和接收数据:
- buffer 模式
- 同步模式
在 buffer 模式中, 只要需要发送的数据被拷贝到 buffer 中, 执行权就会交回到主程序, 此时数据并非已经发送 / 接收完成. 在同步模式中, 只有函数真正的结束发送 / 接收任务之后才会返回.
6, 避免死锁
mpi4py 没有提供特定的功能来解决这种情况, 但是提供了一些程序员必须遵守的规则来避免死锁的问题.
出现死锁的情况:
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- rank = comm.rank
- print("my rank is :",rank)
- if rank == 1:
- data_send = "a"
- destination_process = 5
- source_process = 5
- data_received = comm.recv(source=source_process)
- comm.send(data_send, dest=destination_process)
- print("sending data %s to process %d" %(data_send, destination_process))
- print("data received is = %s" %data_received)
- if rank == 5:
- data_send = "b"
- destination_process = 1
- source_process = 1
- data_received = comm.recv(source=source_process)
- comm.send(data_send, dest=destination_process)
- print("sending data %s to process %d" % (data_send, destination_process))
- print("data received is = %s" % data_received)
运行结果:
进程 1 和进程 5 产生阻塞, 程序阻塞.
此时两个进程都在等待对方, 发生阻塞, 因为 recv 和 send 都是阻塞的, 两个函数都先使用的 recv, 所以调用者都在等待他们完成. 所以讲上述代码改为如下即可解决阻塞:
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- rank = comm.rank
- print("my rank is :",rank)
- if rank == 1:
- data_send = "a"
- destination_process = 5
- source_process = 5
- comm.send(data_send, dest=destination_process)
- data_received = comm.recv(source=source_process)
- print("sending data %s to process %d" %(data_send, destination_process))
- print("data received is = %s" %data_received)
- if rank == 5:
- data_send = "b"
- destination_process = 1
- source_process = 1
- data_received = comm.recv(source=source_process)
- comm.send(data_send, dest=destination_process)
- print("sending data %s to process %d" % (data_send, destination_process))
- print("data received is = %s" % data_received)
将其中一个函数的 recv 和 send 顺序调换.
运行结果:
也可通过 Sendrecv 函数解决, 代码如下:
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- rank = comm.rank
- print("my rank is :",rank)
- if rank == 1:
- data_send = "a"
- destination_process = 5
- source_process = 5
- # comm.send(data_send, dest=destination_process)
- # data_received = comm.recv(source=source_process)
- data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)
- print("sending data %s to process %d" %(data_send, destination_process))
- print("data received is = %s" %data_received)
- if rank == 5:
- data_send = "b"
- destination_process = 1
- source_process = 1
- # data_received = comm.recv(source=source_process)
- # comm.send(data_send, dest=destination_process)
- data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)
- print("sending data %s to process %d" % (data_send, destination_process))
- print("data received is = %s" % data_received)
运行结果:
7, 集体通讯: Broadcast
在并行代码的开发中, 会经常需要在多个进程间共享某个变量运行时的值, 或操作多个进程提供的变量. MPI 库提供了在多个进程之间交换信息的方法, 将所有进程变成通讯者的这种方法叫做集体交流. 因此, 一个集体交流通常是 2 个以上的进程, 也可以称为广播 -- 一个进程将消息发送给其他进程. mpi4py 模块通过以下方式提供广播的功能:
buf = comm.bcast(data_to_share, rank_of_root_process)
这个函数将 root 消息中包含的信息发送给属于 comm 通讯组其他的进程, 每个进程必须通过相同的 root 和 comm 来调用它.
测试代码:
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- rank = comm.Get_rank()
- if rank == 0:
- variable_to_share = 100
- else:
- variable_to_share = None
- variable_to_share = comm.bcast(variable_to_share, root=0)
- print("process = %d variable shared = %d" %(rank, variable_to_share))
运行结果:
rank 等于 0 的 root 进程初始化了一个变量, variable_to_share, 值为 100, 然后声明了一个广播 variable_to_share = comm.bcast(variable_to_share, root=0)
这个变量将通过通讯组发送给其他进程.
集体通讯允许组中的多个进程同时进行数据交流. 在 mpi4py 模块中, 只提供了阻塞版本的集体通讯(阻塞调用者, 直到缓存中的数据全部安全发送.)
广泛应用的集体通讯应该是:
- 组中的进程提供通讯的屏障
- 通讯方式包括:
- 将一个进程的数据广播到组中其他进程中
- 从其他进程收集数据发给一个进程
- 从一个进程散播数据到其他进程中
- 减少操作
8, 集体通讯: Scatter
scatter 函数和广播很像, 但是不同的是 comm.bcast 将相同的数据发送给所有在监听的进程, comm.scatter 可以将数据放在数据中, 发送给不同的进程.
comm.scatter 函数接收一个 array, 根据进程的 rank 将其中的元素发给不同的进程, 第一个元素发送给进程 0, 第二个元素发给进程 1, 以此类推.
测试用例:
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- rank = comm.Get_rank()
- # array_to_share = ["a","b","c","d","e","f","g","h","i","j"]
- if rank == 0:
- array_to_share = [0,1,2,3,4,5,6,7,8,9]
- else:
- array_to_share = None
- recvbuf = comm.scatter(array_to_share, root=0)
- print("Process = %d recvbuf = %s" %(rank, recvbuf))
执行结果:
注意: 列表中的元素个数, 需要个进程保持一致. 否则会出现如下错误.
9, 集体通讯: gather
gather 函数基本上是反向的 scatter, 即收集所有进程发送到 root 进程数据. 方法如下:
recvbuf = comm.gather(sendbuf, rank_of_root_process)
sendbuf 是要发送的数据, rank_of_root_process 代表要接收数据的进程.
测试用例:
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- size = comm.Get_size()
- # print(size)
- rank = comm.Get_rank()
- data = "process %s" %rank
- # print("start %s"%data)
- data = comm.gather(data, root=0)
- # print(data)
- if rank == 0:
- print("rank = %s receiving data to other process" %rank)
- for i in range(1, size):
- #data[i] = (i+1) ** 2
- value = data[i]
- print("process %s receiving %s from process %s" %(rank, value, i))
- # print(data)
执行结果:
10, 使用 Alltoall 通讯
Alltoall 集体通讯结合了 scatter 和 gather 的功能. 在 mpi4py 中, 有以下三类的 Alltoall 集体通讯.
- - comm.Alltoall(sendbuf, recvbuf);
- - comm.Alltoallv(sendbuf, recvbuf);
- - comm.Alltoallw(sendbuf, recvbuf);
Alltoall 测试用例:
- from mpi4py import MPI
- import numpy
- comm = MPI.COMM_WORLD
- size = comm.Get_size()
- rank = comm.Get_rank()
- a_size = 1
- # print("numpy arange: %s" %numpy.arange(size, dtype=int))
- senddata = (rank+1)*numpy.arange(size, dtype=int)
- recvdata = numpy.empty(size * a_size, dtype=int)
- print("senddata is %s , recvdata is %s" %(senddata, recvdata))
- # print("Recvdata is %s: , \n numpy.empty is %s" %(recvdata, numpy.empty(size * a_size, dtype=int)))
- comm.Alltoall(senddata, recvdata)
- print("process %s sending %s, receiving %s" %(rank, senddata, recvdata))
运行结果:
comm.alltoall 方法将 task j 的 sendbuf 的第 j 个对象拷贝到 task i 中, recvbuf 的第 j 个对象, 一一对应. 发送过程如图:
可以将左右两个方格看做 xy 轴, 结果一一对应, 如左图的 (0,0) 对应的值为 0, 其对应的有图的值为右图的 (0,0) 也为 0. 左图的 3,4 对应的值为 16, 右图 (4,3) 也为 16.
P0 包含的数据[0 1 2 3 4], 它将值 0 赋值给自己, 1 传给进程 P1,2 传给进程 P2,3 传给进程 P3, 以此类推.
相同的 P1 的数据为[0 2 4 6 8] , 它将 0 传给 P0,2 传给 P1,4 传给 P2, 以此类推.
All-to-all 定制通讯也叫全部交换, 这种操作经常用于各种并发算法中, 比如快速傅里叶变换, 矩阵变换, 样本排序以及一些数据库的 Join 操作.
11, 简化操作
同 comm.gather 一样, comm.reduce 接收一个数组, 每一个元素是一个进程的输入, 然后返回一个数组, 每一个元素是进程的输出, 返回给 root 进程. 输出的元素包含了简化的结果.
简化定义如下: comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)
这里需要注意的是, 参数 op 和 comm.gather 不同, 它代表你想应用在数据上的操作, mpi4py 模块代表定义了一系列的简化操作, 包括:
- MPI.MAX: 返回最大的元素
- MPI.MIN: 返回最小的元素
- MPI.SUM: 对所有的元素相加
- MPI.PROD: 对所有元素相乘
- MPI.LAND: 对所有元素进行逻辑操作
- MPI.MAXLOC: 返回最大值, 以及拥有它的进程
- MPI.MINLOC: 返回最小值, 以及拥有它的进程
测试用例:
- import numpy as np
- from mpi4py import MPI
- comm = MPI.COMM_WORLD
- size = comm.size
- rank = comm.rank
- array_size = 3
- recvdata = np.zeros(array_size, dtype=np.int)
- senddata = (rank+1)*np.arange(size, dtype=np.int)
- print("+++++++++++++%s+++++++++++++%s++++++++++++" %(recvdata, senddata))
- print("Process %s sending %s" %(rank, senddata))
- comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)
- print("on task %s, after Reduce: data = %s" %(rank, recvdata))
执行结果:
MPI.SUM 为求和操作, 过程如下:
简化操作将每个 task 的第 i 个元素相加, 然后放回到 P0 进程 (root 进程) 的第 i 个元素中.
来源: https://www.cnblogs.com/dukuan/p/9811057.html