9. Tokio 0.2

在速成课程的上一课中,我们介绍了稳定在 Rust 1.39 中的新 async/.await 语 法以及其下的 Future 特性。 该信息极大地取代了去年的现已废弃的第7课,该课程涵盖了旧的未来方法。

现在是时候更新第7课的下半部分了,并且教授最新的 Tokio 0.2版本。对于那些不熟悉它的人,让我引用这个项目的概述:

Tokio是一个事件驱动的非阻塞I / O平台,用于使用 Rust 编程语言编写异步应用程序。

如果您想在 Rust 中编写高效的并发网络服务,则需要使用 Tokio 之类的工具。 这并不是说这是 Tokio 的唯一用例; 您可以在网络服务之外使用事件驱动的调度程序来做很多很棒的事情。 也并不是说 Tokio 是唯一的解决方案; async-std 库提供类似的功能。

但是,对于非阻塞I / O系统,网络服务可能是最常见的领域。 并且Tokio是当今最流行和最成熟的系统。 因此,这个组合就是我们要开始的地方。

顺便说一句,如果你还有其他话题想让我讨论,请在 Twitter 上告诉我

注意:习题解答将包括在博客文章的最后。是的,我一直在改变规则,告我吧。

这篇文章是基于 FP 完成 Rust 教学系列的一部分。 如果你在博客之外阅读这篇文章,你可以在介绍文章的顶部找到这个系列中所有文章的链接。 也可订阅 RSS 频道。

Hello Tokio!

让我们开始吧,继续创建一个新的 Rust 实验项目:

$ cargo new --bin usetokio

如果要确保使用与我相同的编译器版本,请正确设置rust-toolchain:

$ echo 1.39.0 > rust-toolchain

然后将 Tokio 设置为一个依赖项。为了简单起见,我们将安装所有的附加功能:

[dependencies]
tokio = { version = "0.2", features = ["full"] }

你可以使用 cargo build 下载和构建的同时继续向下阅读。

现在我们要编写一个异步 hello world 应用程序:

use tokio::io;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut stdout = io::stdout();
    let mut hello: &[u8] = b"Hello, world!\n";
    io::copy(&mut hello, &mut stdout).await?;
    Ok(())
}

注意:我专门说的是“键入此内容”,而不是“复制并粘贴”。 为了使您熟悉这些内容,建议您手动输入代码。

其中很多内容应该与我们上一节课的内容相似:

  • 因为我们将等待一些东西并生成一个 future ,所以我们的 main 函数是 async。

  • 由于main是异步的,所以我们需要使用一个 executor 来运行它。 这就是为什么我们使用#[tokio::main] 属性。

  • 由于执行I/O可能会失败,因此我们将返回 Result。

自上节课以来,第一个真正的新东西是这个小小的语法:

.await?

我上次提到它,但现在我们在实际使用中看到了它。 这只是我们两个现有技术的结合:.await 将 future 和 用于错误处理的 ? 链接在一起。 事实上,这些组合在一起的效果非常好。我可能会多次提到这一点,因为我非常喜欢它。

接下来要注意的是,我们使用 tokio::io::stdout() 来访问一些值,这些值允许我们与标准输出进行交互。如果您对它很熟悉,那么它看起来非常类似于 std::io::stdout()。这是经过设计的:tokio API 的很大一部分只是对 std 进行异步化。

最后,我们可以查看实际的 tokio::io::copy 调用。你可能已经猜到了,正如 API 文档中所说:

这是 std::io::copy 的异步版本。

然而,这并不是针对 Read 和 Write traits,而是针对它们的异步表亲: AsyncRead 和 AsyncWrite。字节片(& [ u8])是一个有效的 AsyncRead,因此我们可以将输入存储在那里。正如您可能已经猜到的,Stdout 是一个 AsyncWrite。字节片 (&[u8]) 是有效的 AsyncRead,因此我们可以将输入存储在那里。 您可能已经猜到,Stdout是一个AsyncWrite。

练习1:修改此应用程序,以便代替打印 “Hello,world!”,将标准输入的全部内容复制到标准输出。

注意:在使用 tokio::io::AsyncWriteExt 之后,您可以使用 stdout.write_all 简化此代码,但我们会坚持使用 tokio::io::copy,因为我们将在整个过程中使用它。 但是,如果您好奇:

