7. Async,Futures和Tokio

注意: 随着 Rust 1.39(2019年11月) 中增加了 async/await 语法,Rust 中与异步代码相关的所有内容都得到了彻底的改进。 因此,这个教程现在已经过时了。 它对于理解更深层次的原则仍然很有用,但是我希望在将来写一个更新的教程来介绍这种新的方法。

与 Haskell,Erlang 和 Go 等语言不同,Rust 没有提供绿色线程和异步 I/O 的运行时系统。 但对于许多实际使用案例,即使不是硬性要求,也强烈希望使用异步 I/O。 在 Rust 中,处理此问题的行业标准库便是 tokio。

这篇文章是基于 FP 完成 Rust 教学系列的一部分。 如果你在博客之外阅读这篇文章,你可以在介绍文章的顶部找到这个系列中所有文章的链接。 也可订阅 RSS 频道。

这个速成课程的教训与其他课程有所不同,因为:

  1. 有很多相互关联的资料覆盖,不容易分离出来

  2. 与其他许多情况相比,了解这些库设计背后的动机更为重要

  3. 我相信这些资料可能对那些没有遵循其余速成课程的人有用

因此,将适用一些不同的规则:

  • 首先要定义 Rust 所需的知识以理解本课程,即:

    • 所有语法的基础知识

    • Traits和相关类型

    • 迭代器

    • b闭包

  • 我不会在以后的文章中提供练习解决方案,由于资料是累积性的,因此我会立即提供它们。我仍然强烈建议花费大量的时间和精力来尝试自己解决这些问题,然后再去寻找解决方案。 这很难,也很耗时,但是最终还是值得的。

此外,这个课程比以前的课程更长,更复杂。 你应该比其他课程花更多的时间来计划它。 我考虑将其分为多个课程,但最终决定将所有内容放在一起。 相反,我将会在此课程中休息一会儿。

现在真正的介绍!

为什么是 async?

假设读者已经熟悉异步 I/O 及其总体动机。 如果您不是,那么值得阅读一下 C10K 问题,我们许多人开始认真思考异步I/O。 您可能也有兴趣阅读我写的有关绿色线程的文章,这是一个基于语言运行时的解决方案,可以解决同样的问题。

最后,Rust 异步 I/O 方法的目标是:

  • 最小化系统资源处理大量并发 I/O 任务

  • 在操作系统提供的异步 I/O 机制之上提供零成本的抽象

  • 在库级别执行此操作,而不是在Rust中引入运行时

一个简单的问题

很难一头扎进 tokio 的奇妙世界。 您需要了解 futures 和 streams、tasks 和 executors、异步 I/O系统调用和 Async 类型等。 为了尝试与这种学习体验脱钩,我们将从一个简单的问题开始。 这个问题主要是对现实世界中的异步 I/O 建模,但并不完美,并将演示许多设计问题。 它还允许我们在深入研究 futures 和 tokio 之前,稍微多一点并发编程。

我们将运行一个单独的线程,这个线程将访问两个原子值:

  • 一个 AtomicBool 来告诉我们是否希望这个线程继续运行

  • 一个 Atomicusize 来计数

我们还没有介绍原子类型,但是它们听起来就是这样: 可以从多个线程安全地访问的变量。 由于了解它们并不是本课的重点,我将把关于这些类型更多的使用问题留给它们的 API 文档

我们单独的线程将循环运行。 只要 AtomicBool 为 true,它将:

  • 给定毫秒数的睡眠

  • 向控制台打印一条消息

  • 增加 AtomicUsize 计数

我们将通过不同的方法来观察计数器的变化来看一系列示例。

Interval

我们将这个东西称为 Interval(定时器),因为它在一定程度上代表了 Javascript 中的 setInterval 调用。 创建一个新项目:

$ cargo new interval --bin
$ cd interval

我们将把 Interval 的代码放到一个单独的模块中。首先,将以下代码放入src/interval.rs 中:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{sleep, spawn};
use std::time::Duration;

#[derive(Clone)]
pub struct Interval {
    counter: Arc<AtomicUsize>,
    still_running: Arc<AtomicBool>,
}

impl Drop for Interval {
    fn drop(&mut self) {
        println!("Interval thread shutting down");
        self.still_running.store(false, Ordering::SeqCst);
    }
}

impl Interval {
    pub fn from_millis(millis: u64) -> Interval {
        let duration = Duration::from_millis(millis);

        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let still_running = Arc::new(AtomicBool::new(true));
        let still_running_clone = still_running.clone();

        spawn(move || {
            println!("Interval thread launched");
            while still_running_clone.load(Ordering::SeqCst) {
                sleep(duration);
                let old = counter_clone.fetch_add(1, Ordering::SeqCst);
                println!("Interval thread still alive, value was: {}", old);
            }
        });

        Interval {
            counter,
            still_running,
        }
    }

    pub fn get_counter(&self) -> usize {
        self.counter.load(Ordering::SeqCst)
    }
}

接下来,让我们提供一个最小的 src/main.rs,以此使用 Interval 数据类型:

mod interval;

use self::interval::Interval;

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let duration = std::time::Duration::from_millis(2000); // 2 seconds
    for i in 1..11 {
        println!("Iteration number {}, counter is {}", i, interval.get_counter());
        std::thread::sleep(duration);
    }
}

你应该看到这样的东西:

Iteration number 1, counter is 0
Interval thread launched
Interval thread still alive, value was: 0
Interval thread still alive, value was: 1
Interval thread still alive, value was: 2
Iteration number 2, counter is 3
Interval thread still alive, value was: 3
Interval thread still alive, value was: 4
...
Interval thread still alive, value was: 33
Interval thread still alive, value was: 34
Iteration number 10, counter is 35
Interval thread still alive, value was: 35
Interval thread still alive, value was: 36
Interval thread still alive, value was: 37
Interval thread still alive, value was: 38
Interval thread shutting down

万岁,我们有一些并发的交流。

这种方法存在的问题

首先突出的问题是,我们在主线程中遗漏了一些更新。 注意计数器是如何从0跳转到3的。 这显然是主线程中间隔设置的问题:我们将延迟2秒而不是半秒。 让我们改为在主线程中延迟十分之一秒(100毫秒) ,并检查自上次以来值是否发生了更改。

注意: 这种方式仍然可能会错过一些更新,因为 sleep 保证线程至少在一定时间内处于睡眠状态,但睡眠时间可能会更长。 然而,由于有这么大的差异,我们相当肯定我们会捕捉到所有的更新。

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let duration = std::time::Duration::from_millis(100); // 0.1 seconds
    let mut last = interval.get_counter();
    for i in 1..51 {
        let curr = interval.get_counter();

        if curr != last {
            last = curr;
            println!("Iteration number {}, counter is {}", i, curr);
        }

        std::thread::sleep(duration);
    }
}

我不得不将迭代次数增加到50,因为我们的许多主线程迭代最终在计数器中没有显示任何变化。 下面是一个在我的机器上运行的例子:

Interval thread launched
Interval thread still alive, value was: 0
Iteration number 6, counter is 1
Interval thread still alive, value was: 1
Iteration number 11, counter is 2
Interval thread still alive, value was: 2
Iteration number 16, counter is 3
Interval thread still alive, value was: 3
Iteration number 21, counter is 4
Interval thread still alive, value was: 4
Iteration number 26, counter is 5
Interval thread still alive, value was: 5
Iteration number 31, counter is 6
Interval thread still alive, value was: 6
Iteration number 36, counter is 7
Interval thread still alive, value was: 7
Iteration number 41, counter is 8
Interval thread still alive, value was: 8
Iteration number 46, counter is 9
Interval thread still alive, value was: 9
Interval thread shutting down

