本文转至 http://www.cnblogs.com/kaituorensheng/p/4465768.html, 在其基础上进行了一些小小改动.
在利用 Python 进行系统管理的时候, 特别是同时操作多个文件目录, 或者远程控制多台主机, 并行操作可以节约大量的时间. 当被操作对象数目不大时, 可以直接利用 multiprocessing 中的 Process 动态成生多个进程, 十几个还好, 但如果是上百个, 上千个目标, 手动的去限制进程数量却又太过繁琐, 此时可以发挥进程池的功效.
Pool 可以提供指定数量的进程供用户调用, 当有新的请求提交到 pool 中时, 如果池还没有满, 那么就会创建一个新的进程用来执行该请求; 但如果池中的进程数已经达到规定最大值, 那么该请求就会等待, 直到池中有进程结束, 才会创建新的进程来它.
例 1: 使用进程池
- from multiprocessing import freeze_support,Pool
- import time
- def Foo(i):
- time.sleep(2)
- print('___time---',time.ctime())
- return i+100
- def Bar(arg):
- print('----exec done:',arg,time.ctime())
- if __name__ == '__main__':
- freeze_support()
- pool = Pool(3) #线程池中的同时执行的进程数为 3
- for i in range(4):
- pool.apply_async(func=Foo,args=(i,),callback=Bar) #线程池中的同时执行的进程数为 3, 当一个进程执行完毕后, 如果还有新进程等待执行, 则会将其添加进去
- # pool.apply(func=Foo,args=(i,))
- print('end')
- pool.close()
- pool.join()# 调用 join 之前, 先调用 close 函数, 否则会出错. 执行完 close 后不会有新的进程加入到 pool,join 函数等待所有子进程结束
执行结果:
- end
- ___time--- Thu Jun 16 15:11:45 2016
- ----exec done: 100 Thu Jun 16 15:11:45 2016
- ___time--- Thu Jun 16 15:11:45 2016
- ----exec done: 101 Thu Jun 16 15:11:45 2016
- ___time--- Thu Jun 16 15:11:45 2016
- ----exec done: 102 Thu Jun 16 15:11:45 2016
- ___time--- Thu Jun 16 15:11:47 2016
- ----exec done: 103 Thu Jun 16 15:11:47 2016
函数解释:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞, apply(func[, args[, kwds]])是阻塞的(理解区别, 看例 1 例 2 结果区别)
close() 关闭 pool, 使其不在接受新的任务.
terminate() 结束工作进程, 不在处理未完成的任务.
join() 主进程阻塞, 等待子进程的退出, join 方法要在 close 或 terminate 之后使用.
执行说明: 创建一个进程池 pool, 并设定进程的数量为 3,xrange(4)会相继产生四个对象 [0, 1, 2, 4], 四个对象被提交到 pool 中, 因 pool 指定进程数为 3, 所以 0,1,2 会直接送到进程中执行, 当其中一个执行完事后才空出一个进程处理对象 3, 所以会出现输出 "msg: hello 3" 出现在 "end" 后. 因为为非阻塞, 主函数会自己执行自个的, 不搭理进程的执行, 所以运行完 for 循环后直接输出 "mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~", 主程序在 pool.join() 处等待各个进程的结束.
例 2: 使用进程池(阻塞)
- from multiprocessing import freeze_support,Pool
- import time
- def Foo(i):
- time.sleep(2)
- print('___time---',time.ctime())
- return i+100
- def Bar(arg):
- print('----exec done:',arg,time.ctime())
- if __name__ == '__main__':
- freeze_support()
- pool = Pool(3) #线程池中的同时执行的进程数为 3
- for i in range(4):
- pool.apply(func=Foo,args=(i,))
- print('end')
- pool.close()
- pool.join()# 调用 join 之前, 先调用 close 函数, 否则会出错. 执行完 close 后不会有新的进程加入到 pool,join 函数等待所有子进程结束
执行结果
- ___time--- Thu Jun 16 15:15:16 2016
- ___time--- Thu Jun 16 15:15:18 2016
- ___time--- Thu Jun 16 15:15:20 2016
- ___time--- Thu Jun 16 15:15:22 2016
- end
例 3: 使用进程池, 并关注结果
- import multiprocessing
- import time
- def func(msg):
- print('hello :',msg,time.ctime())
- time.sleep(2)
- print('end',time.ctime())
- return 'done' + msg
- if __name__=='__main__':
- pool = multiprocessing.Pool(2)
- result = []
- for i in range(3):
- msg = 'hello %s' %i
- result.append(pool.apply_async(func=func,args=(msg,)))
- pool.close()
- pool.join()
- for res in result:
- print('***:',res.get())
- print('AAAAAAAAll end--')
执行结果
- hello : hello 0 Thu Jun 16 15:26:33 2016
- hello : hello 1 Thu Jun 16 15:26:33 2016
- end Thu Jun 16 15:26:35 2016
- hello : hello 2 Thu Jun 16 15:26:35 2016
- end Thu Jun 16 15:26:35 2016
- end Thu Jun 16 15:26:37 2016
- ***: donehello 0
- ***: donehello 1
- ***: donehello 2
- AAAAAAAAll end--
注: get()函数得出每个返回结果的值
例 4: 使用多个进程池
- import multiprocessing
- import time,os,random
- def Lee():
- print('\nRun task Lee--%s******ppid:%s'%(os.getpid(),os.getppid()),'~~~~',time.ctime())
- start = time.time()
- time.sleep(random.randrange(10))
- end = time.time()
- print('Task Lee,runs %0.2f seconds.'%(end-start),'~~~~',time.ctime())
- def Marlon():
- print("\nRun task Marlon-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
- start = time.time()
- time.sleep(random.random() * 40)
- end=time.time()
- print( 'Task Marlon runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
- def Allen():
- print( "\nRun task Allen-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
- start = time.time()
- time.sleep(random.random() * 30)
- end = time.time()
- print( 'Task Allen runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
- def Frank():
- print( "\nRun task Frank-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
- start = time.time()
- time.sleep(random.random() * 20)
- end = time.time()
- print( 'Task Frank runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
- if __name__ == '__main__':
- func_list = [Lee,Marlon,Allen,Frank]
- print('parent process id %s'%os.getpid())
- pool = multiprocessing.Pool(4)
- for func in func_list:
- pool.apply_async(func) #Pool 执行函数, apply 执行函数, 当有一个进程执行完毕后, 会添加一个新的进程到 pool 中
- print( 'Waiting for all subprocesses done...')
- pool.close()
- pool.join() #调用 join 之前, 一定要先调用 close() 函数, 否则会出错, close()执行后不会有新的进程加入到 pool,join 函数等待素有子进程结束
- print ('All subprocesses done.')
执行结果
- parent process id 98552
- Waiting for all subprocesses done...
- Run task Lee--97316******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
- Run task Marlon-95536******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
- Run task Allen-95720******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
- Run task Frank-98784******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
- Task Allen runs 0.31 seconds. ~~~~ Thu Jun 16 15:20:51 2016
- Task Lee,runs 7.00 seconds. ~~~~ Thu Jun 16 15:20:57 2016
- Task Frank runs 14.48 seconds. ~~~~ Thu Jun 16 15:21:05 2016
- Task Marlon runs 31.72 seconds. ~~~~ Thu Jun 16 15:21:22 2016
- All subprocesses done.
multiprocessing pool map
- #coding: utf-8
- import multiprocessing
- def m1(x):
- print x * x
- if __name__ == '__main__':
- pool = multiprocessing.Pool(multiprocessing.cpu_count())
- i_list = range(8)
- pool.map(m1, i_list)
一次执行结果
- 0
- 1
- 4
- 9
- 16
- 25
- 36
- 49
参考: http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx
问题: http://bbs.chinaunix.net/thread-4111379-1-1.html
- #coding: utf-8
- import multiprocessing
- import logging
- def create_logger(i):
- print i
- class CreateLogger(object):
- def __init__(self, func):
- self.func = func
- if __name__ == '__main__':
- ilist = range(10)
- cl = CreateLogger(create_logger)
- pool = multiprocessing.Pool(multiprocessing.cpu_count())
- pool.map(cl.func, ilist)
- print "hello------------>"
一次执行结果
- hello------------>
来源: http://www.bubuko.com/infodetail-2995694.html