上篇回顾: 静态服务器 + 压测
3.2. 概念篇
1. 同步与异步
同步是指一个任务的完成需要依赖另外一个任务时, 只有等待被依赖的任务完成后, 依赖的任务才能算完成.
异步是指不需要等待被依赖的任务完成, 只是通知被依赖的任务要完成什么工作. 然后继续执行下面代码逻辑, 只要自己完成了整个任务就算完成了(异步一般使用状态, 通知和回调)
PS: 项目里面一般是这样的:(个人经验)
同步架构: 一般都是和钱相关的需求, 需要实时返回的业务
异步架构: 更多是对写要求比较高时的场景(同步变异步)
读一般都是实时返回, 代码一般都是 await xxx()
想象个情景就清楚了:
异步: 现在用户写了篇文章, 可以异步操作, 就算没真正写到数据库也可以返回: 发表成功(大不了失败提示一下)
同步: 用户获取订单信息, 你如果异步就会这样了: 提示下获取成功, 然后一片空白... 用户不卸载就怪了...
2. 阻塞与非阻塞
阻塞是指调用结果返回之前, 当前线程会被挂起, 一直处于等待消息通知, 不能够执行其他业务(大部分代码都是这样的)
非阻塞是指在不能立刻得到结果之前, 该函数不会阻塞当前线程, 而会立刻返回(继续执行下面代码, 或者重试机制走起)
PS: 项目里面重试机制为啥一般都是 3 次?
第一次重试, 两台 PC 挂了也是有可能的
第二次重试, 负载均衡分配的三台机器同时挂的可能性不是很大, 这时候就有可能是网络有点拥堵了
最后一次重试, 再失败就没意义了, 日记写起来, 再重试网络负担就加大了, 得不偿失了
3. 五种 IO 模型
对于一次 IO 访问, 数据会先被拷贝到内核的缓冲区中, 然后才会从内核的缓冲区拷贝到应用程序的地址空间. 需要经历两个阶段:
准备数据
将数据从内核缓冲区拷贝到进程地址空间
由于存在这两个阶段, Linux 产生了下面五种 IO 模型(以 socket 为例)
阻塞式 IO:
当用户进程调用了 recvfrom 等阻塞方法时, 内核进入 IO 的第 1 个阶段: 准备数据 (内核需要等待足够的数据再拷贝) 这个过程需要等待, 用户进程会被阻塞, 等内核将数据准备好, 然后拷贝到用户地址空间, 内核返回结果, 用户进程才从阻塞态进入就绪态
Linux 中默认情况下所有的 socket 都是阻塞的
非阻塞式 IO:
当用户进程发出 read 操作时, 如果 kernel 中的数据还没有准备好, 那么它并不会 block 用户进程, 而是立刻返回一个 error.
用户进程判断结果是一个 error 时, 它就知道数据还没有准备好, 于是它可以再次发送 read 操作
一旦 kernel 中的数据准备好了, 并且又再次收到了用户进程的 system call, 那么它马上就将数据拷贝到了用户内存, 然后返回
非阻塞 IO 模式下用户进程需要不断地询问内核的数据准备好了没有
IO 多路复用:
通过一种机制, 一个进程可以监视多个文件描述符 (套接字描述符) 一旦某个文件描述符就绪(一般是读就绪或者写就绪), 能够通知程序进行相应的读写操作(这样就不需要每个用户进程不断的询问内核数据准备好了没)
常用的 IO 多路复用方式有 select,poll 和 epoll
信号驱动 IO:
内核文件描述符就绪后, 通过信号通知用户进程, 用户进程再通过系统调用读取数据.
此方式属于同步 IO(实际读取数据到用户进程缓存的工作仍然是由用户进程自己负责的)
异步 IO(POSIX 的 aio_系列函数)
用户进程发起 read 操作之后, 立刻就可以开始去做其它的事. 内核收到一个异步 IO read 之后, 会立刻返回, 不会阻塞用户进程.
内核会等待数据准备完成, 然后将数据拷贝到用户内存, 当这一切都完成之后, 内核会给用户进程发送一个 signal 告诉它 read 操作完成了
4.Unix 图示
贴一下 Unix 编程里面的图:
** 非阻塞 IO**
**IO 复用 **
** 信号 IO**
** 异步 AIO**
3.3.IO 多路复用
开始之前咱们通过非阻塞 IO 引入一下:(来个简单例子 socket.setblocking(False))
- import time
- import socket
- def select(socket_addr_list):
- for client_socket, client_addr in socket_addr_list:
- try:
- data = client_socket.recv(2048)
- if data:
- print(f"[来自 {client_addr} 的消息:]\n")
- print(data.decode("utf-8"))
- client_socket.send(
- b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>web Server Test</h1>"
- )
- else:
- # 没有消息是触发异常, 空消息是断开连接
- client_socket.close() # 关闭客户端连接
- socket_addr_list.remove((client_socket, client_addr))
- print(f"[客户端 {client_addr} 已断开连接, 当前连接数:{len(socket_addr_list)}]")
- except Exception:
- pass
- def main():
- # 存放客户端集合
- socket_addr_list = list()
- with socket.socket() as tcp_server:
- # 防止端口绑定的设置
- tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- tcp_server.bind(('', 8080))
- tcp_server.listen()
- tcp_server.setblocking(False) # 服务端非阻塞
- while True:
- try:
- client_socket, client_addr = tcp_server.accept()
- client_socket.setblocking(False) # 客户端非阻塞
- socket_addr_list.append((client_socket, client_addr))
- except Exception:
- pass
- else:
- print(f"[来自 {client_addr} 的连接, 当前连接数:{len(socket_addr_list)}]")
- # 防止客户端断开后出错
- if socket_addr_list:
- # 轮询查看客户端有没有消息
- select(socket_addr_list) # 引用传参
- time.sleep(0.01)
- if __name__ == "__main__":
- main()
输出:
可以思考下:
为什么 Server 也要设置为非阻塞?
PS: 一个线程里面只能有一个死循环, 现在程序需要两个死循环, so ==> 放一起咯
断开连接怎么判断?
PS: 没有消息是触发异常, 空消息是断开连接
client_socket 为什么不用 dict 存放?
PS:dict 在循环的过程中, del 会引发异常
1.Select
select 和上面的有点类似, 就是轮询的过程交给了操作系统:
kernel 会 "监视" 所有 select 负责的 socket, 当任何一个 socket 中的数据准备好了, select 就会返回. 这个时候用户进程再调用 read 操作, 将数据从 kernel 拷贝到用户进程
来个和上面等同的案例:
- import select
- import socket
- def main():
- with socket.socket() as tcp_server:
- tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- tcp_server.bind(('', 8080))
- tcp_server.listen()
- socket_info_dict = dict()
- socket_list = [tcp_server] # 监测列表
- while True:
- # 劣势: select 列表数量有限制
- read_list, write_list, error_list = select.select(
- socket_list, [], [])
- for item in read_list:
- # 服务端迎接新的连接
- if item == tcp_server:
- client_socket, client_address = item.accept()
- socket_list.append(client_socket)
- socket_info_dict[client_socket] = client_address
- print(f"[{client_address}已连接, 当前连接数:{len(socket_list)-1}]")
- # 客户端发来
- else:
- data = item.recv(2048)
- if data:
- print(data.decode("utf-8"))
- item.send(
- b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
- )
- else:
- item.close()
- socket_list.remove(item)
- info = socket_info_dict[item]
- print(f"[{info}已断开, 当前连接数:{len(socket_list)-1}]")
- if __name__ == "__main__":
- main()
输出和上面一样
扩展说明:
select 函数监视的文件描述符分 3 类, 分别是 writefds,readfds, 和 exceptfds. 调用后 select 函数会阻塞, 直到有描述符就绪函数返回 (有数据可读, 可写, 或者有 except) 或者超时(timeout 指定等待时间, 如果立即返回设为 null 即可)
select 的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制, 在 Linux 上一般为 1024(64 位 =>2048)
然后 Poll 就出现了, 就是把上限给去掉了, 本质并没变, 还是使用的轮询
2.EPoll
epoll 在内核 2.6 中提出(Linux 独有), 使用一个文件描述符管理多个描述符, 将用户关心的文件描述符的事件存放到内核的一个事件表中, 采用监听回调的机制, 这样在用户空间和内核空间的 copy 只需一次, 避免再次遍历就绪的文件描述符列表
先来看个案例吧:(输出和上面一样)
- import socket
- import select
- def main():
- with socket.socket() as tcp_server:
- tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- tcp_server.bind(('', 8080))
- tcp_server.listen()
- # epoll 是 Linux 独有的
- epoll = select.epoll()
- # tcp_server 注册到 epoll 中
- epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
- # key-value
- fd_socket_dict = dict()
- # 回调需要自己处理
- while True:
- # 返回可读写的 socket fd 集合
- poll_list = epoll.poll()
- for fd, event in poll_list:
- # 服务器的 socket
- if fd == tcp_server.fileno():
- client_socket, client_addr = tcp_server.accept()
- fd = client_socket.fileno()
- fd_socket_dict[fd] = (client_socket, client_addr)
- # 把客户端注册进 epoll 中
- epoll.register(fd, select.EPOLLIN | select.EPOLLET)
- else: # 客户端
- client_socket, client_addr = fd_socket_dict[fd]
- data = client_socket.recv(2048)
- print(
- f"[来自 {client_addr} 的消息, 当前连接数:{len(fd_socket_dict)}]\n")
- if data:
- print(data.decode("utf-8"))
- client_socket.send(
- b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
- )
- else:
- del fd_socket_dict[fd]
- print(
- f"[{client_addr}已离线, 当前连接数:{len(fd_socket_dict)}]\n"
- )
- # 从 epoll 中注销
- epoll.unregister(fd)
- client_socket.close()
- if __name__ == "__main__":
- main()
扩展: epoll 的两种工作模式
LT(level trigger, 水平触发)模式: 当 epoll_wait 检测到描述符就绪, 将此事件通知应用程序, 应用程序可以不立即处理该事件. 下次调用 epoll_wait 时, 会再次响应应用程序并通知此事件. LT 模式是默认的工作模式.
LT 模式同时支持阻塞和非阻塞 socket.
ET(edge trigger, 边缘触发)模式: 当 epoll_wait 检测到描述符就绪, 将此事件通知应用程序, 应用程序必须立即处理该事件. 如果不处理, 下次调用 epoll_wait 时, 不会再次响应应用程序并通知此事件.
ET 是高速工作方式, 只支持非阻塞 socket(ET 模式减少了 epoll 事件被重复触发的次数, 因此效率要比 LT 模式高)
Code 提炼一下:
实例化对象:
epoll = select.epoll()
注册对象:
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
注销对象:
epoll.unregister(fd)
PS:epoll 不一定比 Select 性能高, 一般都是分场景的:
高并发下, 连接活跃度不高时: epoll 比 Select 性能高(eg:Web 请求, 页面随时关闭)
并发不高, 连接活跃度比较高: Select 更合适(eg: 小游戏)
Select 是 win 和 Linux 通用的, 而 epoll 只有 Linux 有
其实 IO 多路复用还有一个 kqueue, 和 epoll 类似, 下面的通用写法中有包含
3. 通用写法(Selector)
一般来说: Linux 下使用 epoll,Win 下使用 select(IO 多路复用会这个通用的即可)
先看看 Python 源代码:
- # 选择级别: epoll|kqueue|devpoll> poll> select
- if 'KqueueSelector' in globals():
- DefaultSelector = KqueueSelector
- elif 'EpollSelector' in globals():
- DefaultSelector = EpollSelector
- elif 'DevpollSelector' in globals():
- DefaultSelector = DevpollSelector
- elif 'PollSelector' in globals():
- DefaultSelector = PollSelector
- else:
- DefaultSelector = SelectSelector
实战案例:(可读和可写可以不分开)
- import socket
- import selectors
- # Linux 下使用 epoll,Win 下使用 select
- Selector = selectors.DefaultSelector()
- class Task(object):
- def __init__(self):
- # 存放客户端 fd 和 socket 键值对
- self.fd_socket_dict = dict()
- def run(self):
- self.server = socket.socket()
- self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.server.bind(('', 8080))
- self.server.listen()
- # 把 Server 注册到 epoll
- Selector.register(self.server.fileno(), selectors.EVENT_READ,
- self.connected)
- def connected(self, key):
- """客户端连接时处理"""
- client_socket, client_address = self.server.accept()
- fd = client_socket.fileno()
- self.fd_socket_dict[fd] = (client_socket, client_address)
- # 注册一个客户端读的事件(服务端去读消息)
- Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
- print(f"{client_address}已连接, 当前连接数:{len(self.fd_socket_dict)}")
- def call_back_reads(self, key):
- """客户端可读时处理"""
- # 一个 fd 只能注册一次, 监测可写的时候需要把可读给注销
- Selector.unregister(key.fd)
- client_socket, client_address = self.fd_socket_dict[key.fd]
- print(f"[来自 {client_address} 的消息:]\n")
- data = client_socket.recv(2048)
- if data:
- print(data.decode("utf-8"))
- # 注册一个客户端写的事件(服务端去发消息)
- Selector.register(key.fd, selectors.EVENT_WRITE,
- self.call_back_writes)
- else:
- client_socket.close()
- del self.fd_socket_dict[key.fd]
- print(f"{client_address}已断开, 当前连接数:{len(self.fd_socket_dict)}")
- def call_back_writes(self, key):
- """客户端可写时处理"""
- Selector.unregister(key.fd)
- client_socket, client_address = self.fd_socket_dict[key.fd]
- client_socket.send(b"ok")
- Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)
- def main():
- t = Task()
- t.run()
- while True:
- ready = Selector.select()
- for key, obj in ready:
- # 需要自己回调
- call_back = key.data
- call_back(key)
- if __name__ == "__main__":
- main()
Code 提炼一下:
实例化对象:
Selector = selectors.DefaultSelector()
注册对象:
- Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
- Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
注销对象:
Selector.unregister(key.fd)
注意一下: 一个 fd 只能注册一次, 监测可写的时候需要把可读给注销(反之一样)
业余拓展:
select, iocp, epoll,kqueue 及各种 I/O 复用机制
https://blog.csdn.net/shallwake/article/details/5265287
kqueue 用法简介
http://www.cnblogs.com/luminocean/p/5631336.html
下级预估: 协程篇 or 网络深入篇
来源: https://www.cnblogs.com/dunitian/p/10099343.html