use tokio::io::{self, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut stdout = io::stdout();
    stdout.write_all(b"Hello, world!\n").await?;
    Ok(())
}

创建线程

Tokio 提供了一个类似于std::process 模块的 tokio::process模块。 我们可以使用它再次实现Hello World:

use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    Command::new("echo").arg("Hello, world!").spawn()?.await?;
    Ok(())
}

? 和 .await位置可以按需要的任何顺序排列。 您可以将此行读为:

  • 创建一个新的 Command 来运行 echo

  • 给它一个参数 “Hello, world!”

  • spawn 可能会失败

  • 使用第一个? :如果失败,返回错误,否则返回一个 Future

  • 使用 .await: 等待直到 future 完成,并捕获其结果

  • 使用 second?:如果 Result 为 Err,则返回该错误

单行相当不错!

与以前使用回调进行异步的方式相比,async/.await的一大优点是它可以轻松地与循环一起工作。

练习2:扩展此示例,以便将 Hello,world! 打印10次。

中断

到目前为止,我们实际上只完成了单个 .await。 .await 多件事也很容易。 让我们使用 delay_for 暂停一下。

use tokio::time;
use tokio::process::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    Command::new("date").spawn()?.await?;
    time::delay_for(Duration::from_secs(1)).await;
    Command::new("date").spawn()?.await?;
    time::delay_for(Duration::from_secs(1)).await;
    Command::new("date").spawn()?.await?;
    Ok(())
}

我们还可以使用 tokio::time::interval 函数为每次过去一定的时间创建一个 “ticks” 流。例如,这个程序会一直每秒调用 date 一次,直到它被杀死:

use tokio::time;
use tokio::process::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut interval = time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        Command::new("date").spawn()?.await?;
    }
}

练习3:为什么循环之后没有一个 Ok(())?

spawn 时间

这一切都很好,但是我们并没有真正利用异步编程。 解决这个问题! 我们看到了两个不同的有趣程序:

  1. 无限暂停1秒和调用日期

  2. 将所有输入从 stdin 复制到 stdou

现在是引入 spawn 的时候了,这样我们就可以把这两个程序合并成一个程序。首先,让我们演示一下 spawn 的简单用法:

use std::time::Duration;
use tokio::process::Command;
use tokio::task;
use tokio::time;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    task::spawn(dating()).await??;
    Ok(())
}

async fn dating() -> Result<(), std::io::Error> {
    let mut interval = time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        Command::new("date").spawn()?.await?;
    }
}

您可能想知道:?? 操作符发生了什么 ? 那是一些特殊的超级错误处理程序吗? 不,这只是正常的错误处理? 申请两次。 让我们来看一些类型签名,以帮助我们:

pub fn spawn<T>(task: T) -> JoinHandle<T::Output>;

impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;
}

调用 spawn 返回一个 JoinHandle<T::Output > 。在我们的示例中,我们提供的输入是 dating() ,其输出类型为 Result<() ,std::io::Error > 。这意味着 task::spawn(dating()) 为 JoinHandle<Result<(), std::io::Error>>。

我们还看到 JoinHandle 实现了 Future。 因此,当我们将 .await 应用于此值时,最终得到的类型是 Output = Result 。 因为我们知道 T 是Result <(),std::io::Error>,所以这意味着我们最终得到 Result<Result<(), std::io::Error>, JoinError>。

第一个 ? 处理外部Result,在 Err 上退出 JoinError 并在 Ok 上给我们Result <(),std::io::Error> 值。第二 ? 处理std::io::Error,在 OK 上给我们一个 ()。

练习4:现在我们已经看到了 spawn,您应该修改程序,以便它在循环中调用 date,并将 stdin 复制到 stdout。

同步代码

您可能无法独占地使用异步友好代码进行交互。也许您有一些非常好的库需要利用,但是它在内部执行阻塞调用。幸运的是,Tokio 为你提供了 spawn 阻塞功能。既然这些文档是如此完美,让我引用一下:

task::spawn_blocking 函数类似于前一节中讨论的 task::spawn 函数,但是它并没有在 Tokio 运行时中生成一个非阻塞的 future 函数,而是在一个专用线程池中生成一个阻塞函数,用于阻塞任务。

