Gevent 官网文档地址: http://www.gevent.org/contents.html
进程, 线程, 协程区分
我们通常所说的协程 Coroutine 其实是 corporate routine 的缩写, 直接翻译为协同的例程, 一般我们都简称为协程.
在 linux 系统中, 线程就是轻量级的进程, 而我们通常也把协程称为轻量级的线程即微线程.
进程和协程
下面对比一下进程和协程的相同点和不同点:
相同点:
相同点存在于, 当我们挂起一个执行流的时, 我们要保存的东西:
栈, 其实在你切换前你的局部变量, 以及要函数的调用都需要保存, 否则都无法恢复
寄存器状态, 这个其实用于当你的执行流恢复后要做什么
而寄存器和栈的结合就可以理解为上下文, 上下文切换的理解:
CPU 看上去像是在并发的执行多个进程, 这是通过处理器在进程之间切换来实现的, 操作系统实现这种交错执行的机制称为上下文切换
操作系统保持跟踪进程运行所需的所有状态信息. 这种状态, 就是上下文.
在任何一个时刻, 操作系统都只能执行一个进程代码, 当操作系统决定把控制权从当前进程转移到某个新进程时, 就会进行上下文切换, 即保存当前进程的上下文, 恢复新进程的上下文, 然后将控制权传递到新进程, 新进程就会从它上次停止的地方开始.
不同点:
执行流的调度者不同, 进程是内核调度, 而协程是在用户态调度, 也就是说进程的上下文是在内核态保存恢复的, 而协程是在用户态保存恢复的, 很显然用户态的代价更低
进程会被强占, 而协程不会, 也就是说协程如果不主动让出 CPU, 那么其他的协程, 就没有执行的机会.
对内存的占用不同, 实际上协程可以只需要 4K 的栈就足够了, 而进程占用的内存要大的多
从操作系统的角度讲, 多协程的程序是单进程, 单协程
线程和协程
既然我们上面也说了, 协程也被称为微线程, 下面对比一下协程和线程:
线程之间需要上下文切换成本相对协程来说是比较高的, 尤其在开启线程较多时, 但协程的切换成本非常低.
同样的线程的切换更多的是靠操作系统来控制, 而协程的执行由我们自己控制.
协程只是在单一的线程里不同的协程之间切换, 其实和线程很像, 线程是在一个进程下, 不同的线程之间做切换, 这也可能是协程称为微线程的原因吧.
Gevent 模块
Gevent 是一种基于协程的 Python 网络库, 它用到 Greenlet 提供的, 封装了 libevent 事件循环的高层同步 API. 它让开发者在不改变编程习惯的同时, 用同步的方式写异步 I/O 的代码.
简单示例:
- import gevent
- def test1():
- print 12
- gevent.sleep(0)
- print 34
- def test2():
- print 56
- gevent.sleep(0)
- print 78
- gevent.joinall([
- gevent.spawn(test1),
- gevent.spawn(test2),
- ])
结果:
12 56 34 78
猴子补丁 Monkey patching
这个补丁是 Gevent 模块最需要注意的问题, 有了它, 才会让 Gevent 模块发挥它的作用. 我们往往使用 Gevent 是为了实现网络通信的高并发, 但是, Gevent 直接修改标准库里面大部分的阻塞式系统调用, 包括 socket,ssl,threading 和 select 等模块, 而变为协作式运行. 但是我们无法保证你在复杂的生产环境中有哪些地方使用这些标准库会由于打了补丁而出现奇怪的问题.
一种方法是使用 gevent 下的 socket 模块, 我们可以通过 "from gevent import socket" 来导入. 不过更常用的方法是使用猴子布丁(Monkey patching). 使用猴子补丁褒贬不一, 但是官网上还是建议使用 "patch_all()", 而且在程序的第一行就执行.
- from gevent import monkey; monkey.patch_socket()
- import gevent
- import socket
- urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
- jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
- gevent.joinall(jobs, timeout=5)
- print [job.value for job in jobs]
上述代码的第一行就是对 socket 标准库打上猴子补丁, 此后 socket 标准库中的类和方法都会被替换成非阻塞式的, 所有其他的代码都不用修改, 这样协程的效率就真正体现出来了. Python 中其它标准库也存在阻塞的情况, gevent 提供了 "monkey.patch_all()" 方法将所有标准库都替换.
获取协程状态
started 属性 / ready()方法: 判断协程是否已启动.
successful()方法: 判断协程是否成功运行且没有抛出异常.
value 属性: 获取协程执行完之后的返回值.
另外, greenlet 协程运行过程中发生的异常是不会被抛出到协程外的, 因此需要用协程对象的 "exception" 属性来获取协程中的异常.
下面的例子很好的演示了各种方法和属性的使用.
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- import gevent
- def win():
- return 'You win!'
- def fail():
- raise Exception('You failed!')
- winner = gevent.spawn(win)
- loser = gevent.spawn(fail)
- print(winner.started) # True
- print(loser.started) # True
- # 在 Greenlet 中发生的异常, 不会被抛到 Greenlet 外面.
- # 控制台会打出 Stacktrace, 但程序不会停止
- try:
- gevent.joinall([winner, loser])
- except Exception as e:
- # 这段永远不会被执行
- print('This will never be reached')
- print(winner.ready()) # True
- print(loser.started) # True
- print(winner.value) # 'You win!'
- print(loser.value) # None
- print('successful',winner.successful()) # True
- print('successful',loser.successful()) # False
- # 这里可以通过 raise loser.exception 或 loser.get()
- # 来将协程中的异常抛出
- print(loser.exception)
协程运行超时控制
之前我们讲过在 "gevent.joinall()" 方法中可以传入 timeout 参数来设置超时, 我们也可以在全局范围内设置超时时间:
- import gevent
- from gevent import Timeout
- timeout = Timeout(2) # 2 seconds
- timeout.start()
- def wait():
- gevent.sleep(10)
- try:
- gevent.spawn(wait).join()
- except Timeout:
- print('Could not complete')
上例中, 我们将超时设为 2 秒, 此后所有协程的运行, 如果超过两秒就会抛出 "Timeout" 异常. 我们也可以将超时设置在 with 语句内, 这样该设置只在 with 语句块中有效:
- with Timeout(1):
- gevent.sleep(10)
此外, 我们可以指定超时所抛出的异常, 来替换默认的 "Timeout" 异常. 比如下例中超时就会抛出我们自定义的 "TooLong" 异常.
- class TooLong(Exception):
- pass
- with Timeout(1, TooLong):
- gevent.sleep(10)
协程间通信
事件 (Event) 对象
greenlet 协程间的异步通讯可以使用事件 (Event) 对象. 该对象的 "wait()" 方法可以阻塞当前协程, 而 "set()" 方法可以唤醒之前阻塞的协程. 在下面的例子中, 5 个 waiter 协程都会等待事件 evt, 当 setter 协程在 3 秒后设置 evt 事件, 所有的 waiter 协程即被唤醒.
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- import gevent
- from gevent.event import Event
- evt = Event()
- def setter():
- print 'Wait for me'
- gevent.sleep(3) # 3 秒后唤醒所有在 evt 上等待的协程
- print "Ok, I'm done"
- evt.set() # 唤醒
- def waiter():
- print "I'll wait for you"
- evt.wait() # 等待
- print 'Finish waiting'
- gevent.joinall([
- gevent.spawn(setter),
- gevent.spawn(waiter),
- gevent.spawn(waiter),
- gevent.spawn(waiter),
- gevent.spawn(waiter),
- gevent.spawn(waiter)
- ])
AsyncResult 事件
除了 Event 事件外, gevent 还提供了 AsyncResult 事件, 它可以在唤醒时传递消息. 让我们将上例中的 setter 和 waiter 作如下改动:
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- from gevent.event import AsyncResult
- aevt = AsyncResult()
- def setter():
- print 'Wait for me'
- gevent.sleep(3) # 3 秒后唤醒所有在 evt 上等待的协程
- print "Ok, I'm done"aevt.set('Hello!') # 唤醒, 并传递消息
- def waiter():
- print("I'll wait for you")
- message = aevt.get() # 等待, 并在唤醒时获取消息
- print 'Got wake up message: %s' % message
队列 Queue
队列 Queue 的概念相信大家都知道, 我们可以用它的 put 和 get 方法来存取队列中的元素. gevent 的队列对象可以让 greenlet 协程之间安全的访问. 运行下面的程序, 你会看到 3 个消费者会分别消费队列中的产品, 且消费过的产品不会被另一个消费者再取到:
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- import gevent
- from gevent.queue import Queue
- products = Queue()
- def consumer(name):
- #while not products.empty():
- while True:
- try:
- print('%s got product %s' % (name, products.get_nowait()))
- gevent.sleep(0)
- except gevent.queue.Empty:
- break
- print('Quit')
- def producer():
- for i in range(1, 10):
- products.put(i)
- gevent.joinall([
- gevent.spawn(producer),
- gevent.spawn(consumer, 'steve'),
- gevent.spawn(consumer, 'john'),
- gevent.spawn(consumer, 'nancy'),
- ])
注意: 协程队列跟线程队列是一样的, put 和 get 方法都是阻塞式的, 它们都有非阻塞的版本: put_nowait 和 get_nowait. 如果调用 get 方法时队列为空, 则是不会抛出 "gevent.queue.Empty" 异常. 我们只能使用 get_nowait()的方式让气抛出异常.
信号量
信号量可以用来限制协程并发的个数. 它有两个方法, acquire 和 release. 顾名思义, acquire 就是获取信号量, 而 release 就是释放. 当所有信号量都已被获取, 那剩余的协程就只能等待任一协程释放信号量后才能得以运行:
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- import gevent
- from gevent.coros import BoundedSemaphore
- sem = BoundedSemaphore(2)
- def worker(n):
- sem.acquire()
- print('Worker %i acquired semaphore' % n)
- gevent.sleep(0)
- sem.release()
- print('Worker %i released semaphore' % n)
- gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])
上面的例子中, 我们初始化了 "BoundedSemaphore" 信号量, 并将其个数定为 2. 所以同一个时间, 只能有两个 worker 协程被调度. 程序运行后的结果如下:
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 2 acquired semaphore
Worker 3 acquired semaphore
Worker 2 released semaphore
Worker 3 released semaphore
Worker 4 acquired semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
如果信号量个数为 1, 那就等同于同步锁.
协程本地变量
同线程类似, 协程也有本地变量, 也就是只在当前协程内可被访问的变量:
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- import gevent
- from gevent.local import local
- data = local()
- def f1():
- data.x = 1
- print data.x
- def f2():
- try:
- print data.x
- except AttributeError:
- print 'x is not visible'
- gevent.joinall([
- gevent.spawn(f1),
- gevent.spawn(f2)
- ])
通过将变量存放在 local 对象中, 即可将其的作用域限制在当前协程内, 当其他协程要访问该变量时, 就会抛出异常. 不同协程间可以有重名的本地变量, 而且互相不影响. 因为协程本地变量的实现, 就是将其存放在以的 "greenlet.getcurrent()" 的返回为键值的私有的命名空间内.
多并发 socket 模型
服务器端:
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- import socket
- import gevent
- from gevent import socket, monkey
- monkey.patch_all()
- def server(port):
- s = socket.socket()
- s.bind(('0.0.0.0', port))
- s.listen(500)
- while True:
- cli, addr = s.accept()
- gevent.spawn(handle_request, cli)
- def handle_request(conn):
- try:
- while True:
- data = conn.recv(1024)
- print("recv:", data)
- conn.send(data)
- if not data:
- conn.shutdown(socket.SHUT_WR)
- except Exception as ex:
- print(ex)
- finally:
- conn.close()
- if __name__ == '__main__':
- server(8001)
当客户端连接上服务器端时, 服务器端通过开辟一个协程与该客户端完成交互任务, 同时由于使用了 Gevent 协程的方式, 在每个客户端与服务器交互时, 并不会影响到服务器端的工作.
客户端:
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- import socket
- HOST = 'localhost' # The remote host
- PORT = 8001 # The same port as used by the server
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((HOST, PORT))
- while True:
- msg = bytes(input(">>:"), encoding="utf8")
- s.sendall(msg)
- data = s.recv(1024)
- # print(data)
- print('Received', repr(data)) # repr 格式化输出
- s.close()
来源: https://www.cnblogs.com/cjaaron/p/9178083.html