这里没有丢失任何计数器更新,但是从间隔线程编号的颠簸中,我们可以看到在主线程检查编号没有变化的情况下,我们浪费了很多时间。

另一个不那么明显的问题是,我们将整个操作系统线程用于这个睡眠迭代检查。 在我们简单的程序中,这没什么大不了的。 但是想象一下,我们决定让50个不同的类似任务进行下去。 这需要49条额外的线程,其中大多数线程大部分时间都是睡眠的。 这非常浪费。 我们应该能做得更好。

最后,也许目前最不重要的是,这一切都是临时的。这个看起来就是一个能够被完成并且在将来能够产生价值的普通需求。 即使这看起来是最不重要的问题,我们也会从解决它开始。

Future trait

谁曾想到我们的曲折会导致本课标题中提到的主题之一! 您可能已经注意到在主线程循环中形成的一种模式:

  • 检查是否有新值可用

  • 如果可以的话就使用它

  • 如果不可用,请跳过

这正是 Future trait 允许我们做的事情,除此之外: 它还允许错误处理。 我们暂时不会担心这个问题,因为我的代码没有任何错误:)。

首先,将 futures crates 添加为依赖项。 在Cargo.toml中:

[dependencies]
futures = "0.1"

接下来,让我们添加一个新模块,以提供 Future 的结构实现。瞧,src/future.rs:

extern crate futures;

use super::interval::Interval;
use futures::prelude::*;

pub struct IntervalFuture {
    interval: Interval,
    last: usize,
}

impl IntervalFuture {
    pub fn new(interval: Interval) -> IntervalFuture {
        let last = interval.get_counter();
        IntervalFuture { interval, last }
    }
}

impl Future for IntervalFuture {
    type Item = usize;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let curr = self.interval.get_counter();
        if curr == self.last {
            Ok(Async::NotReady)
        } else {
            self.last = curr;
            Ok(Async::Ready(curr))
        }
    }
}

就像我们的主循环一样,我们将拥有一个Interval和提供的最后一个值。 新方法相当简单。 对于 impl Future,我们需要定义三件事:

  • 这个类型准备好后返回的东西。 在我们的示例中,它是计数器值,即 usize。

  • 可能发生的错误类型。 我们没有任何错误,所以我们使用 ()。 (Haskller 在我尖叫,我们应该使用 Void。 很快,我们就可以使用 never 了)

  • poll 函数会返回结果。 在错误情况下,返回 Self::Error; 在成功的情况下,返回一个 Async 枚举类型。 正如我们在方法中看到的那样,它要么是带有值的 Ready 变量,要么是 NotReady。

我们函数的逻辑与以前相同,因此我不会对实现进行评论。 回到我们的 src/main.rs 中,不使用 curr/last逻辑,我们可以对poll() 的结果进行模式匹配:

extern crate futures;

mod future;
mod interval;

use self::interval::Interval;
use self::future::IntervalFuture;
use futures::prelude::*;

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let mut interval_future = IntervalFuture::new(interval);
    let duration = std::time::Duration::from_millis(100); // 0.1 seconds

    for i in 1..51 {
        match interval_future.poll() {
            Ok(Async::Ready(curr)) => {
                println!("Iteration number {}, counter is {}", i, curr);
            }
            Ok(Async::NotReady) => (),
            Err(()) => unreachable!(),
        }

        std::thread::sleep(duration);
    }
}

可以说,这是对前一代码的一个小小的改进,虽然没有什么大的改进。但是恭喜您,您现在正式使用 futures crate 了!

Poll 类型定义

只是一个小助手。 结果 Result<Async<Self::Item>, Self::Error> 对您来说似乎有点笨拙。如果是这样的话,您将很高兴了解 Poll 类型定义,它允许您用 Poll<Self::Item, Self::Error> 替换上面的定义。 这没什么大不了的,但是当你阅读其他代码的时候,认识到这一点很重要。

tokio 执行器

现在,我们在 main 函数中运行自己的执行器:手动执行循环,延迟等操作。除了乏味,我们上面已经提到了一些缺点:

  • 我们希望执行的每个任务都需要一个线程

  • 我们需要实现某种类型的猜测和检查线程睡眠

是时候拿出真本事,把这事搞定了。 我们首先会失去一些功能,然后再重新构建它。

首先,将 tokio = “0.1“ 添加到 Cargo.toml 中。 现在,让我们在 Future 上通过调用 tokio::run 尝试使用 tokio executor:

extern crate futures;
extern crate tokio;

mod future;
mod interval;

use self::interval::Interval;
use self::future::IntervalFuture;
use tokio::prelude::*;

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let mut interval_future = IntervalFuture::new(interval);

    tokio::run(interval_future)
}

失败并出现编译错误:

error[E0271]: type mismatch resolving `<future::IntervalFuture as futures::Future>::Item == ()`
  --> src/main.rs:15:5
   |
15 |     tokio::run(interval_future)
   |     ^^^^^^^^^^ expected usize, found ()
   |
   = note: expected type `usize`
              found type `()`
   = note: required by `tokio::run`

tokio::run 函数期望 Item 为 () 的Future,但是我们使用了 usize。 无论如何,这都是有道理的:当我们得到一个值时,难道不想编写一些代码来实际做一些事情吗?

我们要解决这个问题,首先是过度痛苦的方式,然后是愉快的方式。 这也会帮助你理解为什么第五课花了那么多时间在闭包上。

定义 Future 适配器

还记得如何使用其他迭代器并组成更强大的迭代器吗? 好吧,您可以使用 Future 做同样的事情。 让我们定义一个新类型,它将:

  • 包裹一个IntervalFuture

  • 打印就绪新值

现在我们将把它放在 src/main.rs 中,它不会持续很久。

extern crate futures;
extern crate tokio;

mod future;
mod interval;

use self::interval::Interval;
use self::future::IntervalFuture;
use tokio::prelude::*;

struct IntervalPrinter(IntervalFuture);

impl Future for IntervalPrinter {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0.poll() {
            Ok(Async::Ready(curr)) => {
                println!("Counter is: {}", curr);
                Ok(Async::Ready(()))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let interval_future = IntervalFuture::new(interval);
    let interval_printer = IntervalPrinter(interval_future);

    tokio::run(interval_printer)
}

编译代码,但还不要运行它。 考虑到我们已经看到的所有类型和traits,这是相对简单的,但显然是乏味的。 让我们从一个小小的简化开始。

try_ready 宏

poll 的内容用了很多行代码:

  • 模式匹配

  • 如果NotReady,就返回NotReady

  • 如果Err, 就返回 Err

这是一个重复的模式,与我们在第3课中看到的错误处理非常相似。 futures crate 提供了一个 try_ready! 宏处理这个烦恼。 在 src/main 的 extern crate futures; 上边添加:

#[macro_use]

然后 poll 实现可以简化为:

let curr = try_ready!(self.0.poll());
println!("Counter is: {}", curr);
Ok(Async::Ready(()))

太棒了! 然后再一次编译,不要运行。 (我很好奇,为什么我一直这么说? 我们很快就会知道了,只是还有一站)。

我需要闭包

很神奇,我在这个速成班上到了第七课,却没有用那个双关语。 显然,定义一个完整的结构和 Future 实现对于仅仅打印一行来说有点过了。 幸运的是,期货板条箱的作者也注意到了这一点。 在未来特质中有许多组合子,这使得将事物链接在一起变得容易。 我们将使用 and_then 方法:

extern crate futures;
extern crate tokio;

mod future;
mod interval;

use self::interval::Interval;
use self::future::IntervalFuture;
use tokio::prelude::*;

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let interval_future = IntervalFuture::new(interval);
    let interval_printer = interval_future.and_then(|curr| {
        println!("Counter is: {}", curr);
        futures::future::ok(())
    });

