基于协程的 TCP 程序
- from gevent import monkey, spawn
- monkey.patch_all()
- import socket
- server = socket.socket()
- server.bind(('127.0.0.1', 8888))
- server.listen()
- def talking(client):
- while True:
- try:
- data = client.recv(1024)
- if not data:
- client.close()
- break
- client.send(data.upper())
- except ConnectionResetError:
- client.close()
- break
- def save():
- while True:
- client, address = server.accept()
- spawn(talking, client)
- spawn(save).join()
- server
- import socket
- client = socket.socket()
- client.connect(('127.0.0.1', 8888))
- while True:
- msg = input('>>>:').strip()
- if not msg: continue
- client.send(msg.encode('utf-8'))
- data = client.recv(1024)
- print(data.decode('utf-8'))
- # import socket
- # from concurrent.futures import ThreadPoolExecutor
- # from threading import current_thread
- #
- # pool = ThreadPoolExecutor(1000)
- #
- #
- # def task():
- # client = socket.socket()
- # client.connect(('127.0.0.1', 8888))
- # while True:
- # msg = '%s say hi' % current_thread().name
- # if not msg: continue
- # client.send(msg.encode('utf-8'))
- # data = client.recv(1024)
- # print(data.decode('utf-8'))
- #
- #
- # for i in range(500):
- # pool.submit(task)
- client
协程 join 的用法
- from gevent import monkey, spawn
- monkey.patch_all()
- import time
- def task1():
- print('task1 run')
- time.sleep(3)
- print('task1 over')
- def task2():
- print('task2 run')
- time.sleep(3)
- print('task2 over')
- spawn(task1)
- spawn(task2)
- print('over')
- time.sleep(10)
代码
IO 模型
模型即套路是解决某个固定问题的方式方法
IO 模型 即解决 IO 问题的方式方法
IO 指的是输入输出, 输入输出设备的速度对比 CPU 而言是非常慢的, 比如 recv input 等都属 IO 操作
IO 操作最大的问题就是会阻塞程序执行
IO 模型要解决的仅仅是网络 IO 操作
IO 模型有以下几个
1. 阻塞 IO
socket 默认就是阻塞的
问题: 同一时间只能服务一个客户端
方法 1: 多线程
优点: 如果并发量不高 效率是较高的 因为每一个客户端都有单独线程来处理
弊端: 不可能无限的开启线程 线程也需要占用资源
方法 2: 多进程
优点: 可以多个 CPU 并行处理
弊端: 占用资源非常大, 一旦客户端稍微多一点立马就变慢了
线程池:
优点: 保证了服务器正常稳定运行, 还帮你负责创建和销毁线程, 以及任务分配
弊端: 一旦并发量超出最大线程数量, 就只能等前面的运行完毕
协程:
基于单线程并发
优点: 不需要创建一堆线程, 不需要在线程间做切换, 没有数量限制
弊端: 不能利用多核优势, 单核处理器性能也是有上限的, 如果真的并发特别 da 那么处理速度会变慢
总结: 真正导致效率低的是阻塞问题 但是上述几个方法并没有真正解决阻塞问题 仅仅是避开了阻塞问题
- import socket
- from threading import Thread
- server = socket.socket()
- server.bind(('127.0.0.1', 8888))
- server.listen()
- def talking(client):
- while True:
- try:
- data = client.recv(1024)
- print('recv...')
- if not data:
- client.close()
- break
- # send 是一个本地 io 操作 速度非常快
- client.send(data.upper())
- except ConnectionResetError:
- client.close()
- break
- while True:
- client, address = server.accept()
- print('accept')
- t = Thread(target=talking, args=(client,))
- t.start()
- server
- import socket
- import os
- client = socket.socket()
- client.connect(('127.0.0.1', 8888))
- print('connect...')
- while True:
- msg = '%s 发来问候 hello' % os.getpid()
- if not msg: continue
- client.send(msg.encode('utf-8'))
- data = client.recv(1024)
- print(data.decode('utf-8'))
- client
2. 非阻塞 IO
即遇到 IO 操作也不会导致程序阻塞, 会继续执行
意味着即使遇到 IO 操作 CUP 执行权也不会被剥夺
程序效率就变高了
弊端: 占用 CUP 太高
原因是需要无限的循环 去向操作系统拿数据
- import socket
- c = socket.socket()
- c.connect(("127.0.0.1", 8888))
- print("connect....")
- while True:
- msg = input(">>>:").strip()
- if not msg:
- continue
- c.send(msg.encode("utf-8"))
- data = c.recv(1024)
- print(data.decode("utf-8"))
- client
- import socket
- # import time
- server = socket.socket()
- server.bind(('127.0.0.1', 8888))
- server.listen()
- # 设置 socket 是否阻塞 默认为 True
- server.setblocking(False)
- # 所有的客户端 socket
- cs = []
- # 所有需要返回数据的客户端
- send_cs = []
- while True:
- # time.sleep(0.2)
- try:
- client, address = server.accept() # 三次握手
- print('run accept')
- cs.append(client) # 存储已经连接成功的客户端
- except BlockingIOError:
- # 没有数据准备 可以作别的事情
- # print('收数据')
- for c in cs[:]:
- try:
- data = c.recv(1024)
- if not data:
- c.close()
- cs.remove(c)
- print(data.decode('utf-8'))
- # 把数据和连接放进去
- send_cs.append((c, data))
- # c.send(data.upper()) io
- # send 也是 io 操作 在一些极端情况下 例如系统缓存满了放不进去 那肯定抛出
- # 非阻塞异常 这时候必须把发送数据 单独拿出来处理 因为 recv 和 send 都有可能抛出相同异常
- except BlockingIOError:
- continue
- except ConnectionResetError:
- c.close()
- # 从所有客户端列表中删除这个连接
- cs.remove(c)
- # print('发送数据')
- for item in send_cs[:]:
- c, data = item
- try:
- c.send(data.upper())
- # 如果发送成功就把数据从列表中删除
- send_cs.remove(item)
- except BlockingIOError: # 如果缓冲区慢了 那就下次再发
- continue
- except ConnectionResetError:
- c.close() # 关闭连接
- send_cs.remove(item) # 删除数据
- # 从所有客户端中删除这个已经断开的连接
- cs.remove(c)
- server
删除列表中的某个数据
- # 方式 1
- # li = [1, 2, 3, 4, 5]
- # rm_list = []
- # for i in li:
- # rm_list.append(i)
- #
- # for i in rm_list:
- # li.remove(i)
- #
- # print(li)
- # 方式 2
- li = [1, 2, 3, 4, 5]
- print(id(li[:]))
- print(id(li))
- # 用切片的方式 产生一个新的列表 新列表中元素与旧列表完全相同
- # 遍历新列表 删除旧列表
- for i in li[:]:
- li.remove(i)
- print(li)
删除列表中的某个元素
3.IO 多路复用 (最重要)
用一个线程来并发处理所有客户端
原本我们是直接问操作系统 要数据
如果是阻塞 IO 没有数据就进入阻塞状态
非阻塞 IO 没有数据就抛出异常 然后继续询问操作系统
在多路复用模块中, 要先问 select 哪些 socket 已经准备就绪 然后再处理这些已经就绪的 socket
既然是已经就绪 那么执行 recv 或是 send 就不会再阻塞
select 模块只有一个函数就是 select
参数 1:r_list 需要被 select 检测是否是可读的客户端 把所有 socket 放到改列表中, select 会负责从中
找出可以读取数据的 socket
参数 2:w_list 需要被 select 检测是否是可写的客户端 把所有 socket 放到改列表中, select 会负责从中
找出可以写入数据的 socket
参数 3:x_list 存储要检测异常条件... 忽略即可
返回一个元组 包含三个列表
- import socket
- c = socket.socket()
- c.connect(("127.0.0.1", 8888))
- print("connect....")
- while True:
- msg = input(">>>:").strip()
- if not msg:
- continue
- c.send(msg.encode("utf-8"))
- data = c.recv(1024)
- print(data.decode("utf-8"))
- client
- import socket
- # import time
- import select
- server = socket.socket()
- server.bind(('127.0.0.1', 8888))
- server.listen()
- # 在多路复用中 一旦 select 交给你一个 socket 一定意味着该 socket 已经准备就绪
- # server.setblocking(False)
- r_list = [server]
- w_list = []
- # 存储需要发送的数据 以及对应的 socket 把 socket 作为 key 数据作为 value
- data_dic = {}
- while True:
- readables, writeables, _ = select.select(r_list, w_list, [])
- # 接收数据 以及服务器创立连接
- for i in readables:
- if i == server: # 如果是服务器就执行 accept
- client, _ = i.accept()
- r_list.append(client)
- else: # 是一个客户端 那就 recv 收数据
- try:
- data = i.recv(1024)
- if not data: # Linux 对方强行下线或者是 Windows 正常下线
- i.close()
- r_list.remove(i)
- continue
- print(data)
- # 发送数据 不清楚 目前是不是可以发 所以交给 select 来检测
- w_list.append(i)
- data_dic[i] = data # 把要发送的数据先存着 等 select 告诉你这个这个连接可以发送时再发送
- except ConnectionResetError: # Windows 强行下线
- i.close()
- r_list.remove(i) # 从检测列表中删除
- # 发送数据
- for i in writeables:
- try:
- i.send(data_dic[i].upper()) # 返回数据
- # data_dic.pop(i)
- # w_list.remove(i)
- except ConnectionResetError:
- i.close()
- # data_dic.pop(i)
- # w_list.remove(i)
- finally:
- data_dic.pop(i) # 删除已经发送成功的数据
- w_list.remove(i) # 从检测列表中删除这个连接 如果不删除 将一直处于可写状态
- server
来源: http://www.bubuko.com/infodetail-2985269.html