缘起
在许多编程语言里, 我们都非常乐于去研究在这个语言中所使用的异步网络编程的框架, 比如说 Python 的 Gevent,asyncio,Nginx 和 OpenResty,Go 等, 今年年初我开始接触 Rust, 并被其无 GC, 内存安全, 极小的运行时等特性所吸引, 经过一段时间的学习, 开始寻找构建实际项目的解决方案, 很快 mio,tokio 等框架进入了我的视野, 于是开始从更加底层的 mio 出发实验.
https://github.com/Hevienz/mio_test/blob/master/src/main.rs
可以看到 mio 是一个非常底层的异步编程的框架, 这意味着如果我们要在实际的项目开发中使用它时, 就不得不从 event loop 开始编写我们的软件, 这并不是我们所期望的, 于是我们需要一个更高层次抽象的框架, 这便是本文要为大家讲述的 tokio.
tokio
tokio 是 Rust 中的异步编程框架, 它将复杂的异步编程抽象为 Futures,Tasks 和 Executor, 并提供了 Timers 等基础设施, 下文中我们将一一展开.
运行时模型
tokio 是一个基于轮训的模型. 比如我们要在 tokio 上调度我们的 task, 我们需要为其实现 Future trait. 比如下面的例子中, 我们想要得到一个 widget, 但它有可能还没有准备好, 这时候我们调用 poll 的结果就是
Ok(Async::NotReady)
,Executor 会负责重复的调用 poll, 直到 widget 准备好, 返回
- Ok(Async::Ready(()))
- .
- /// A task that polls a single widget and writes it to STDOUT.
- pub struct MyTask;
- impl Future for MyTask {
- type Item = ();
- type Error = ();
- fn poll(&mut self) -> Result<Async<()>, ()> {
- match poll_widget() {
- Async::Ready(widget) => {
- println!("widget={:?}", widget);
- Ok(Async::Ready(()))
- }
- Async::NotReady => {
- return Ok(Async::NotReady);
- }
- }
- }
- }
在最简单的情况下, Executor 可能会长这样.(注: 这不是真实的实现, 只是用来说明概念)
- pub struct SpinExecutor {
- tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
- }
- impl SpinExecutor {
- pub fn spawn<T>(&mut self, task: T)
- where T: Future<Item = (), Error = ()> + 'static
- {
- self.tasks.push_back(Box::new(task));
- }
- pub fn run(&mut self) {
- while let Some(mut task) = self.tasks.pop_front() {
- match task.poll().unwrap() {
- Async::Ready(_) => {}
- Async::NotReady => {
- self.tasks.push_back(task);
- }
- }
- }
- }
- }
Executor 频繁地轮询所有 task, 即使某些 task 仍然会以 NotReady 返回.
理想情况下, Executor 应该可以通过某种方式知道哪些 task 恰好转变为 "就绪" 状态. 这正是 futures 任务模型的核心.
Futures
future 是对一个未来事件的抽象. 比如你可以将各种事件抽象为 future:
在线程池中执行的数据库查询. 当数据库查询完成时, future 完成, 其值是查询的结果.
对服务器的 RPC 调用. 当服务器回复时, future 完成, 其值是服务器的响应.
超时事件. 当时间到了, future 就完成了, 它的值是 ().
在线程池上运行的长时间运行的 CPU 密集型任务. 任务完成后, future 完成, 其值为任务的返回值.
这里我们举一个例子:
- extern crate futures;
- extern crate tokio;
- extern crate tokio_core;
- use std::error::Error;
- use futures::Future;
- use futures::future::{ok, done};
- use tokio_core::reactor::Core;
- fn my_fn_squared(i: u32) -> Result<u32, Box<Error>> {
- Ok(i * i)
- }
- fn my_fut_squared(i: u32) -> impl Future<Item = u32, Error = Box<Error + 'static>> {
- ok(i * i)
- }
- fn my_fut() -> impl Future<Item = u32, Error = Box<Error + 'static>> {
- ok(10)
- }
- fn main() {
- let mut reactor = Core::new().unwrap();
- let chained_future = my_fut().and_then(|retval| {
- done(my_fn_squared(retval)).and_then(|retval2| my_fut_squared(retval2))
- });
- let retval3 = reactor.run(chained_future).unwrap();
- println!("{:?}", retval3);
- }
这里, 我们的 my_fut 的返回值实现了 Future, 我们知道它被 Executor 执行完成后, 会返回一个 u32 或者 一个
Box<Error + 'static>
, 而现在我们就可以通过 .and_then 来处理这个 u32 的值, 而最终我们将我们的 future 链接了起来, 交给 Executor 执行.
Tasks
Tasks 是应用程序的 "逻辑单元". 他们以 Future trait 来表示. 一旦 task 完成处理, task 的 future 实现将以值 () 返回.
Tasks 被传递给 Executor,Executor 处理 task 的调度. Executor 通常在一组或一组线程中调度许多 task.task 不得执行计算繁重的逻辑, 否则将阻止其他 task 执行.
Tasks 既可以通过实现 Future trait 来实现, 也可以通过使用 futures 和 tokio crates 中的各种组合器函数来构建 future 来实现.
I/O
tokio crate 也提供了 TCP,UDP 的支持, 不像 std 中的实现, tokio 的网络类型是基于 poll 模型的, 并且当他们的 "就绪" 状态改变时会通知 task executors. 在 tokio::net 模块中你将会找到像 TcpListener,TcpStream,UdpSocket 这些类型.
所有这些类型都提供了 future 的 API 以及 poll API.
Tokio 网络类型被一个基于 mio 的 reactor 所驱动, 默认情况下, 它在后台线程上启动.
使用 future API
一些帮助使用 future API 的函数包括:
incoming: 入站 TCP 连接的 Stream.
read_exact: 将 n 字节准确读入缓冲区.
read_to_end: 将所有字节读入缓冲区.
write_all: 写缓冲区的全部内容.
copy: 将字节从一个 I/O 句柄复制到另一个.
这些函数中的许多都是源于 AsyncRead 和 AsyncWrite trait 的. 这些 trait 类似于 std 中的 Read 和 Write, 但仅仅用于具有 future aware 的类型, 例如符合下面的特征:
调用 read 或 write 是非阻塞的, 他们从不阻塞调用线程.
如果一个调用会以其他方式阻塞, 那么会返回一个错误 WouldBlock. 如果发生这种情况, 则当前 future 的 task 将在 I/O 再次准备就绪时被调度.
注意 AsyncRead 和 AsyncWrite 类型的用户应该使用 poll_read 和 poll_write 代替直接调用 read 和 write.
例如, 以下是如何接受连接, 从中读取 5 个字节, 然后将 5 个字节写回 socket 的例子:
- let server = listener.incoming().for_each(|socket| {
- println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
- let buf = vec![0; 5];
- let connection = io::read_exact(socket, buf)
- .and_then(|(socket, buf)| {
- io::write_all(socket, buf)
- })
- .then(|_| Ok(())); // Just discard the socket and buffer
- // Spawn a new task that processes the socket:
- tokio::spawn(connection);
- Ok(())
- })
使用 Poll API
当手动实现 Future 时, 需要使用基于 Poll 的 API, 并且你需要返回 Async. 当您需要实现自己的处理自定义逻辑的组合器时, 这非常有用.
例如, 这就是如何为 TcpStream 实现 read_exact future 的例子.
- pub struct ReadExact {
- state: State,
- }
- enum State {
- Reading {
- stream: TcpStream,
- buf: Vec<u8>,
- pos: usize,
- },
- Empty,
- }
- impl Future for ReadExact {
- type Item = (TcpStream, Vec<u8>);
- type Error = io::Error;
- fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> {
- match self.state {
- State::Reading {
- ref mut stream,
- ref mut buf,
- ref mut pos
- } => {
- while *pos <buf.len() {
- let n = try_ready!({
- stream.poll_read(&mut buf[*pos..])
- });
- *pos += n;
- if n == 0 {
- let err = io::Error::new(
- io::ErrorKind::UnexpectedEof,
- "early eof");
- return Err(err)
- }
- }
- }
- State::Empty => panic!("poll a ReadExact after it's done"),
- }
- match mem::replace(&mut self.state, State::Empty) {
- State::Reading { stream, buf, .. } => {
- Ok(Async::Ready((stream, buf)))
- }
- State::Empty => panic!(),
- }
- }
- }
数据报
UdpSocket 类型提供了许多方便的方法:
send_dgram 允许您将发送数据报作为 future, 如果无法立即发送整个数据报, 则返回错误.
recv_dgram 表示将数据报读入缓冲区.
示例
- #[macro_use]
- extern crate log;
- extern crate futures;
- extern crate pretty_env_logger;
- extern crate tokio;
- use futures::future::{done, ok};
- use futures::{Future, Stream};
- use tokio::io::{self as tio, AsyncRead};
- use tokio::net::{TcpListener, TcpStream};
- use std::error;
- use std::fmt;
- use std::io;
- fn client_fut(socket: TcpStream) -> impl Future<Item = (), Error = ()> + 'static + Send {
- futures::lazy(move || match socket.peer_addr() {
- Ok(peer) => {
- info!("Tcp connection [{:?}] connected to server", peer);
- Ok((socket, peer))
- }
- Err(err) => {
- error!("Fetch peer address failed: {:?}", err);
- Err(())
- }
- }).and_then(move |(socket, peer)| {
- let buf = vec![0; 5];
- let svc_fut = tio::read_exact(socket, buf)
- .and_then(|(socket, buf)| {
- tio::write_all(socket, buf)
- })
- .then(|_| Ok(()));
- tokio::spawn(svc_fut);
- ok(())
- })
- }
- fn server_fut(listener: TcpListener) -> impl Future<Item = (), Error = ()> + 'static + Send {
- listener
- .incoming()
- .for_each(|socket| {
- tokio::spawn(client_fut(socket));
- Ok(())
- })
- .map_err(|err| {
- error!("Accept connection failed: {:?}", err);
- })
- }
- fn run() -> Result<(), io::Error> {
- let addr = "127.0.0.1:1234".parse().unwrap();
- info!("Listening on {:?}", addr);
- let listener = TcpListener::bind(&addr)?;
- let server_fut = server_fut(listener);
- tokio::run(server_fut);
- Ok(())
- }
- fn print<T: fmt::Debug, E: error::Error>(result: Result<T, E>) {
- match result {
- Ok(any) => info!("Result: {:?}", any),
- Err(err) => error!("Error: {:?}", err),
- }
- }
- fn init() {
- pretty_env_logger::init();
- }
- fn main() {
- init();
- print(run());
- }
- Timers
在编写基于网络的应用程序时, 通常需要根据时间执行操作.
在一段时间后运行一些代码.
取消运行时间过长的运行操作.
以一定间隔重复执行操作.
这些用例通过使用 timer 模块中提供的各种计时器 API 来处理.
延迟运行代码
在这个例子中, 我们希望在一段时间后执行任务. 为此, 我们使用 Delay API. 我们要做的只是将 "Hello world!" 写到终端.
- use tokio::prelude::*;
- use tokio::timer::Delay;
- use std::time::{Duration, Instant};
- fn main() {
- let when = Instant::now() + Duration::from_millis(100);
- let task = Delay::new(when)
- .and_then(|_| {
- println!("Hello world!");
- Ok(())
- })
- .map_err(|e| panic!("delay errored; err={:?}", e));
- tokio::run(task);
- }
为长时间运行的操作设置 Timeout
在编写健壮的网络应用程序时, 确保在合理的时间内完成操作至关重要. 在等待来自外部的, 不受信任的来源的数据时尤其如此.
该 Deadline 类型确保操作在固定的时间内完成.
- use tokio::io;
- use tokio::net::TcpStream;
- use tokio::prelude::*;
- use std::time::{Duration, Instant};
- fn read_four_bytes(socket: TcpStream)
- -> Box<Future<Item = (TcpStream, Vec<u8>), Error = ()>>
- {
- // The instant at which the read will be aborted if
- // it has not yet completed.
- let when = Instant::now() + Duration::from_secs(5);
- let buf = vec![0; 4];
- let fut = io::read_exact(socket, buf)
- .deadline(when)
- .map_err(|_| println!("failed to read 4 bytes by deadline"));
- Box::new(fut)
- }
周期性运行代码
在一个时间间隔内重复运行代码对于在套接字上发送 PING 消息, 或经常检查配置文件等情况很有用.
Interval 类型实现了 Stream, 并以指定的速率挂起.
- use tokio::prelude::*;
- use tokio::timer::Interval;
- use std::time::{Duration, Instant};
- fn main() {
- let task = Interval::new(Instant::now(), Duration::from_millis(100))
- .take(10)
- .for_each(|instant| {
- println!("fire; instant={:?}", instant);
- Ok(())
- })
- .map_err(|e| panic!("interval errored; err={:?}", e));
- tokio::run(task);
- }
计时器的注意事项
Tokio 计时器的粒度为 1 毫秒. 任何更小的间隔都会向上舍入到最接近的毫秒. 定时器在用户域中实现 (即不使用操作系统定时器, 像 linux 上的 timerfd). 它使用分层散列计时器轮实现, 在创建, 取消和触发超时时提供有效的恒定时间复杂度.
Tokio 运行时包括每个工作线程一个计时器实例. 这意味着, 如果运行时启动 4 个工作线程, 则将有 4 个计时器实例. 这在大多数情况下避免了同步, 因为当使用计时器时, 任务将在位于当前线程上的状态下操作.
也就是说, 计时器实现是线程安全的, 并支持从任何线程使用.
基本组合器
下面是关于 Future 的图表, 来自于 Cheatsheet for Futures https://tokio.rs/img/diagrams/cheatsheet-for-futures.html .
- // Constructing leaf futures
- fn empty () -> Future<T, E>
- fn ok (T) -> Future<T, E>
- fn err (E) -> Future<T, E>
- fn result(Result<T, E>) -> Future<T, E>
- // General future constructor
- fn poll_fn(FnMut(thread_local!(Task)) -> Poll<T, E>) -> Future<T, E>
- // Mapping futures
- fn Future::map (Future<T, E>, FnOnce(T) -> U) -> Future<U, E>
- fn Future::map_err (Future<T, E>, FnOnce(E) -> F) -> Future<T, F>
- fn Future::from_err(Future<T, Into<E>>) -> Future<T, E>
- // Chaining (sequencing) futures
- fn Future::then (Future<T, E>, FnOnce(Result<T, E>) -> IntoFuture<U, F>) -> Future<U, F>
- fn Future::and_then(Future<T, E>, FnOnce(T) -> IntoFuture<U, E>) -> Future<U, E>
- fn Future::or_else (Future<T, E>, FnOnce(E) -> IntoFuture<T, F>) -> Future<T, F>
- fn Future::flatten (Future<Future<T, E>, Into<E>>) -> Future<T, E>
- // Joining (waiting) futures
- fn Future::join (Future<T, E>, IntoFuture<U, E>) -> Future<(T, U), E>
- fn Future::join3(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>) -> Future<(T, U, V), E>
- fn Future::join4(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>) -> Future<(T, U, V, W), E>
- fn Future::join5(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>, IntoFuture<X, E>) -> Future<(T, U, V, W, X), E>
- fn join_all (IntoIterator<IntoFuture<T, E>>) -> Future<Vec<T>, E>
- // Selecting (racing) futures
- fn Future::select (Future<T, E>, IntoFuture<T, E>) -> Future<(T, Future<T, E>), (E, Future<T, E>)>
- fn Future::select2(Future<T, E>, IntoFuture<U, F>) -> Future<Either<(T, Future<U, F>), (U, Future<T, E>)>, Either<(E, Future<U, F>), (F, Future<T, E>)>>
- fn select_all (IntoIterator<IntoFuture<T, E>>) -> Future<(T, usize, Vec<Future<T, E>>), (E, usize, Vec<Future<T, E>>)>
- fn select_ok (IntoIterator<IntoFuture<T, E>>) -> Future<(T, Vec<Future<T, E>>), E>
- // Utility
- fn lazy (FnOnce() -> IntoFuture<T, E>) -> Future<T, E>
- fn loop_fn (S, FnMut(S) -> IntoFuture<Loop<T, S>, E>) -> Future<T, E>
- // Miscellaneous
- fn Future::into_stream (Future<T, E>) -> Stream<T, E>
- fn Future::flatten_stream(Future<Stream<T, E>, E>) -> Stream<T, E>
- fn Future::fuse (Future<T, E>) -> Future<T, E>
- fn Future::catch_unwind (Future<T, E>+UnwindSafe) -> Future<Result<T, E>, Any+Send>
- fn Future::shared (Future<T, E>) -> Future<SharedItem<T>, SharedError<E>>+Clone
- fn Future::wait (Future<T, E>) -> Result<T, E>
这部分的内容推荐参考这篇文章, https://www.jianshu.com/p/5059c403a335 .
本文不再赘述.
返回 futures
在使用 futures 时, 您可能需要做的第一件事就是返回一个 Future. 这有几种选择, 从最符合人体工程学到最不符合.
Trait 对象
impl Trait
Trait 对象
首先, 您始终可以选择返回一个 boxed trait 对象:
- fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
- // ...
- }
这个策略的好处是它很容易写出来并且易于创建.
这种方法的缺点是, 在构建 future 时需要运行时分配, 在使用该 future 时需要动态分派. Box 需要在堆上分配而 future 会被置入其中.
通常可以通过仅在您想要返回的 future 链的最后来 Boxing 来减少分配.
impl Trait
- fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
- where F: Future<Item = i32>,
- {
- f.map(|i| i + 10)
- }
- pub struct LinesCodec {
- // Stored index of the next index to examine for a `\n` character.
- // This is used to optimize searching.
- // For example, if `decode` was called with `abc`, it would hold `3`,
- // because that is the next index to examine.
- // The next time `decode` is called with `abcde\n`, the method will
- // only look at `de\n` before returning.
- next_index: usize,
- }
- fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
- // Look for a byte with the value '\n' in buf. Start searching from the search start index.
- if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n')
- {
- // Found a '\n' in the string.
- // The index of the '\n' is at the sum of the start position + the offset found.
- let newline_index = newline_offset + self.next_index;
- // Split the buffer at the index of the '\n' + 1 to include the '\n'.
- // `split_to` returns a new buffer with the contents up to the index.
- // The buffer on which `split_to` is called will now start at this index.
- let line = buf.split_to(newline_index + 1);
- // Trim the `\n` from the buffer because it's part of the protocol,
- // not the data.
- let line = &line[..line.len() - 1];
- // Convert the bytes to a string and panic if the bytes are not valid utf-8.
- let line = str::from_utf8(&line).expect("invalid utf8 data");
- // Set the search start index back to 0.
- self.next_index = 0;
- // Return Ok(Some(...)) to signal that a full frame has been produced.
- Ok(Some(line.to_string()))
- } else {
- // '\n' not found in the string.
- // Tell the next call to start searching after the current length of the buffer
- // since all of it was scanned and no '\n' was found.
- self.next_index = buf.len();
- // Ok(None) signifies that more data is needed to produce a full frame.
- Ok(None)
- }
- }
- fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
- // It's important to reserve the amount of space needed. The `bytes` API
- // does not grow the buffers implicitly.
- // Reserve the length of the string + 1 for the '\n'.
- buf.reserve(line.len() + 1);
- // String implements IntoBuf, a trait used by the `bytes` API to work with
- // types that can be expressed as a sequence of bytes.
- buf.put(line);
- // Put the '\n' in the buffer.
- buf.put_u8(b'\n');
- // Return ok to signal that no error occured.
- Ok(())
- }
- TcpStream::connect(&addr).and_then(|sock| {
- let framed_sock = sock.framed(LinesCodec::new());
- framed_sock.for_each(|line| {
- println!("Received line {}", line);
- Ok(())
- })
- });
来源: https://www.cnblogs.com/hymenz/p/9334297.html