    tokio::run(interval_printer)
}

这样好多了! 但是如果你像我一样,futures::future::ok(()) 困扰着你。 它在那里有什么用处? 这是未来设计的一个重要部分,我们将利用这一点向前发展。 它允许我们创建操作链,以便在异步 I/O 的每一位完成时运行。 现在,我们不想在计数器输出第一个值之后做任何其他事情,所以我们只返回 futures::future::ok(()),这意味着“什么也不做,返回 item()”。

练习1

还有另外一种方法:map,这实际上是一个比 and_then 更好的选择。使用 map 试着重写上面的代码。 注意: 这个问题没有解答。

出于好奇。interval_printer 是什么类型的?让我们使用之前的愚蠢的技巧,给它一个错误的类型。 添加像 let interval_printer: bool = ... 一些愚蠢的东西。然后尝试编译,您将得到如下类型:

futures::AndThen<
  future::IntervalFuture,
  futures::FutureResult<(), ()>,
  [closure@src/main.rs:14:59: 17:6]
>

如果这看起来有点像 Iterator 类型,那是设计问题。 就像迭代器一样,Futures会在类型本身中捕获大量有关其将要执行的信息。 这使Futures可以编译为高效代码,以达到Rust零成本抽象的口号。

最后,运行它!

这种悬念是不是让你很难受? 好吧,你的机会来了。当你运行 cargo run 时会发生什么?

$ cargo run
Interval thread launched
Interval thread shutting down
Interval thread still alive, value was: 0
...

仅此而已。 挂起来了。 它不会打印出我们辛辛苦苦添加的信息。 我是个彻头彻尾的失败者,我的生活被毁了。

经过一个小时的思考和几杯威士忌,事情开始变得清晰起来。 在我们最初的实现中,我们在主函数中有一个非常浪费的循环。 它不停地检查计数器是否有变化。 我们在 Future 实现中也是这样做的,先睡眠,然后再检查是否有一个 NotReady。 但是 tokio 可能不会这么做,对吧? (答案是肯定的)。

与其做一些愚蠢的浪费的事情,futures crate 要聪明得多,它有一种机制:

  • 确定哪个任务正在尝试访问 future 和 then 提供的数据

  • 通知任务有新数据可用

futures::task::current() 函数提供当前正在运行的任务,作为一个 Task 结构。 该结构有一个 notify 方法,用于让任务知道有更多的数据可用。

在我们的程序中,我们将逻辑分为 Interval 和 IntervalFuture。 IntervalFuture 需要负责调用 current() 函数(考虑一下为什么会这样)。 为了实现这一目标,我们需要做出以下改变:

  • 在 Interval 中添加一个新字段来保存 Arc<Mutex<Option<task>> (是的,这是一个拗口的字段) ,并正确初始化。

  • 每次我们调用 fetch_add 和更新计数器时,如果任务存在的话也调用 notify () 。

  • 提供 set_task 方法以间隔设置Task。

  • 在 IntervalFuture 中返回 NotReady 时,调用 set_task。

练习2

在查看下面的解决方案之前,请先尝试实现这四个更改。 关于我们尚未涵盖的 Rust 的某些功能的提示:您最终会想要在互斥对象中保存的 Option 上进行模式匹配。 您将希望通过引用进行模式匹配,这将需要一些类似于 Some(ref task) => 的代码。 最终的输出应该是:

Interval thread launched
Interval thread still alive, value was: 0
Counter is: 1
Interval thread shutting down

如果您想要确定,您可以在 Github 上查看代码的初始版本

练习2解答

您可以在 Github 上查看 diff 和 所有源码。这儿包含不同的行:

diff --git a/src/future.rs b/src/future.rs
index 9aaee3c..e231e7b 100644
--- a/src/future.rs
+++ b/src/future.rs
@@ -22,6 +22,8 @@ impl Future for IntervalFuture {
     fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
         let curr = self.interval.get_counter();
         if curr == self.last {
+            let task = futures::task::current();
+            self.interval.set_task(task);
             Ok(Async::NotReady)
         } else {
             self.last = curr;
diff --git a/src/interval.rs b/src/interval.rs
index 044e2ca..8013ac6 100644
--- a/src/interval.rs
+++ b/src/interval.rs
@@ -1,12 +1,14 @@
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::thread::{sleep, spawn};
 use std::time::Duration;
+use futures::task::Task;

 #[derive(Clone)]
 pub struct Interval {
     counter: Arc<AtomicUsize>,
     still_running: Arc<AtomicBool>,
+    task: Arc<Mutex<Option<Task>>>,
 }

 impl Drop for Interval {
@@ -26,22 +28,37 @@ impl Interval {
         let still_running = Arc::new(AtomicBool::new(true));
         let still_running_clone = still_running.clone();

+        let task: Arc<Mutex<Option<Task>>> = Arc::new(Mutex::new(None));
+        let task_clone = task.clone();
+
         spawn(move || {
             println!("Interval thread launched");
             while still_running_clone.load(Ordering::SeqCst) {
                 sleep(duration);
                 let old = counter_clone.fetch_add(1, Ordering::SeqCst);
                 println!("Interval thread still alive, value was: {}", old);
+
+                let task = task_clone.lock().unwrap();
+                match *task {
+                    None => (),
+                    Some(ref task) => task.notify(),
+                };
             }
         });

         Interval {
             counter,
             still_running,
+            task,
         }
     }

     pub fn get_counter(&self) -> usize {
         self.counter.load(Ordering::SeqCst)
     }
+
+    pub fn set_task(&mut self, task: Task) {
+        let mut guard = self.task.lock().unwrap();
+        *guard = Some(task);
+    }
 }

太好了,我们终于有一个可以工作的 tokio 程序了!

请不要担心它有多复杂。通知是tokio如何在内部工作的重要方面。但是,在大多数情况下,您将不会创建自己的原始Futures,而是会处理tokio或其他库提供的现有Futures。 这些现有的 Future 将提供必要的通知逻辑。 您只需要遵守这一规则:

仅当从基础Future收到NotReady时,才从轮询函数返回NotReady。

只有一个值?

令人失望的是,我们出色的长时间运行计数器最终只能输出单个值。 我们可以创建某种循环吗? 像下面这样的简单方法不起作用:

let interval_printer = interval_future.and_then(|curr| {
    println!("Counter is: {}", curr);
    interval_printer
});

这不是Haskell,我们无法递归引用中定义的 interval_printer。 继续做一些类似的事情,最终您会感到沮丧,然后回到威士忌。浏览 futures 文档,像 loop_fn 这样的辅助函数看起来很有前途,但是在这种情况下,我没有找到一种使之起作用的简单方法(请让我知道是否错过了某些东西!)。在停止之前,我最终得到了这样的奇特的东西:

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let interval_future = Arc::new(Mutex::new(IntervalFuture::new(interval)));
    let interval_printer = loop_fn(interval_future, |interval_future| {
        let interval_future_clone = interval_future.clone();
        interval_future.lock().unwrap().and_then(|curr| {
            println!("Counter: {}", curr);
            futures::future::ok(Continue(interval_future_clone))
        })
    });

    tokio::run(interval_printer)
}

另一个结构体!

与前面一样,我们将定义另一个帮助器类型来实现这个循环概念。 然后我们会看到这个问题已经在 futures crate 本身得到了更好的解决,但我们会很快到达那里。

我们想定义一个新的结构体 KeepPrinting,它是一个包裹 IntervalFuture 的新类型:

  • 有一个 Future 实现

  • 有 Item = ()

  • 有 loop 实现

  • 有 try_ready! 宏

练习3

尝试实现 KeepPrinting 并在主函数中使用它。解决方案马上就会出现,但是不要作弊!

#[macro_use]
extern crate futures;
extern crate tokio;

mod future;
mod interval;

use self::future::IntervalFuture;
use self::interval::Interval;
use tokio::prelude::*;

struct KeepPrinting(IntervalFuture);

impl Future for KeepPrinting {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<(), ()> {
        loop {
            let curr = try_ready!(self.0.poll());
            println!("Counter: {}", curr);
        }
    }
}

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let interval_future = IntervalFuture::new(interval);
    let keep_printing = KeepPrinting(interval_future);

    tokio::run(keep_printing)
}

就这样,我们得到了一个无限循环的程序。这几乎看起来像我们可以使用Iterators完成的事情。 这让我感到奇怪……futures 中是否存在类似 Iterator 的东西?

Streams

Future 是一个有延迟的单一 Result 的动作。Stream 是一个结果流,类似于 Iterator,每个值之间有一个延迟。

在 src/main.rs 中,添加 mod stream; 然后编辑 src/stream.rs:

