前言
数据流(Streams)
数据流 (Streams) 是用于处理网络连接的高阶异步 / 等待就绪 (async/await-ready) 原语, 可以在不使用回调和底层传输协议的情况下发送和接收数据.
以下是一个用 asyncio 实现的 TCP 回显客户端:
- import asyncio
- async def tcp_echo_client(message):
- reader, writer = await asyncio.open_connection(
- '127.0.0.1', 8888)
- print(f'Send: {message!r}')
- writer.write(message.encode())
- data = await reader.read(100)
- print(f'Received: {data.decode()!r}')
- print('Close the connection')
- writer.close()
- await writer.wait_closed()
- asyncio.run(tcp_echo_client('Hello World!'))
完整代码见例子一节.
Stream 方法
以下所列的高层 asyncio 方法可以被用作创建和处理 Stream:
coroutine **asyncio.open_connection(host=None,*,loop=None,limit=None,ssl=None,family=0,proto=0,flags=0,sock=None,local_addr=None,server_hostname=None,ssl_handshake_timeout=None)**
创建一个网络连接, 并返回一对 (reader,writer) 对象.
返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例.
loop 是可选参数, 在此方法被某个协程 await 时能够自动确定.
limit 限定返回的 StreamReader 实例使用的缓冲区大小. 默认情况下, 缓冲区限制为 64KiB.
其余的参数被直接传递给
- loop.create_connection()
- .
python3.7 新增:
ssl_handshake_timeout
参数.
coroutine **asyncio.start_server(client_connected_cb,host=None,port=None,*,loop=None,limit=None,family=socket.AF_UNSPEC,flags=socket.AI_PASSIVE,sock=None,backlog=100,ssl=None,reuse_address=None,reuse_port=None,ssl_handshake_timeout=None,start_serving=True)**
启动一个 socket 服务端.
client_connected_cb
指定的回调函数, 在新连接建立的时候被调用. 该回调函数接收 StreamReader 和 StreamWriter 类的'实例对'(reader,writer)作为两个参数.
client_connected_cb
可以是普通的可调用函数, 也可以是协程函数. 如果是协程函数, 那么会被自动封装为 Task 对象处理.
loop 是可选参数, 在此方法被某个协程 await 时能够自动确定.
limit 限定返回的 StreamReader 实例使用的缓冲区大小. 默认情况下, 缓冲区限定值为 64KiB.
其余的参数被直接传递给
- loop.create_server()
- .
python3.7 新增:
ssl_handshake_timeout
和 start_serving 参数.
- Unix Sockets
- coroutine **asyncio.open_unix_connection(path=None,*,loop=None,limit=None,ssl=None,sock=None,server_hostname=None,ssl_handshake_timeout=None)**
创建一个 Unix socket 连接, 并返回一对 (reader,writer) 对象.
与 open_connection 类似, 只是运行在 Unix sockets 上.
另见
loop.create_unix_connection()
可用于: Unix
python3.7 新增:
ssl_handshake_timeout
参数.
python3.7 修正: path 参数可以为
类 path(path-like)对象
coroutine **asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)**
启动一个 Unix socket 服务端.
类似于 start_server, 只是运行在 Unix sockets 上.
另见
loop.create_unix_server
可用于: Unix
python3.7 新增:
ssl_handshake_timeout
参数.
python3.7 修正: path 参数可以为
类 path(path-like)对象
- StreamReader
- class asyncio.StreamReader
定义一个读取器对象, 提供从 IO 数据流中读取数据的 API.
不建议 直接实例化 StreamReader 对象. 建议通过 open_connection()或 start_server()创建此类对象.
coroutine read(n=-1)
最多读取 n 字节数据. 如果 n 未设置, 或被设置为 - 1, 则读取至 EOF 标志, 并返回读到的所有字节.
如果在缓冲区仍为空时遇到 EOF, 则返回一个空的 bytes 对象.
coroutine readline()
读取一行(以 \ n 为标志).
如果在找到 \ n 之前遇到 EOF, 则返回已读取到的数据段.
如果遇到 EOF 时内部缓冲区仍为空, 则返回空的 bytes 对象.
coroutine readexactly(n)
精确读取 n 字节数据.
如果在尚未读够 n 字节时遇到 EOF, 则引发
IncompleteReadError
异常. 已经读取的部分数据可以通过
IncompleteReadError.partial
属性获取.
coroutine readuntil(separator=b'\n')
从数据流中读取数据直到遇到 separator.
如果执行成功, 读到的数据和分隔符将从内部缓冲区里移除. 返回的数据会在末尾包含分隔符.
如果读取数据的总量超过了配置的数据流缓冲区限制, 则引发 LimitOverrunError, 数据会被留在内部缓冲区中, 可以被再次读取.
如果在找到 separator 分隔符之前遇到 EOF, 则引发
IncompleteReadError
异常, 内部缓冲区会被重置.
IncompleteReadError.partial
属性会包含部分 separator.
python3.5.2 新增.
at_eof()
如果缓冲区为空, 且 feed_eof()被调用, 则返回 True.
- StreamWriter
- class asyncio.StreamWriter
定义一个写入器对象, 提供向 IO 数据流中写入数据的 API.
不建议直接实例化 StreamWriter 对象, 建议通过 open_connection 或 start_server 实例化对象.
can_writer_eof()
如果下层传输支持 write_eof 方法, 则返回 True, 否则返回 False.
write_eof()
在缓冲的写入数据被刷新后, 关闭数据流的写入端.
transport
返回下层的 asyncio 传输.
get_extra_info(name,default=None)
访问可选的传输信息.
write(data)
向数据流中写入数据.
此方法不受流量控制的影响. write()应同 drain()一同使用.
writelines()
向数据流中写入 bytes 列表(或任何的可迭代对象).
此方法不受流量控制的影响. 应与 drain()一同使用.
coroutine drain()
等待恢复数据写入的时机. 例如:
- writer.write(data)
- await writer.drain()
这是一个与底层 IO 输入缓冲区交互的流量控制方法. 当缓冲区达到上限时, drain()阻塞, 待到缓冲区回落到下限时, 写操作可以被恢复. 当不需要等待时, drain()会立即返回.
close()
关闭数据流.
is_closing()
如果数据流已经关闭或正在关闭, 则返回 True.
coroutine wait_closed()
保持等待, 直到数据流关闭.
保持等待, 直到底层连接被关闭, 应该在 close()后调用此方法.
Python3.7 新增.
示例
利用 Stream 实现 TCP 回显客户端
- import asyncio
- async def tcp_echo_client(message):
- reader, writer = await asyncio.open_connection(
- '127.0.0.1', 8888)
- print(f'Send: {message!r}')
- writer.write(message.encode())
- data = await reader.read(100)
- print(f'Received: {data.decode()!r}')
- print('Close the connection')
- writer.close()
- asyncio.run(tcp_echo_client('Hello World!'))
利用 Stream 实现 TCP 回显服务端
- import asyncio
- async def handle_echo(reader, writer):
- data = await reader.read(100)
- message = data.decode()
- addr = writer.get_extra_info('peername')
- print(f"Received {message!r} from {addr!r}")
- print(f"Send: {message!r}")
- writer.write(data)
- await writer.drain()
- print("Close the connection")
- writer.close()
- async def main():
- server = await asyncio.start_server(
- handle_echo, '127.0.0.1', 8888)
- addr = server.sockets[0].getsockname()
- print(f'Serving on {addr}')
- async with server:
- await server.serve_forever()
- asyncio.run(main())
获取 HTTP 头
- import asyncio
- import urllib.parse
- import sys
- async def print_http_headers(url):
- url = urllib.parse.urlsplit(url)
- if url.scheme == 'https':
- reader, writer = await asyncio.open_connection(
- url.hostname, 443, ssl=True)
- else:
- reader, writer = await asyncio.open_connection(
- url.hostname, 80)
- query = (
- f"HEAD {url.path or'/'} HTTP/1.0\r\n"
- f"Host: {url.hostname}\r\n"
- f"\r\n"
- )
- writer.write(query.encode('latin-1'))
- while True:
- line = await reader.readline()
- if not line:
- break
- line = line.decode('latin1').rstrip()
- if line:
- print(f'HTTP header> {line}')
- # Ignore the body, close the socket
- writer.close()
- url = sys.argv[1]
- asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
或:
python example.py https://example.com/path/page.html
利用 Stream 注册等待数据的开放 socket
- import asyncio
- import socket
- async def wait_for_data():
- # Get a reference to the current event loop because
- # we want to access low-level APIs.
- loop = asyncio.get_running_loop()
- # Create a pair of connected sockets.
- rsock, wsock = socket.socketpair()
- # Register the open socket to wait for data.
- reader, writer = await asyncio.open_connection(sock=rsock)
- # Simulate the reception of data from the network
- loop.call_soon(wsock.send, 'abc'.encode())
- # Wait for data
- data = await reader.read(100)
- # Got data, we are done: close the socket
- print("Received:", data.decode())
- writer.close()
- # Close the second socket
- wsock.close()
- asyncio.run(wait_for_data())
来源: https://www.cnblogs.com/mamingqian/p/10044730.html