练习5:重写 dating() 函数,使用 spawn_blocking 和 std::thread::sleep,这样它大约每秒调用 date 一次。

网络连接

我可以继续逐步浏览Tokio库中的其他炫酷功能。 我鼓励您自己戳他们。 但是我承诺过要建立一些网络知识,天哪,我会兑现的!

我将对 TcpListener 文档中的示例进行一些扩展,使其(1)可以编译,(2)实现一个 echo 服务器。尽管这个程序有一个很大的缺陷,我还是建议你试着找出来。

use tokio::io;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (socket, _) = listener.accept().await?;
        echo(socket).await?;
    }
}

async fn echo(socket: TcpStream) -> io::Result<()> {
    let (mut recv, mut send) = io::split(socket);
    io::copy(&mut recv, &mut send).await?;
    Ok(())
}

我们使用 TcpListener 绑定套接字。 绑定本身是异步的,因此我们使用 .await 等待侦听可用套接字。 我们使用 ? 绑定侦听套接字时错误的处理。

接下来,我们永远循环。 在循环内部,我们使用 .await 像以前一样接受新的连接。 我们捕获套接字(忽略地址作为元组的第二部分)。 然后,我们调用 echo 函数并等待。

在 echo 中,我们使用 tokio::io::split 将 TcpStream 拆分成其组成的read 和 write 部分,然后像以前一样将它们传递到 tokio::io::copy 中。

太棒了! 错误在哪里? 让我问一个问题:当第一个连接仍然处于活动状态时,第二个连接进入,该行为是什么?理想情况下,这个问题会得到解决。 但是,我们的程序只有一项任务。 然后,该任务将在每次调用回显时唤醒。 因此,直到第一个连接关闭,我们的第二个连接才会得到服务。

练习6:修改上面的程序,以便正确处理并发连接。

Tcp 客户端和所有权

让我们编写一个穷人的 HTTP 客户机。它将硬编码建立到服务器的连接,将所有 stdin 复制到服务器,然后将所有数据从服务器复制到 stdout。要使用这个命令,您需要手动输入 HTTP 请求,然后按 Ctrl-D 组合键进行文件结束。

use tokio::io;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (mut recv, mut send) = io::split(stream);
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();

    io::copy(&mut stdin, &mut send).await?;
    io::copy(&mut recv, &mut stdout).await?;

    Ok(())
}

很好,但这是有限的。 它仅处理HTTP等半双工协议,并且实际上不以任何方式支持 keep-alive。 我们希望使用 spawn 在不同的任务中运行两个副本。 似乎很简单:

let send = spawn(io::copy(&mut stdin, &mut send));
let recv = spawn(io::copy(&mut recv, &mut stdout));

send.await??;
recv.await??;

不幸的是,这不能编译。我们得到了四个几乎相同的错误消息。让我们看看第一个:

error[E0597]: `stdin` does not live long enough
  --> src/main.rs:12:31
   |
12 |     let send = spawn(io::copy(&mut stdin, &mut send));
   |                      ---------^^^^^^^^^^------------
   |                      |        |
   |                      |        borrowed value does not live long enough
   |                      argument requires that `stdin` is borrowed for `'static`
...
19 | }
   | - `stdin` dropped here while still borrowed

问题出在这里:我们的副本 Future不拥有stdin值(或发送值)。 相反,它具有一个(可变的)引用。 该值保留在 main函数的 Future 中。 忽略错误情况,我们知道 main 函数将等待send完成(感谢send.await),因此生存期似乎是正确的。 但是,Rust无法识别此生命周期信息。 (此外,我还没有完全考虑到这一点,我很确定在发生 panic 时,可能会比使用Future时更早删除send)。

为了解决这个问题,我们需要说服编译器创建一个拥有 stdin 的 Future。最简单的方法就是使用 async move 块。

练习7:使用两个 async move 块编译上面的代码。

lines 使用

本节将对程序进行一系列修改。我建议你在考虑解决方案之前先解决每个挑战。但是,与其他练习不同的是,我将以内部方式显示解决方案,因为它们是相互关联的。

让我们构建一个异步程序,该程序计算标准输入的行数。 您将需要使用lines方法。 阅读文档,尝试找出使类型所需的用途和包裹器。