  • 调用结构 IntervalStream 而不是 IntervalFuture

  • 提供一个impl Stream for IntervalStream 代替 impl Future

  • 跟随编译器错误来修复它

在 main 函数中,我们希望创建一个 IntervalStream 值,而不是使用 KeepPrinting 或其他任何东西。 然而,tokio::run 需要一个 Future 而不是 Stream 来运行。 幸运的是,有一个 helper 函数,它对流中的每个值运行一个给定的闭包。

练习4

尝试实现 src/stream.rs 和 src/main.rs. 的解决方案。

当我第一次学习 for_each 时意识到,就像 and_then 一样,它需要以Future结尾。不知道这是我自己的缺点,还是一个常见的问题。 无论如何,如果在闭包中意识到你需要像 future::ok(()) 那样的东西,那么您就很不错。

另外,在返回 Option<item> 时,Stream 的 poll 函数略有不同。 这类似于迭代器的工作方式。 在我们的例子中,我们有一个无限流,所以我们从来不提供 None 情况。

src/stream.rs 文件代码:

extern crate futures;

use super::interval::Interval;
use futures::prelude::*;

pub struct IntervalStream {
    interval: Interval,
    last: usize,
}

impl IntervalStream {
    pub fn new(interval: Interval) -> IntervalStream {
        let last = interval.get_counter();
        IntervalStream { interval, last }
    }
}

impl Stream for IntervalStream {
    type Item = usize;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let curr = self.interval.get_counter();
        if curr == self.last {
            let task = futures::task::current();
            self.interval.set_task(task);
            Ok(Async::NotReady)
        } else {
            self.last = curr;
            Ok(Async::Ready(Some(curr)))
        }
    }
}

src/main.rs 文件代码:

xtern crate futures;
extern crate tokio;

mod interval;
mod stream;

use self::interval::Interval;
use self::stream::IntervalStream;
use tokio::prelude::*;

fn main() {
    let interval = Interval::from_millis(500); // half a second
    let interval_stream = IntervalStream::new(interval);
    let future = interval_stream.for_each(|curr| {
        println!("Counter: {}", curr);
        futures::future::ok(())
    });

    tokio::run(future)
}

练习5

像迭代器一样,Streams 也有助手方法,您可以使用这些方法构建更复杂的东西。 例如,尝试引入 map 并只打印前10个计数器值,但在打印前将它们加倍。 (没有提供解决方案)

这一切都开始很好地组合在一起了! 虽然在 futures crate中还有一些细节需要学习,但是你已经掌握了大部分的大创意。 下一步是熟悉 tokio 中的 API,但是相对而言,这并不那么令人费解。 为了让大家明白我们到目前为止所做的,我们将进行一些练习,然后继续 tokio。

练习6

定义一个新的结构 MyOk,这样 main 函数就可以正常工作:

fn main() {
    let name = String::from("Alice");
    let future = MyOk::new(name).and_then(|name| {
        println!("Name: {}", name);
        MyOk::new(())
    });

    tokio::run(future)
}

提示:在查看解决方案之前,这里有一个帮助:您需要在 MyOk 新型类型内添加一个 Option,并且对其进行两次轮询是无效的。

练习6解答

struct MyOk<T>(Option<T>);

impl<T> MyOk<T> {
    fn new(t: T) -> MyOk<T> {
        MyOk(Some(t))
    }
}

impl<T> Future for MyOk<T> {
    type Item = T;
    type Error = ();
    fn poll(&mut self) -> Poll<T, ()> {
        Ok(Async::Ready(self.0.take().unwrap()))
    }
}

练习7

使用 iter_ok 将范围 1. .11转换为流,然后将其收集为 Vec 并打印出来。

练习7解答

fn main() {
    tokio::run(stream::iter_ok(1..11).collect().and_then(|x| {
        println!("{:?}", x);
        future::ok(())
    }))
}

异步 I/O

我们通过创建一个虚拟的异步 I/O 数据源(Interval)来学习 futures crate。我们在那个世界里建立了 Future 和 Stream。我们用 tokio 执行7⃣器来管理这些事情。现在是时候使用一些真正的异步 I/O 了。

我们关心的大多数异步 I/O 最终将成为网络通信。在操作系统级别上,文件系统操作并不总是很好地处理异步 I/O。也就是说,为了让我们了解更多,让我们使用一个基于文件系统的示例。

你可以在 docs.rs 上找到这些文档

列出目录中的文件

如果你浏览上面的文档,你可能会发现函数 read _ dir。它传入一个 path,并返回一个 ReadDirFuture。这是 tokio 中的标准方法,就像我们使用 Iterators 一样: 简单的包装器函数提供对执行繁重任务结构的访问。在 tokio 要习惯的一件事就是如何阅读这些文档。

单击 ReadDirFuture 结构。 它具有一个 Future 实现,其中 Item 为ReadDir,错误为 std::io::Error。 在处理该 ReadDir 之前,让我们先编译一下。 由于这仍然是速成课,因此在每一步中,我们都可能会碰壁。

首先,要调用 read_dir,我们需要一个目录。让我们暂时用 "." (工作目录)。稍后我们将使用命令行参数。下面是一个简单的实现:

extern crate tokio;

use tokio::prelude::*;
use tokio::fs;

fn main() {
    let future = fs::read_dir(".");
    tokio::run(future)
}

这给了我们一个有点吓人的错误信息:

error[E0271]: type mismatch resolving `<tokio_fs::read_dir::ReadDirFuture<&str> as tokio::prelude::Future>::Item == ()`
 --> src/main.rs:8:5
  |
8 |     tokio::run(future)
  |     ^^^^^^^^^^ expected struct `tokio_fs::read_dir::ReadDir`, found ()
  |
  = note: expected type `tokio_fs::read_dir::ReadDir`
             found type `()`
  = note: required by `tokio::run`

error[E0271]: type mismatch resolving `<tokio_fs::read_dir::ReadDirFuture<&str> as tokio::prelude::Future>::Error == ()`
 --> src/main.rs:8:5
  |
8 |     tokio::run(future)
  |     ^^^^^^^^^^ expected struct `std::io::Error`, found ()
  |
  = note: expected type `std::io::Error`
             found type `()`
  = note: required by `tokio::run`

不过,让我为你缩小范围:

expected type `tokio_fs::read_dir::ReadDir`, found type `()`
expected type `std::io::Error`, found type `()`

