一, IO 模型介绍
本文所讨论的是 Linux 环境下的 network IO,IO 模型可以分为 5 种, 分别是阻塞 IO(blocking IO), 非阻塞 IO(non-blocking IO), 多路复用 IO(IO multiplexing), 异步 IO(asynchronous IO), 信号驱动 IO(signal IO). 其中信号驱动 IO(signal IO)在实际中并不常用, 所有主要介绍其余四种 IO Model.
总结一下 IO 发生时涉及的对象和步骤. 对于一个 network IO (这里我们以 read 举例), 它会涉及到两个系统对象, 一个是调用这个 IO 的进程 (线程), 另一个就是系统内核(kernel). 当一个 read 操作发生时, 该操作会经历两个阶段:
- #1)等待数据准备 (Waiting for the data to be ready)
- #2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
这两个阶段很重要, 因为这些 IO 模型的区别就是在两个阶段上各有不同的情况.
- #1, 输入操作: read,readv,recv,recvfrom,recvmsg 共 5 个函数, 如果会阻塞状态, 则会经历 wait data 和 copy data 两个阶段, 如果设置为非阻塞则在 wait 不到 data 时抛出异常
- #2, 输出操作: write,writev,send,sendto,sendmsg 共 5 个函数, 在发送缓冲区满了会阻塞在原地, 如果设置为非阻塞, 则会抛出异常
- #3, 接收外来链接: accept, 与输入操作类似
- #4, 发起外出链接: connect, 与输出操作类似
二, 阻塞 IO
在 Linux 中, 默认情况下所有的 socket 都是 blocking, 一个典型的读操作流程大概是这样:
当用户进程调用了 recvfrom 这个系统调用, kernel 就开始了 IO 的第一个阶段: 准备数据. 对于 network io 来说, 很多时候数据在一开始还没有到达 (比如, 还没有收到一个完整的 UDP 包), 这个时候 kernel 就要等待足够的数据到来. 而在用户进程这边, 整个进程会被阻塞. 当 kernel 一直等到数据准备好了, 它就会将数据从 kernel 中拷贝到用户内存, 然后 kernel 返回结果, 用户进程才解除 block 的状态, 重新运行起来. 所以, blocking IO 的特点就是在 IO 执行的两个阶段(等待数据和拷贝数据两个阶段) 都被 block 了.
实际上, 除非特别指定, 几乎所有的 IO 接口 ( 包括 socket 接口 ) 都是阻塞型的.
三, 非阻塞 IO
Linux 下, 可以通过设置 socket 使其变为 non-blocking. 当对一个 non-blocking socket 执行读操作时, 流程是这个样子:
从图中可以看出, 当用户进程发出 read 操作时, 如果 kernel 中的数据还没有准备好, 那么它并不会 block 用户进程, 而是立刻返回一个 error. 从用户进程角度讲 , 它发起一个 read 操作后, 并不需要等待, 而是马上就得到了一个结果. 用户进程判断结果是一个 error 时, 它就知道数据还没有准备好, 于是用户就可以在本次到下次再发起 read 询问的时间间隔内做其他事情, 或者直接再次发送 read 操作. 一旦 kernel 中的数据准备好了, 并且又再次收到了用户进程的 system call, 那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的), 然后返回. 也就是说非阻塞的 recvform 系统调用调用之后, 进程并没有被阻塞, 内核马上返回给进程, 如果数据还没准备好, 此时会返回一个 error. 进程在返回之后, 可以干点别的事情, 然后再发起 recvform 系统调用. 重复上面的过程, 循环往复的进行 recvform 系统调用. 这个过程通常被称之为轮询. 轮询检查内核数据, 直到数据准备好, 再拷贝数据到进程, 进行数据处理. 需要注意, 拷贝数据整个过程, 进程仍然是属于阻塞的状态. 所以, 在非阻塞式 IO 中, 用户进程其实是需要不断的主动询问 kernel 数据准备好了没有.
下面我们来做一个非阻塞 IO 的示例:
- import socket
- import time
- sk=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- sk.bind(('127.0.0.1',6667))
- sk.listen(5)
- sk.setblocking(False) #把阻塞状态设置成非阻塞状态
- print('waiting client connection...')
- while True:
- try:
- connection,address = sk.accept()
- print('+++',address)
- client_messge = connection.recv(1024)
- print(str(client_messge,'utf8'))
- connection.close()
- except Exception as e:
- print(e)
- time.sleep(4)
服务端
- import socket,time
- sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- while True:
- sk.connect(('127.0.0.1',6667))
- print('hello')
- sk.sendall(bytes('hello','utf8'))
- time.sleep(2)
- break
客户端
非阻塞 IO 的优点是能够在等待任务完成的时间里干其他活了, 但是并不推荐. 原因是它的缺点也十分明显:
- #1. 循环调用 recv()将大幅度推高 CPU 占用率;
- #2. 任务完成的响应延迟增大了, 因为每过一段时间才去轮询一次 read 操作, 而任务可能在两次轮询之间的任意时间完成. 这会导致整体数据吞吐量的降低.
四, 多路复用 IO
多路复用 IO 的好处, 就在于单个进程就可以同时处理多个网络连接的 IO. 它的基本原理就是 select/epoll 这个 function 会不断的轮询所负责的所有 socket, 当某个 socket 有数据到达了, 就通知用户进程. 它的流程如图:
当用户进程调用了 select, 那么整个进程会被 block, 而同时, kernel 会 "监视" 所有 select 负责的 socket, 当任何一个 socket 中的数据准备好了, select 就会返回. 这个时候用户进程再调用 read 操作, 将数据从 kernel 拷贝到用户进程. 这个图和 blocking IO 的图其实并没有太大的不同, 事实上还更差一些. 因为这里需要使用两个系统调用(select 和 recvfrom), 而 blocking IO 只调用了一个系统调用(recvfrom). 但是, 用 select 的优势在于它可以同时处理多个连接.
下面我们来做一个多路复用 IO 的示例:
- import socket
- import select
- sk = socket.socket()
- sk.bind(('127.0.0.1',8801))
- sk.listen(5)
- inputs = [sk,]
- while True:
- r,w,e = select.select(inputs,[],[],5) #第一个输入列表, 第二个输出列表, 第三个错误列表, 第四个每隔 5 秒钟
- print(len(r))
- for obj in r:
- if obj == sk:
- conn,addr = obj.accept()
- print(conn)
- inputs.append(conn)
- else:
- data_bype = obj.recv(1024)
- print(str(data_bype,'utf8'))
- inp = input('回答 %s 号客户>>>'%inputs.index(obj))
- obj.sendall(bytes(inp,'utf8'))
- print('>>',r)
服务端
- import socket
- sk = socket.socket()
- sk.connect(('127.0.0.1',8801))
- while True:
- inp = input('>>>')
- sk.sendall(bytes(inp,'utf8'))
- data = sk.recv(1024)
- print(str(data,'utf8'))
客户端
多路复用 IO 的优点就是相比其他模型, 使用 select() 的事件驱动模型只用单线程 (进程) 执行, 占用资源少, 不消耗太多 CPU, 同时能够为多客户端提供服务.
五, 异步 IO
Linux 下的 asynchronous IO 其实用得不多, 它的流程图如下:
用户进程发起 read 操作之后, 立刻就可以开始去做其它的事. 而另一方面, 从 kernel 的角度, 当它受到一个 asynchronous read 之后, 首先它会立刻返回, 所以不会对用户进程产生任何 block. 然后, kernel 会等待数据准备完成, 然后将数据拷贝到用户内存, 当这一切都完成之后, kernel 会给用户进程发送一个 signal, 告诉它 read 操作完成了.
异步 IO 最大特点就是全程无阻塞.
六, selector 模块
它的功能与 select 模块类似, 实现高效的多路复用 IO, 常用于非阻塞的 socket 编程中.
下面我们利用 selector 模块来实现 socket 编程:
- import selectors
- import socket
- sel = selectors.DefaultSelector()
- def accept(sock,mask):
- conn,addr = sock.accept()
- print('accepted',conn,'from',addr)
- conn.setblocking(False)
- sel.register(conn,selectors.EVENT_READ,read)
- def read(conn,mask):
- try:
- data = conn.recv(1000)
- if not data:
- raise Exception
- print('echoing',repr(data),'to',conn)
- conn.send(data)
- except Exception as e:
- print('closing',conn)
- sel.unregister(conn)
- conn.close()
- sock = socket.socket()
- sock.bind(('localhost',8090))
- sock.listen(100)
- sock.setblocking(False)
- sel.register(sock,selectors.EVENT_READ,accept) #注册, sock 绑定 accept 函数
- while True:
- events = sel.select() #监听
- for key,mask in events:
- callback = key.data #第一次 key.data 返回一个函数
- callback(key.fileobj,mask) #第一次 key.fileobj 返回 sock
服务端
- import socket
- sk = socket.socket()
- sk.connect(('127.0.0.1',8090))
- while 1:
- inp = input('>>>')
- sk.send(inp.encode('utf8'))
- data = sk.recv(1024)
- print(data.decode('utf8'))
客户端
来源: http://www.bubuko.com/infodetail-3231617.html