use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let stdin = io::stdin();
    let stdin = io::BufReader::new(stdin);
    let mut count = 0u32;
    let mut lines = stdin.lines();
    while let Some(_) = lines.next_line().await? {
        count += 1;
    }
    println!("Lines on stdin: {}", count);
    Ok(())
}

好吧,再提高一个层次。我们不使用标准输入,而是使用一个文件名列表作为命令行参数,并计算所有文件中的总行数。最初,一次只读一个文件是可以的。换句话说: 不用麻烦调用 spawn。试一试,然后回到这里:

use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut count = 0u32;

    for filename in args {
        let file = tokio::fs::File::open(filename).await?;
        let file = io::BufReader::new(file);
        let mut lines = file.lines();
        while let Some(_) = lines.next_line().await? {
            count += 1;
        }
    }

    println!("Total lines: {}", count);
    Ok(())
}

但是现在是时候使它适当地异步了,并在单独的任务中处理文件。 为了完成这项工作,我们需要创建所有任务,然后.await每个任务。 对于这个,我使用了 Future<Output = Result<u32,std::io::Error> 的 Vec。 试一试!

use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];

    for filename in args {
        tasks.push(tokio::spawn(async {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut count = 0u32;
            while let Some(_) = lines.next_line().await? {
                count += 1;
            }
            Ok(count) as Result<u32, std::io::Error>
        }));
    }

    let mut count = 0;
    for task in tasks {
        count += task.await??;
    }

    println!("Total lines: {}", count);
    Ok(())
}

最后,在这一过程中:让我们改变处理计数的方式。 让我们让每个单独的任务更新一个共享的可变变量,而不是等待第二个for循环中的计数。 为此,应使用 Arc<Mutex<u32>> 。 不过,您仍然需要保留任务的Vec,以确保等待所有文件被读取。

use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;
use std::sync::Arc;

// avoid thread blocking by using Tokio's mutex
use tokio::sync::Mutex;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];
    let count = Arc::new(Mutex::new(0u32));

    for filename in args {
        let count = count.clone();
        tasks.push(tokio::spawn(async move {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut local_count = 0u32;
            while let Some(_) = lines.next_line().await? {
                local_count += 1;
            }

            let mut count = count.lock().await;
            *count += local_count;
            Ok(()) as Result<(), std::io::Error>
        }));
    }

    for task in tasks {
        task.await??;
    }

    let count = count.lock().await;
    println!("Total lines: {}", *count);
    Ok(())
}

LocalSet 和 !Send

感谢@xudehseng对本节的启发。

好吧,最后那个练习看起来是不是有点做作?是的!在我看来,以前的等待计数和汇总的主要功能本身是优越的。但是,我想教你一些别的东西。

如果将 Arc<Mutex<u32> 替换为 Rc<RefCell<u32> 会发生什么:

use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;
use std::rc::Rc;
use std::cell::RefCell;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];
    let count = Rc::new(RefCell::new(0u32));

    for filename in args {
        let count = count.clone();
        tasks.push(tokio::spawn(async {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut local_count = 0u32;
            while let Some(_) = lines.next_line().await? {
                local_count += 1;
            }

            *count.borrow_mut() += local_count;
            Ok(()) as Result<(), std::io::Error>
        }));
    }

    for task in tasks {
        task.await??;
    }

    println!("Total lines: {}", count.borrow());
    Ok(())
}

你会得到一个错误:

error[E0277]: `std::rc::Rc<std::cell::RefCell<u32>>` cannot be shared between threads safely
  --> src/main.rs:15:20
   |