啊对! tokio::run 要求我们的 Item 和 Error 为()。 我们可以使用map_err 修改错误。 如果出现以下错误,我们就将其打印出来:

let future = fs::read_dir(".")
    .map_err(|e| eprintln!("Error reading directory: {}", e))
    ;

这样就消除了第一个编译错误:

.and_then(|readdir| {
    println!("FIXME: use this: {:?}", readdir);
})

啊哦,我们得到了这个编译错误。你能解决它吗?

error[E0277]: the trait bound `(): tokio::prelude::Future` is not satisfied

以我的经验,当您看到它时,它几乎总是意味着:“ 我忘记添加 future::ok(())。 请记住,and_then 需要在下一个 Future 运行时结束。 添加该行,您的代码应编译。 运行产生输出:

FIXME: use this: ReadDir(ReadDir("."))

酷!现在是查看 ReadDir 文档的时候了。代替 Future 实现,它有一个 Stream。让我们把 for_each 放进去,看看会发生什么。

挑战:在编译之前,试着猜测下面代码中的错误。

.and_then(|readdir| {
    readdir
        .for_each(|entry| {
            println!("{:?}", entry.path());
        })
})

这个代码有两个问题:

  1. 它会留下一个错误类型的 std::io::Error

  2. 提供给 for_each 的闭包末尾不包括 future::ok(())

继续解决这些问题。为了确保我们达成共识,这里是我编译并成功运行的解决方案:

extern crate tokio;

use tokio::prelude::*;
use tokio::fs;

fn main() {
    let future = fs::read_dir(".")
        .map_err(|e| eprintln!("Error reading directory: {}", e))
        .and_then(|readdir| {
            readdir
                .map_err(|e| eprintln!("Error reading directory: {}", e))
                .for_each(|entry| {
                    println!("{:?}", entry.path());
                    future::ok(())
                })
        })
        ;
    tokio::run(future)
}

重复错误处理

令人恼火的是,我们有两个相同的 map_err 调用。 我们有两种不同的错误源:初始的 read_dir Futur 和 流式传输各个DirEntry。 但是,两种情况下的错误类型是相同的:std::io::Error。 因此,我们可以将错误处理移至最后,只需执行一次即可:

fn main() {
    let future = fs::read_dir(".")
        .and_then(|readdir| {
            readdir
                .for_each(|entry| {
                    println!("{:?}", entry.path());
                    future::ok(())
                })
        })
        .map_err(|e| eprintln!("Error reading directory: {}", e))
        ;
    tokio::run(future)
}

Flattening

事实证明,拥有一个可以生成另一个Future 的 Future 是很常见的,然后我们要运行第二个 Future,它有一个帮助方法 flatten()。 当Future给我们一个 Stream 时,还有一个 flatten_stream() 会做同样的事情。

练习8

使用 flatten_stream 重写上面的代码。 您最终应该不会调用and_then。 解决方案如下:

extern crate tokio;

use tokio::prelude::*;
use tokio::fs;

fn main() {
    let future = fs::read_dir(".")
        .flatten_stream()
        .for_each(|entry| {
            println!("{:?}", entry.path());
            future::ok(())
        })
        .map_err(|e| eprintln!("Error reading directory: {}", e))
        ;
    tokio::run(future)
}

命令行参数

总是把工作目录里的东西打印出来有点无聊。相反,让我们接受所有的命令行参数(跳过第一个,即可执行名称) ,并列出目录内容。我们将使用 Stream::iter_ok 将 Args 迭代转换为 Stream:

extern crate tokio;

use tokio::prelude::*;
use tokio::fs;
use std::env::args;

fn main() {
    let future = stream::iter_ok(args())
        .skip(1)
        .for_each(|dir| {
            fs::read_dir(dir)
                .flatten_stream()
                .for_each(|entry| {
                    println!("{:?}", entry.path());
                    future::ok(())
                })
        })
        .map_err(|e| eprintln!("Error reading directory: {}", e))
        ;
    tokio::run(future)
}

不幸的是,这不能编译。完整的错误消息很多(我鼓励您自己查看) ,但前几行就足以找到问题所在:

error[E0277]: `std::env::Args` cannot be sent between threads safely
  --> src/main.rs:20:5
   |
20 |     tokio::run(future)
   |     ^^^^^^^^^^ `std::env::Args` cannot be sent between threads safely

哦,该死。 Args 不是线程安全的,所以不能转换成 stream。很公平: vectors 来拯救!

练习9

在定义 future 之前创建一个 vectors 参数,并使用它。

练习9解答

extern crate tokio;

use tokio::prelude::*;
use tokio::fs;
use std::env::args;

fn main() {
    let args: Vec<String> = args().skip(1).collect();
    let future = stream::iter_ok(args)
        .for_each(|dir| {
            fs::read_dir(dir)
                .flatten_stream()
                .for_each(|entry| {
                    println!("{:?}", entry.path());
                    future::ok(())
                })
        })
        .map_err(|e| eprintln!("Error reading directory: {}", e))
        ;
    tokio::run(future)
}

并发在哪里?

如果你为这个程序提供两个不同的目录和大量的文件,你可能会注意到它按顺序处理这些目录: 它将打印第一个目录中的所有文件,然后是第二个目录中的所有文件。考虑到异步 I/O 和并发通常是并行的,这可能有点令人惊讶。

到目前为止,我们一次只完成一项任务。 我们的程序输出 args 的值,并且为每个参数提供一个Future。 该Future运行完成,然后处理 args 中的下一个值。

如果我们要同时处理每个目录怎么办? 为此,我们需要创建另一个任务,就像 tokio::run 创建新线程一样。 tokio::spawn 需要一个Future,其中 Item 和 Error 均为 ()。 这是我们程序的更多并发版本:

let future = stream::iter_ok(args)
    .for_each(|dir| {
        let future = fs::read_dir(dir)
            .flatten_stream()
            .map_err(|e| eprintln!("Error reading directory: {}", e))
            .for_each(|entry| {
                println!("{:?}", entry.path());
                future::ok(())
            })
            ;
        tokio::spawn(future);
        future::ok(())
    })
    ;

请注意,我在 tokio::spawn(future) 之后放置了 future::ok(()) 调用。 事实证明这不是必需的, spawn 返回一个 Spawn 值,其行为类似于 future::ok(())(通过其 IntoFuture 实现)。 因此,只需删除 future::ok 和在 spawn 之后的分号,您的代码仍然可以使用。

注意: 除非每个目录中都有大量文件,否则您可能不会注意到并发性。

跳过 Vector

令我烦恼的最后一件事是Vec。 确实,没有它,我们似乎应该能够摆脱困境。 我们不能将 Args 转换为 Stream,因为这需要在线程之间发送值。但是现在我们有了一个新的技巧:spawning。 如果我们从不将 Args 发送到任何地方,而只是产生一堆任务,该怎么办?

当我第一次学习 tokio 时,我会承认我花了更多的时间。我将向您展示的技巧而不是我为之骄傲。 我们需要创建一个Future,然后在其中运行for循环。 我们如何创建一个使我们无需等待其他任何代码即可运行的Future 呢? 我们可以使用 future:: ok(()) 创建一个虚拟Future,然后将下一个动作与 and_then 链接在一起。

let future = future::ok(()).and_then(|()| {
    for dir in args().skip(1) {
        let future = fs::read_dir(dir)
            .flatten_stream()
            .map_err(|e| eprintln!("Error reading directory: {}", e))
            .for_each(|entry| {
                println!("{:?}", entry.path());
                future::ok(())
            })
            ;
        tokio::spawn(future);
    }
    future::ok(())
});

