1.1 concurent.furtrue 开启进程, 多进程 & 线程, 多线程
- # concurrent.futures 创建并行的任务
- # 进程池 ProcessPoolExecutor,ThreadPoolExecutor
- # 下面例子是 Io 密集型, 所以时间上比叫多
- from concurrent.futures import ProcessPoolExecutor
- import os,time,random
- def task(n):
- print('%s is running' %os.getpid())
- time.sleep(2)
- return n**2
- # if __name__ == '__main__':
- # p=ProcessPoolExecutor(max_workers=4) #进程池 max_workers 最大的工作任务
- # l=[]
- # start=time.time()
- # for i in range(10):
- # obj=p.submit(task,i) #submit 异步提交, 后面加入. result 就变成同步了
- # l.append(obj)
- # p.shutdown() #shutdown(wait=True) == p.close + p.join
- # print('='*30)
- # print([obj for obj in l]) #结果内存地址
- # print([obj.result() for obj in l])
- # print(time.time()-start)
- #线程池, 用线程来探测上面的例子, 时间少了很多
- from concurrent.futures import ThreadPoolExecutor
- import threading
- import os,time
- def task(n):
- print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
- time.sleep(2)
- return n**2
- # if __name__ == '__main__':
- # p=ThreadPoolExecutor() #线程池 max_workers=20 一个进程多少线程, 默认线程 cpu 的个数乘以 5
- # l=[]
- # start=time.time()
- # for i in range(10):
- # obj=p.submit(task,i).result()
- # l.append(obj)
- # p.shutdown()
- # print('='*30)
- # print([obj.result() for obj in l])
- # print(time.time()-start)
- #p.submit(task,i).result()即同步执行, 进程池, 线程池都有
- from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
- import os,time,random
- def task(n):
- print('%s is running' %os.getpid())
- time.sleep(2)
- return n**2
- # if __name__ == '__main__':
- # p=ProcessPoolExecutor()
- # start=time.time()
- # for i in range(10):
- # res=p.submit(task,i).result() #同步执行
- # print(res)
- # print('='*30)
- # print(time.time()-start)
1.2 concurrent.futures map 方法
- # concurrent 的方法 map, 前面是一个函数, 后面是一个可迭代对象
- from concurrent.futures import ProcessPoolExecutor
- import os,time,random
- def task(n):
- print('%s is running' %os.getpid())
- time.sleep(2)
- return n**2
- if __name__ == '__main__':
- p=ProcessPoolExecutor()
- obj=p.map(task,range(10))
- p.shutdown()
- print('='*30)
- print(list(obj))
- # 上面的例子改成 map
- # if __name__ == '__main__':
- # p=ThreadPoolExecutor() #线程池 max_workers=20 一个进程多少线程, 默认线程 cpu 的个数乘以 5
- # l=[]
- # for i in range(10):
- # obj=p.submit(task,i).result()
- # l.append(obj)
- # p.shutdown()
- # print([obj.result() for obj in l])
- # if __name__ == '__main__':
- # p=ThreadPoolExecutor() #线程池 max_workers=20 一个进程多少线程, 默认线程 cpu 的个数乘以 5
- # obj = p.map(task,range(10)) 就可以了
- # p.shutdown()
- # print(list(obj))
- # *** 没法得到其中一个, 因为是迭代器, 而且这个东西没有回调函数, 只是提交任务的话 Map 最好
1.3 concurrent.futures 爬网页例子
- from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
- import requests
- import os
- import time
- from threading import currentThread
- def get_page(url):
- print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url))
- response=requests.get(url)
- time.sleep(2)
- return {'url':url,'text':response.text}
- def parse_page(res):
- res=res.result() #参数传过来是对象, 所以需要得到结果
- print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
- with open('db.txt','a') as f:
- parse_res='url:%s size:%s\n' %(res['url'],len(res['text']))
- f.write(parse_res)
- if __name__ == '__main__':
- # p=ProcessPoolExecutor()
- p=ThreadPoolExecutor()
- urls = [
- 'https://www.baidu.com',
- 'https://www.baidu.com',
- 'https://www.baidu.com',
- 'https://www.baidu.com',
- 'https://www.baidu.com',
- 'https://www.baidu.com',
- ]
- for url in urls:
- # multiprocessing.pool_obj 方式: p.apply_async(get_page,args=(url,),callback=parse_page), 这是自己调用 get 得到结果给回掉函数
- p.submit(get_page, url).add_done_callback(parse_page) #回掉函数, 前面的结果就是参数, 这个参数是对象, 所以需要 get
- # p.map(get_page,urls) map 写法
- p.shutdown()
- print('主',os.getpid())
二, 协程
一 引子
本节的主题是基于单线程来实现并发, 即只用一个主线程 (很明显可利用的 cpu 只有一个) 情况下实现并发, 为此我们需要先回顾下并发的本质: 切换 + 保存状态
cpu 正在运行一个任务, 会在两种情况下切走去执行其他的任务(切换由操作系统强制控制), 一种情况是该任务发生了阻塞, 另外一种情况是该任务计算的时间过长
ps: 在介绍进程理论时, 提及进程的三种执行状态, 而线程才是执行单位, 所以也可以将上图理解为线程的三种状态
其中第二种情况并不能提升效率, 只是为了让 cpu 能够雨露均沾, 实现看起来所有任务都被同时执行的效果, 如果多个任务都是纯计算的, 这种切换反而会降低效率为此我们可以基于 yield 来验证 yield 本身就是一种在单线程下可以保存任务运行状态的方法, 我们来简单复习一下:
- #1 yiled 可以保存状态, yield 的状态保存与操作系统的保存线程状态很像, 但是 yield 是代码级别控制的, 更轻量级
- #2 send 可以把一个函数的结果传给另外一个函数, 以此实现单线程内程序之间的切换
- # 串行执行
- import time
- def consumer(res):
- '''任务 1: 接收数据, 处理数据'''
- pass
- def producer():
- '''任务 2: 生产数据'''
- res=[]
- for i in range(10000000):
- res.append(i)
- return res
- start=time.time()
- # 串行执行
- res=producer()
- consumer(res)
- stop=time.time()
- print(stop-start) #1.5536692142486572
- # 基于 yield 并发执行
- import time
- def consumer():
- '''任务 1: 接收数据, 处理数据'''
- while True:
- x=yield
- def producer():
- '''任务 2: 生产数据'''
- g=consumer()
- next(g)
- for i in range(10000000):
- g.send(i)
- start=time.time()
- # 基于 yield 保存状态, 实现两个任务直接来回切换, 即并发的效果
- #PS: 如果每个任务中都加上打印, 那么明显地看到两个任务的打印是你一次我一次, 即并发执行的.
- producer()
- stop=time.time()
- print(stop-start) #2.0272178649902344
单纯地切换反而会降低运行效率
而在任务一遇到 io 情况下, 切到任务二去执行, 这样就可以利用任务一阻塞的时间完成任务二的计算, 效率的提升就在于此
- import time
- def consumer():
- '''任务 1: 接收数据, 处理数据'''
- while True:
- x=yield
- def producer():
- '''任务 2: 生产数据'''
- g=consumer()
- next(g)
- for i in range(10000000):
- g.send(i)
- time.sleep(2)
- start=time.time()
- producer() #并发执行, 但是任务 producer 遇到 io 就会阻塞住, 并不会切到该线程内的其他任务去执行
- stop=time.time()
- print(stop-start)
yield 并不能实现遇到 io 切换
对于单线程下, 我们不可避免程序中出现 io 操作, 但如果我们能在自己的程序中 (即用户程序级别, 而非操作系统级别) 控制单线程下的多个任务能在一个任务遇到 io 阻塞时就切换到另外一个任务去计算, 这样就保证了该线程能够最大限度地处于就绪态, 即随时都可以被 cpu 执行的状态, 相当于我们在用户程序级别将自己的 io 操作最大限度地隐藏起来, 从而可以迷惑操作系统, 让其看到: 该线程好像是一直在计算, io 比较少
协程的本质就是在单线程下, 由用户自己控制一个任务遇到 io 阻塞了就切换另外一个任务去执行, 以此来提升效率
因此我们需要找寻一种可以同时满足以下条件的解决方案:
1. 可以控制多个任务之间的切换, 切换之前将任务的状态保存下来, 以便重新运行时, 可以基于暂停的位置继续执行
2. 作为 1 的补充: 可以检测 io 操作, 在遇到 io 操作的情况下才发生切换
二 协程介绍
协程: 是单线程下的并发, 又称微线程, 纤程英文名 Coroutine 一句话说明什么是线程: 协程是一种用户态的轻量级线程, 即协程是由用户程序自己控制调度的
需要强调的是:
- #1. python 的线程属于内核级别的, 即由操作系统控制调度(如单线程遇到 io 或执行时间过长就会被迫交出 cpu 执行权限, 切换其他线程运行)
- #2. 单线程内开启协程, 一旦遇到 io, 就会从应用程序级别 (而非操作系统) 控制切换, 以此来提升效率(!!! 非 io 操作的切换与效率无关)
对比操作系统控制线程的切换, 用户在单线程内控制协程的切换
优点如下:
- #1. 协程的切换开销更小, 属于程序级别的切换, 操作系统完全感知不到, 因而更加轻量级
- #2. 单线程内就可以实现并发的效果, 最大限度地利用 cpu
缺点如下:
- #1. 协程的本质是单线程下, 无法利用多核, 可以是一个程序开启多个进程, 每个进程内开启多个线程, 每个线程内开启协程
- #2. 协程指的是单个线程, 因而一旦协程出现阻塞, 将会阻塞整个线程
总结协程特点:
必须在只有一个单线程里实现并发
修改共享数据不需加锁
用户程序里自己保存多个控制流的上下文栈
附加: 一个协程遇到 IO 操作自动切换到其它协程(如何实现检测 IO,yieldgreenlet 都无法实现, 就用到了 gevent 模块(select 机制))
三 Greenlet
如果我们在单个线程内有 20 个任务, 要想实现在多个任务之间切换, 使用 yield 生成器的方式过于麻烦(需要先得到初始化一次的生成器, 然后再调用 send 非常麻烦), 而使用 greenlet 模块可以非常简单地实现这 20 个任务直接的切换
- # 安装
- pip3 install greenlet
- from greenlet import greenlet
- def eat(name):
- print('%s eat 1' %name)
- g2.switch('egon')
- print('%s eat 2' %name)
- g2.switch()
- def play(name):
- print('%s play 1' %name)
- g1.switch()
- print('%s play 2' %name)
- g1=greenlet(eat)
- g2=greenlet(play)
- g1.switch('egon')# 可以在第一次 switch 时传入参数, 以后都不需要
单纯的切换(在没有 io 的情况下或者没有重复开辟内存空间的操作), 反而会降低程序的执行速度
- # 顺序执行
- import time
- def f1():
- res=1
- for i in range(100000000):
- res+=i
- def f2():
- res=1
- for i in range(100000000):
- res*=i
- start=time.time()
- f1()
- f2()
- stop=time.time()
- print('run time is %s' %(stop-start)) #10.985628366470337
- # 切换
- from greenlet import greenlet
- import time
- def f1():
- res=1
- for i in range(100000000):
- res+=i
- g2.switch()
- def f2():
- res=1
- for i in range(100000000):
- res*=i
- g1.switch()
- start=time.time()
- g1=greenlet(f1)
- g2=greenlet(f2)
- g1.switch()
- stop=time.time()
- print('run time is %s' %(stop-start)) # 52.763017892837524
- View Code
greenlet 只是提供了一种比 generator 更加便捷的切换方式, 当切到一个任务执行时如果遇到 io, 那就原地阻塞, 仍然是没有解决遇到 IO 自动切换来提升效率的问题
单线程里的这 20 个任务的代码通常会既有计算操作又有阻塞操作, 我们完全可以在执行任务 1 时遇到阻塞, 就利用阻塞的时间去执行任务 2 如此, 才能提高效率, 这就用到了 Gevent 模块
四 Gevent 介绍
- # 安装
- pip3 install gevent
Gevent 是一个第三方库, 可以轻松通过 gevent 实现并发同步或异步编程, 在 gevent 中用到的主要模式是 Greenlet, 它是以 C 扩展模块形式接入 Python 的轻量级协程 Greenlet 全部运行在主程序操作系统进程的内部, 但它们被协作式地调度
# 用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象 g1,spawn 括号内第一个参数是函数名, 如 eat, 后面可以有多个参数, 可以是位置实参或关键字实参, 都是传给函数 eat 的
- g2=gevent.spawn(func2)
- g1.join() #等待 g1 结束
- g2.join() #等待 g2 结束
- # 或者上述两步合作一步: gevent.joinall([g1,g2])
- g1.value# 拿到 func1 的返回值
遇到 IO 阻塞时会自动切换任务
- import gevent
- def eat(name):
- print('%s eat 1' %name)
- gevent.sleep(2)
- print('%s eat 2' %name)
- def play(name):
- print('%s play 1' %name)
- gevent.sleep(1)
- print('%s play 2' %name)
- g1=gevent.spawn(eat,'egon')
- g2=gevent.spawn(play,name='egon')
- g1.join()
- g2.join()
- # 或者 gevent.joinall([g1,g2])
- print('主')
- View Code
上例 gevent.sleep(2)模拟的是 gevent 可以识别的 io 阻塞,
而 time.sleep(2)或其他的阻塞, gevent 是不能直接识别的需要用下面一行代码, 打补丁, 就可以识别了
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面, 如 time,socket 模块之前
或者我们干脆记忆成: 要用 gevent, 需要将 from gevent import monkey;monkey.patch_all()放到文件的开头
- from gevent import monkey;monkey.patch_all()
- import gevent
- import time
- def eat():
- print('eat food 1')
- time.sleep(2)
- print('eat food 2')
- def play():
- print('play 1')
- time.sleep(1)
- print('play 2')
- g1=gevent.spawn(eat)
- g2=gevent.spawn(play_phone)
- gevent.joinall([g1,g2])
- print('主')
我们可以用 threading.current_thread().getName()来查看每个 g1 和 g2, 查看的结果为 DummyThread-n, 即假线程
五 Gevent 之同步与异步
- from gevent import spawn,joinall,monkey;monkey.patch_all()
- import time
- def task(pid):
- """Some non-deterministic task"""
- time.sleep(0.5)
- print('Task %s done' % pid)
- def synchronous():
- for i in range(10):
- task(i)
- def asynchronous():
- g_l=[spawn(task,i) for i in range(10)]
- joinall(g_l)
- if __name__ == '__main__':
- print('Synchronous:')
- synchronous()
- print('Asynchronous:')
- asynchronous()
- # 上面程序的重要部分是将 task 函数封装到 Greenlet 内部线程的 gevent.spawn 初始化的 greenlet 列表存放在数组 threads 中, 此数组被传给 gevent.joinall 函数, 后者阻塞当前流程, 并执行所有给定的 greenlet 执行流程只会在 所有 greenlet 执行完后才会继续向下走
- View Code
六 Gevent 之应用举例一
- from gevent import monkey;monkey.patch_all()
- import gevent
- import requests
- import time
- def get_page(url):
- print('GET: %s' %url)
- response=requests.get(url)
- if response.status_code == 200:
- print('%d bytes received from %s' %(len(response.text),url))
- start_time=time.time()
- gevent.joinall([
- gevent.spawn(get_page,'https://www.python.org/'),
- gevent.spawn(get_page,'https://www.yahoo.com/'),
- gevent.spawn(get_page,'https://github.com/'),
- ])
- stop_time=time.time()
- print('run time is %s' %(stop_time-start_time))
协程应用: 爬虫
七 Gevent 之应用举例二
通过 gevent 实现单线程下的 socket 并发 (from gevent import monkey;monkey.patch_all() 一定要放到导入 socket 模块之前, 否则 gevent 无法识别 socket 的阻塞)
- from gevent import monkey;monkey.patch_all()
- from socket import *
- import gevent
- # 如果不想用 money.patch_all()打补丁, 可以用 gevent 自带的 socket
- # from gevent import socket
- # s=socket.socket()
- def server(server_ip,port):
- s=socket(AF_INET,SOCK_STREAM)
- s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
- s.bind((server_ip,port))
- s.listen(5)
- while True:
- conn,addr=s.accept()
- gevent.spawn(talk,conn,addr)
- def talk(conn,addr):
- try:
- while True:
- res=conn.recv(1024)
- print('client %s:%s msg: %s' %(addr[0],addr[1],res))
- conn.send(res.upper())
- except Exception as e:
- print(e)
- finally:
- conn.close()
- if __name__ == '__main__':
- server('127.0.0.1',8080)
服务端
服务端
- #_*_coding:utf-8_*_
- __author__ = 'Linhaifeng'
- from socket import *
- client=socket(AF_INET,SOCK_STREAM)
- client.connect(('127.0.0.1',8080))
- while True:
- msg=input('>>:').strip()
- if not msg:continue
- client.send(msg.encode('utf-8'))
- msg=client.recv(1024)
- print(msg.decode('utf-8'))
客户端
- from threading import Thread
- from socket import *
- import threading
- def client(server_ip,port):
- c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内, 即局部名称空间内, 放在函数外则被所有线程共享, 则大家公用一个套接字对象, 那么客户端端口永远一样了
- c.connect((server_ip,port))
- count=0
- while True:
- c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
- msg=c.recv(1024)
- print(msg.decode('utf-8'))
- count+=1
- if __name__ == '__main__':
- for i in range(500):
- t=Thread(target=client,args=('127.0.0.1',8080))
- t.start()
多线程并发多个客户端
来源: https://www.cnblogs.com/6324TV/p/8502811.html