15 |         tasks.push(tokio::spawn(async {
   |                    ^^^^^^^^^^^^ `std::rc::Rc<std::cell::RefCell<u32>>` cannot be shared between threads safely
   |
  ::: /Users/michael/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.2/src/task/spawn.rs:49:17
   |
49 |     T: Future + Send + 'static,
   |                 ---- required by this bound in `tokio::task::spawn::spawn`

任务可以调度到多个不同的线程。因此,你的 future 一定是 Send 的。而 Rc<RefCell<u32> 绝对是 !Send 。但是,在我们的用例中,使用多个 OS 线程不太可能加速我们的程序,将要做大量的阻塞 I/O。如果我们可以坚持在同一个操作系统线程上创建所有任务,并且避免使用 Send,那就更好了。。当然,Tokio 提供了这样一个函数: Tokio::task::spawn_local。使用它(以及用异步移动代替异步) ,我们的程序编译,但在运行时中断:

thread 'main' panicked at '`spawn_local` called from outside of a local::LocalSet!', src/libcore/option.rs:1190:5

嗷,我个人不太喜欢这种运行时检测的东西。但是这个概念非常简单: 如果你想在当前线程上产生代码,你需要设置你的运行时来支持它。我们的方法是使用 LocalSet。为了使用这个属性,您需要丢弃 #[tokio::main ]属性。

练习8:按照 LocalSet 的文档使上面的程序与 Rc<RefCell<u3 > 一起工作。

总结

与上一堂 Tokio 课程相比,这一节很短,该课程似乎永远持续下去。这证明了使用新的 async/.await 语法有多么容易。

异步编程显然可以涵盖更多内容,但是希望这可以为使用 async/.await语法和Tokio库本身奠定基础。这是您需要了解的最大基础。

如果我们有 future 的课程,我相信它们将涵盖其他库,例如Hyper,它们将转移到 Tokio 0.2,以及人们提出的特定用例。 如果您希望涵盖某些内容,请在Twitter或以下评论中将其提及。

练习

练习1

use tokio::io;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();
    io::copy(&mut stdin, &mut stdout).await?;
    Ok(())
}

练习2

use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    for _ in 1..=10 {
        Command::new("echo").arg("Hello, world!").spawn()?.await?;
    }
    Ok(())
}

练习3

由于循环将永远运行或因错误而退出,因此循环后的任何代码都将永远不会被调用。 因此,放置在此处的代码将生成警告。

练习4

use std::time::Duration;
use tokio::process::Command;
use tokio::{io, task, time};

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let dating = task::spawn(dating());
    let copying = task::spawn(copying());

    dating.await??;
    copying.await??;

    Ok(())
}

async fn dating() -> Result<(), std::io::Error> {
    let mut interval = time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        Command::new("date").spawn()?.await?;
    }
}

async fn copying() -> Result<(), std::io::Error> {
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();
    io::copy(&mut stdin, &mut stdout).await?;
    Ok(())
}

练习5

async fn dating() -> Result<(), std::io::Error> {
    loop {
        task::spawn_blocking(|| { std::thread::sleep(Duration::from_secs(1)) }).await?;
        Command::new("date").spawn()?.await?;
    }
}

练习6

最简单的调整是用 tokio::spawn 包装 echo 调用:

loop {
    let (socket, _) = listener.accept().await?;
    tokio::spawn(echo(socket));
}

但是,这样做有一个不足之处: 我们忽略了由创建任务所产生的错误。在这种情况下,最好的行为可能是处理衍生任务中的错误:

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;

    let mut counter = 1u32;
    loop {
        let (socket, _) = listener.accept().await?;
        println!("Accepted connection #{}", counter);
        tokio::spawn(async move {
            match echo(socket).await {
                Ok(()) => println!("Connection #{} completed successfully", counter),
                Err(e) => println!("Connection #{} errored: {:?}", counter, e),
            }
        });
        counter += 1;
    }
}

练习7

use tokio::io;
use tokio::spawn;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (mut recv, mut send) = io::split(stream);
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();

    let send = spawn(async move {
        io::copy(&mut stdin, &mut send).await
    });
    let recv = spawn(async move {
        io::copy(&mut recv, &mut stdout).await
    });

    send.await??;
    recv.await??;

    Ok(())
}

练习8

use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;
use std::rc::Rc;
use std::cell::RefCell;

fn main() -> Result<(), std::io::Error> {
    let mut rt = tokio::runtime::Runtime::new()?;
    let local = tokio::task::LocalSet::new();
    local.block_on(&mut rt, main_inner())
}

async fn main_inner() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];
    let count = Rc::new(RefCell::new(0u32));

    for filename in args {
        let count = count.clone();
        tasks.push(tokio::task::spawn_local(async move {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut local_count = 0u32;
            while let Some(_) = lines.next_line().await? {
                local_count += 1;
            }

            *count.borrow_mut() += local_count;
            Ok(()) as Result<(), std::io::Error>
        }));
    }

    for task in tasks {
        task.await??;
    }

    println!("Total lines: {}", count.borrow());
    Ok(())

最后更新于