如果您愿意的话,另一种方法是使用 future::poll_fn 帮助函数。 就像Future 的 poll 方法一样,不需要参数,该函数返回Result<Async<Item>, Error>。

练习10

使用 future::poll_fn 重写我们上面的程序。你的程序不应该使用 and then。

练习10解答

extern crate tokio;

use tokio::prelude::*;
use tokio::fs;
use std::env::args;

fn main() {
    let future = future::poll_fn(|| {
        for dir in args().skip(1) {
            let future = fs::read_dir(dir)
                .flatten_stream()
                .map_err(|e| eprintln!("Error reading directory: {}", e))
                .for_each(|entry| {
                    println!("{:?}", entry.path());
                    future::ok(())
                })
                ;
            tokio::spawn(future);
        }
        Ok(Async::Ready(()))
    });
    tokio::run(future)
}

对于文件系统操作来说,这一切都是值得的吗?可能不会。让我们进入更有趣的东西:网络通信!

TCP 客户端

Rust 的标准库已经为 TCP 通信提供了开箱即用非常好的支持。例如,下面的代码将发送一个 HTTP 请求到 http://httpbin.org/json,然后打印完整的响应标头和正文:

use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> Result<(), Box<std::error::Error>> {
    let mut stream = TcpStream::connect("httpbin.org:80")?;
    stream.write_all(b"GET /json HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n")?;

    let mut buffer = vec![];
    stream.read_to_end(&mut buffer)?;

    println!("{}", std::str::from_utf8(&buffer)?);
    Ok(())
}

这里有一些简化的假设,比如使用 connection: close,这样我们就可以使用 read_to_end,并且假设响应正确地使用 UTF-8 编码。但这并不是对标准库中 TCP 支持的真正要求。

真正的问题与我们在本课中一直在讨论的问题相同:它会阻塞整个OS线程,从而阻止对网络的成功写入和后续读取。 让我们看一下 tokio 如何提供帮助。

在 tokio::net::TcpStream 中有一个 TcpStream 类型,看起来是个不错的选择。 它需要一个SocketAddr,我们可能可以很容易地做到这一点。 它返回一个 ConnectFuture。 让我们从一些简单的建立连接的代码开始:

extern crate tokio;

use tokio::net::TcpStream;
use tokio::prelude::*;
use std::net::ToSocketAddrs;

fn main() {
    let mut addr_iter = "httpbin.org:80".to_socket_addrs().unwrap();
    let addr = match addr_iter.next() {
        None => panic!("DNS resolution failed"),
        Some(addr) => addr,
    };
    let future = TcpStream::connect(&addr)
        .map_err(|e| eprintln!("Error connecting: {:?}", e))
        .map(|stream| {
            println!("Got a stream: {:?}", stream);
        });
    tokio::run(future)
}

目前,to_socket_addrs 业务不是我们的重点,因此我将忽略它。 请随意练习,以改进该代码的错误处理。

我们在这里有了所有熟悉的部分:定义Future,处理错误,并使用map将操作与打开的连接链接在一起。

让我们更仔细地看一下传递给闭包的流值。它来自 ConnectFuture,因此我们需要查看 Item 关联类型。当然,如果你检查文档,你会看到它是 TcpStream。太好了。

我们使用原始的非异步阻塞代码 write_all。如果我在 tokio 中搜索 write_all,我会发现有一个返回 WriteAll Futurehelper 函数。有趣的是:

  • write_all 接受两个参数: AsyncWrite 和 AsRef<[u8]> 。这将是我们的 TcpStream 和要发送的数据。

  • AsyncWrite 的 Item 是一对变量 A 和 T,它们与我们传入的参数完全相同。

将我们最初提供的信息流还给我们,对于继续连接至关重要。 我没有看到任何文档说明返回字节缓冲区的要点,但是我相信它已返回,以便在需要时可以重用可变的缓冲区。

无论如何,让我们放在一起并发出我们的请求:

请注意,我是如何用 and_then 调用替换上一个 map 的,这样我就可以在连接建立后提供另一个将要执行的 Future。

在 tokio 中也有一个 read_to_end 函数会不会太过分。不,一点也不

练习11

使用 read_to_end 将整个响应读取到 Vec 中,然后使用std::str::from_utf8将其打印出来,并根据需要随意处理错误。

练习11解答

extern crate tokio;

use std::net::ToSocketAddrs;
use tokio::io::{read_to_end, write_all};
use tokio::net::TcpStream;
use tokio::prelude::*;

const REQ_BODY: &[u8] = b"GET /json HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n";

fn main() {
    let mut addr_iter = "httpbin.org:80".to_socket_addrs().unwrap();
    let addr = match addr_iter.next() {
        None => panic!("DNS resolution failed"),
        Some(addr) => addr,
    };
    let future = TcpStream::connect(&addr)
        .and_then(|stream| {
            write_all(stream, REQ_BODY)
        }).and_then(|(stream, _body)| {
            let buffer = vec![];
            read_to_end(stream, buffer)
        }).map(|(_stream, buffer)| {
            let s = std::str::from_utf8(&buffer).unwrap();
            println!("{}", s);
        }).map_err(|e| eprintln!("Error occured: {:?}", e));
    tokio::run(future)
}

将 Stream 保存到文件

为了最大的烦恼,我将重复说明:tokio不适用于异步文件操作。 也就是说,有一个我们可以使用的tokio::fs::File结构。 让我们尝试将响应内容写入 httpbin.json:

let future = TcpStream::connect(&addr)
    .and_then(|stream| {
        write_all(stream, REQ_BODY)
    }).and_then(|(stream, _body)| {
        let buffer = vec![];
        read_to_end(stream, buffer)
    }).and_then(|(_stream, buffer)| {
        File::create("httpbin.json").and_then(|file| {
            write_all(file, &buffer).map(|_| ())
        })
    }).map_err(|e| eprintln!("Error occured: {:?}", e));

不幸的是,编译器不太喜欢这样:

error[E0277]: the trait bound `std::fs::File: tokio::io::AsyncWrite` is not satisfied
  --> src/main.rs:25:17
   |
25 |                 write_all(file, &buffer).map(|_| ())
   |                 ^^^^^^^^^ the trait `tokio::io::AsyncWrite` is not implemented for `std::fs::File`
   |
   = note: required by `tokio::io::write_all

好吧,我想这是有道理的: 你不能异步写入一个文件,所以 tokio::io::write_all 不会工作。幸运的是,File 实现了 Write trait,它提供了一个阻塞 write_all,这对我们的目的来说已经足够了。

练习12

重写上面的代码以成功地编写 httpbin.json。

练习12解答

第一个解决方案,忽略任何闭包的错误处理:

let future = TcpStream::connect(&addr)
    .and_then(|stream| {
        write_all(stream, REQ_BODY)
    }).and_then(|(stream, _body)| {
        let buffer = vec![];
        read_to_end(stream, buffer)
    }).and_then(|(_stream, buffer)| {
        File::create("httpbin.json").map(|mut file| {
            file.write_all(&buffer).unwrap()
        })
    }).map_err(|e| eprintln!("Error occured: {:?}", e));

但理想情况下,我们希望避免使用 unwrap() ,而是在此处提升 I/O 错误,由下面的 map_err 处理。 事实证明,要实现这一目标,只需进行微不足道的更改:

File::create("httpbin.json").and_then(|mut file| {
    file.write_all(&buffer)
})

我们不使用 map,而是使用 and _ then,这要求我们返回一些实现 Future 的值。但幸运的是,Result 本身实现了Future! Ok 成为该 Future的 Item,Err 成为其 Error。 问题解决了!

练习13

我们根本没有用到 tokio 让我们使这个程序并发。编写一个程序,该程序接受命令行参数以确定要发出的 HTTP 请求和要将它们存储到的文件。为了简化实现,我们让它接受如下输入:

$ cargo run httpbin.org:80 /json httpbin.json example.com:80 / homepage.html

可以自由地处理无效的命令行参数,但是这是最简单的。

练习13解答

extern crate tokio;

use std::net::ToSocketAddrs;
use tokio::io::{read_to_end, write_all};
use tokio::net::TcpStream;
use tokio::prelude::*;
use std::fs::File;

fn download(host: String, path: String, filename: String) -> impl Future<Item=(), Error=()> {
    let mut addr_iter = host.to_socket_addrs().unwrap();
    let addr = match addr_iter.next() {
        None => panic!("DNS resolution failed"),
        Some(addr) => addr,
    };
    let req_body = format!(
        "GET {} HTTP/1.1\r\nHost: {}:80\r\nConnection: close\r\n\r\n",
        path,
        host,
        );

    TcpStream::connect(&addr)
        .and_then(|stream| {
            write_all(stream, req_body).and_then(|(stream, _body)| {
                let buffer = vec![];
                read_to_end(stream, buffer).and_then(|(_stream, buffer)| {
                    File::create(filename).and_then(|mut file| {
                        file.write_all(&buffer)
                    })
                })
            })
        }).map_err(|e| eprintln!("Error occured: {:?}", e))
}

fn main() {
    tokio::run(future::poll_fn(|| {
        let mut args = std::env::args().skip(1);
        loop {
            match (args.next(), args.next(), args.next()) {
                (Some(host), Some(path), Some(filename)) => {
                    tokio::spawn(download(host, path, filename));
                }
                _ => return Ok(Async::Ready(())),
            }
        }
    }))
}

更好的错误处理

当我们有一个糟糕的地址时,我们只是 panic。让我们做得更好一点。首先,我将定义一个 helper 函数来返回一个很好的 Result。我们将使用一个 String 来处理 Err 变量,但是我们最好应该定义一个枚举:

fn resolve_addr(host: &str) -> Result<SocketAddr, String> {
    let mut addr_iter = match host.to_socket_addrs() {
        Ok(addr_iter) => addr_iter,
        Err(e) => return Err(format!("Invalid host name {:?}: {:?}", host, e)),
    };
    match addr_iter.next() {
        None => Err(format!("No addresses found for host: {:?}", host)),
        Some(addr) => Ok(addr),
    }
}

download的内部我们继续 panic:

let addr = resolve_addr(&host).unwrap();

但是让我们做得更好。使用“?“没用的,因为我们不返回结果。其中一个想法是使用 return 提前返回:

let addr = match resolve_addr(&host) {
    Ok(addr) => addr,
    Err(e) => {
        eprintln!("Error resolving address: {}", e);
        return future::err(());
    }
};

然而,我们从编译器中得到了一个有趣的错误消息:

error[E0308]: mismatched types
  --> src/main.rs:34:5
   |
34 | /     TcpStream::connect(&addr)
35 | |         .and_then(|stream| {
36 | |             write_all(stream, req_body).and_then(|(stream, _body)| {
37 | |                 let buffer = vec![];
...  |
43 | |             })
44 | |         }).map_err(|e| eprintln!("Error occured: {:?}", e))
   | |___________________________________________________________^ expected struct `tokio::prelude::future::FutureResult`, found struct `tokio::prelude::future::MapErr`

为了让工作正常,我们需要确保我们总是返回相同的类型。到目前为止,我们已经使用 impl Future 来表示“我们将返回一些类型,这是一个 Future” ,但是我们还没有告诉编译器这个类型是什么。相反,编译器已经推断出。但是现在,我们有两种不同的类型。

一种方法是动态调度,例如使用Box 。 但是,有更好的方法:使用 Either helper 类型。这种类型用于我们有两种不同类型的 future,它们都有相同的 Item 和 Error。让我们看看如何重写上面的代码:

fn download(host: String, path: String, filename: String) -> impl Future<Item=(), Error=()> {
    let addr = match resolve_addr(&host) {
        Ok(addr) => addr,
        Err(e) => {
            eprintln!("Error resolving address: {}", e);
            return future::Either::A(future::err(()));
        }
    };
    let req_body = format!(
        "GET {} HTTP/1.1\r\nHost: {}:80\r\nConnection: close\r\n\r\n",
        path,
        host,
        );

    future::Either::B(TcpStream::connect(&addr)
        .and_then(|stream| {
            write_all(stream, req_body).and_then(|(stream, _body)| {
                let buffer = vec![];
                read_to_end(stream, buffer).and_then(|(_stream, buffer)| {
                    File::create(filename).and_then(|mut file| {
                        file.write_all(&buffer)
                    })
                })
            })
        }).map_err(|e| eprintln!("Error occured: {:?}", e)))
}

练习14

实现你自己的数据类型,并在上面的代码中使用它:

练习14解答

enum Either<A, B> {
    A(A),
    B(B),
}

impl<A, B> Future for Either<A, B>
    where A: Future<Item=B::Item, Error=B::Error>,
          B: Future,
{
    type Item = A::Item;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self {
            Either::A(a) => a.poll(),
            Either::B(b) => b.poll(),
        }
    }
}

TCP服务端

既然我们的 TCP 客户机已经非常成功,那么让我们转移到服务器端:

  • 绑定监听套接字

  • 接受来自该套接字的连接

  • 将所有数据从套接字的输入端复制到套接字的输出端

绑定一个监听套接字将成为绑定的阻塞调用,采用我们的老朋友SocketAddr。 由于我们不再使用DNS解析了,因此我们对执行此操作的方法可能会稍为懒惰:

extern crate tokio;

use tokio::prelude::*;
use tokio::net::TcpListener;

fn main() {
    let addr = "127.0.0.1:3000".parse().expect("couldn't parse address");
    let listener = TcpListener::bind(&addr).expect("couldn't bind address");
    println!("It worked! {:?}", listener);
}

我们现在有了一个 TcpListener。与我们看到的其他类型不同,它没有 Future 或 Stream 的实现。但是,它确实有一个名为 Incoming() 的方法,该方法返回 Incoming,它有一个 Stream 实现,其中 Item 是 TcpStream。看起来很有希望!

let future = listener
    .incoming()
    .for_each(|socket| {
        println!("Accepted a connection! {:?}", socket);
        future::ok(())
    })
    .map_err(|e| eprintln!("An error occurred: {:?}", e))
    ;
tokio::run(future)

就像这样,我们实现了上面的点(1)和点(2)。我们只剩下第三点: 复制所有的数据。让我们在tokio搜索做复制的东西。看起来像 tokio::io::copy 就可以了。我们需要同时为读者和作者提供信息。既然我们是同一套接字的读者和写入者,让我们为两者提供相同的值:

let future = listener
    .incoming()
    .for_each(|socket| {
        copy(socket, socket)
            .map(|_| println!("Connection closed"))
    })
    .map_err(|e| eprintln!("An error occurred: {:?}", e))
    ;

你已经在嘲笑我的滑稽愚蠢的错误了吗?

error[E0382]: use of moved value: `socket`
  --> src/main.rs:13:26
   |
13 |             copy(socket, socket)
   |                  ------  ^^^^^^ value used here after move
   |                  |
   |                  value moved here

当然,我们不能在两个位置使用相同的值。幸运的是,在设计 Streams 时,作者提供了一个名为 split 的方法,为我们提供了流的读写结束。有了这些,我们的 echo 服务器就变得微不足道了:

extern crate tokio;

use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::io::copy;

fn main() {
    let addr = "127.0.0.1:3000".parse().expect("couldn't parse address");
    let listener = TcpListener::bind(&addr).expect("couldn't bind address");
    let future = listener
        .incoming()
        .for_each(|socket| {
            let (reader, writer) = socket.split();
            copy(reader, writer)
                .map(|_| println!("Connection closed"))
        })
        .map_err(|e| eprintln!("An error occurred: {:?}", e))
        ;
    tokio::run(future)
}

直接写

使用 copy 有点忽略了表面之下发生的血淋淋的细节。首先,我们使用 write_all 函数向写入方写入一些任意的信息。

注意: tokio::io::write_all 函数接受一个 AsyncWrite 并返回一个 WriteAll Future。不要因为 write_all 方法的存在而感到困惑,它实际上是一个阻塞调用。在写这篇教程的时候,我花了大约5分钟的时间来摸索。

extern crate tokio;

use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::io::{copy, write_all};

fn main() {
    let addr = "127.0.0.1:3000".parse().expect("couldn't parse address");
    let listener = TcpListener::bind(&addr).expect("couldn't bind address");
    let future = listener
        .incoming()
        .for_each(|socket| {
            let (reader, writer) = socket.split();
            write_all(writer, b"Welcome to the echo server\r\n")
                .map(|_| println!("Connection closed"))
        })
        .map_err(|e| eprintln!("An error occurred: {:?}", e))
        ;
    tokio::run(future)
}

修改上面的代码,以便在打印 “ Welcome to the echo server” 之后,它会继续实际回送已发送的内容。

write_all(writer, b"Welcome to the echo server\r\n")
    .and_then(|(writer, _)| {
        copy(reader, writer)
            .map(|_| println!("Connection closed"))
    })

编码

直接从 reader 那里阅读比直接写给 writer 要稍微棘手一些。我们可以使用基本的轮询读取函数,但是我们不打算在这里使用。(如果想了解更多信息,请随时阅读官方tokio教程)。

相反,我们将引入一个新的概念,codecs。到目前为止,我们一直在隐式地使用 AsyncRead 和 AsyncWrite traits,它们本质上提供了构建我们自己的 Futures 所需的原始轮询函数(就像我们很久以前在本课开始时所做的那样)。然而,我们通常不希望在那个抽象级别上工作。相反,我们希望处理某种有框架(或分块)的数据。

相反,新的抽象将是接收器,它是“可以异步将其他值发送到的值”。 我们将继续使用Stream特质作为阅读方面,我们已经很熟悉了。

让我们设计一个例子,我们的 echo 服务器目前提供的输出有点奇怪:

Hello
Hello
There
There
World
World

很难说我输入了什么,服务器响应了什么。相反,我希望从服务器发回的每一行都以 “ You said: ” 开头。用到目前为止,我们看到的抽象方法做这件事是相当痛苦的: 我们需要获取大量的数据,寻找换行符,分解输入,在“ You said: ”消息中拼接。我知道这是一个速成课,但是我不想就这样一头栽进去。

相反,让我们直接跳到更好的解决方案。我想把我们的 TCP 流视为一条数据流。如果我搜索单词“ lines”(实际上这就是我学习 codec 的方式) ,最终得到的是 LinesCodec。它提供了一个方法 new () ,以及一个长度为 new_with_max_length。我们将在这里使用 new,但我建议阅读文档,看看为什么在任何类型的安全敏感上下文中这都是一个糟糕的主意。

该类型上唯一的另一种方法是max_length,它看起来并不能帮助我们实际将 TCP 套接字作为一行流处理。 因此,让我们看一下特征实现。 我们已经有了所有常见的可疑对象:Clone,PartialOrd 等。但是还有两个新的可疑对象:Decoder 和 Encoder。 好吧,这看起来确实很有趣。

通读 Decoder 上的文档,它提供了一种称为 framed 的方法,该方法的描述非常出色(请花点时间浏览该链接并阅读文档)。事不宜迟,让我们尝试将LinesCodec添加到回显服务器中:

extern crate tokio;

use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::codec::{Decoder, LinesCodec};

fn main() {
    let addr = "127.0.0.1:3000".parse().expect("couldn't parse address");
    let listener = TcpListener::bind(&addr).expect("couldn't bind address");
    let future = listener
        .incoming()
        .for_each(|socket| {
            let lines_codec = LinesCodec::new();
            let socket = lines_codec.framed(socket);
            socket
                .send(String::from("Welcome to the echo server"))
                .map(|_| println!("Connection closed"))
        })
        .map_err(|e| eprintln!("An error occurred: {:?}", e))
        ;
    tokio::run(future)
}

您可能已经注意到,在“ Welcome”字符串的末尾不再有一个换行序列。这是因为我们的代码行编解码器会自动处理这个问题。另外,我们现在需要使用 String::from,因为这个 Sink 的 Item 是一个 String。

我们还可以使用 split 将 Sink 从流中分离出来:

let (sink, stream) = lines_codec.framed(socket).split();
sink
    .send(String::from("Welcome to the echo server"))
    .map(|_| println!("Connection closed"))

并且,我们可以使用 Stream 的 each 来获得一条线的lines:

extern crate tokio;

use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::codec::{Decoder, LinesCodec};

fn main() {
    let addr = "127.0.0.1:3000".parse().expect("couldn't parse address");
    let listener = TcpListener::bind(&addr).expect("couldn't bind address");
    let future = listener
        .incoming()
        .for_each(|socket| {
            let lines_codec = LinesCodec::new();
            let (sink, stream) = lines_codec.framed(socket).split();
            sink
                .send(String::from("Welcome to the echo server"))
                .and_then(|sink| {
                    stream
                        .for_each(|line| {
                            println!("Received a line: {}", line);
                            future::ok(())
                        })
                        .map(|_| println!("Connection closed"))
                })
        })
        .map_err(|e| eprintln!("An error occurred: {:?}", e))
        ;
    tokio::run(future)
}

我们在这里差不多完成了:我们只需要将行发送回 sink 即可,而不是发送到 stdout。 不幸的是,到目前为止,使用send方法将非常棘手,因为最终将在for_each的每次迭代中消耗接收器。 我们可以找到一种使所有工作正常进行的方法,但是,让我们开始追逐并使用send_all。

练习16

修改上面的代码,以便不将行打印到标准输出,而是将其发送回客户端,并显示消息“You said:”。 您将要查看 send_all

练习16解答

extern crate tokio;

use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::codec::{Decoder, LinesCodec};

fn main() {
    let addr = "127.0.0.1:3000".parse().expect("couldn't parse address");
    let listener = TcpListener::bind(&addr).expect("couldn't bind address");
    let future = listener
        .incoming()
        .for_each(|socket| {
            let lines_codec = LinesCodec::new();
            let (sink, stream) = lines_codec.framed(socket).split();
            sink
                .send(String::from("Welcome to the echo server"))
                .and_then(|sink| {
                    let stream = stream
                        .map(|line| format!("You said: {}", line))
                        ;
                    sink.send_all(stream)
                        .map(|_| println!("Connection closed"))
                })
        })
        .map_err(|e| eprintln!("An error occurred: {:?}", e))
        ;
    tokio::run(future)
}

下节课

哇,这是重大的一课!你在 tokio 上应该有一个非常坚实的基础。现在是时候使用该库以及用于执行HTTP服务器和客户端等相关高级库的更多经验了。

根据读者的反馈,下一课可能会深入到 tokio 和相关库中,或者回到类似 Rust 的生命周期更基本的方面。在 tokio 这边,我们学习的有:

  • tasks 之间传递消息

  • UDP 通信

  • 递归目录遍历

  • 文件的并行下载

最后更新于