8. 直接了当的Future
最后更新于
这有帮助吗?
最后更新于
这有帮助吗?
距离我写完《Rust速成课》系列的已经有一年了。最后一个帖子非常精彩,深入到了 async,futures 和 tokio。所有这些都在一个帖子里。这有点虐待狂的味道,在这一点上我为自己感到骄傲。
从那以后发生了很多事情。 重要的是:Future特性已移入标准库本身,并吸收了一些修改。 然后,为了更好地结合起来,有了一个新的 async/.await 语法。 在Rust中编写异步代码时,我很难高估生活质量的差异有多大。
我最近在 ,演示了 Future 和async/.await 在实践中的应用。 但是在这里,我想对表面情况进行更彻底的分析。 与第7课不同,我将跳过为什么要编写异步代码的动机,并将其分解为更易消化的部分。 像第7课一样,我将内联包含练习题方案,而不是单独发布。
在本示例中,我将使用 async-std 库而不是 tokio。 我唯一真正的原因是,在 tokio 发布了对新 async/await语法的支持之前,我开始使用async-std。 一般来说,我还不准备考虑我更喜欢哪个库。
您应该启动一个 Cargo 项目来配合。试试 cargo new --bin sleepus-interruptus。如果要确保使用相同的编译器版本,请添加一个 rust-toolchain 文件,其中包含字符串1.39.0。运行 cargo run 确保你们都准备好了。
这篇文章是基于 完成 Rust 教学系列的一部分。 如果你在博客之外阅读这篇文章,你可以在找到这个系列中所有文章的链接。 也可 频道。
我想编写一个程序,将打印 Sleepus 消息 10 次,延迟0.5秒。 然后打印Interruptus 消息5次,延迟1秒。 这是一些相当简单的 Rust 代码:
但是,正如我的巧妙命名所暗示的,这不是我的真正目标。 该程序同步运行两个操作,首先打印Sleepus,然后打印Interruptus。 相反,我们希望以交错的方式打印这两套语句。 这样一来,间断实际上就做了一些中断。
练习:使用 std::thread::spawn 函数创建一个操作系统线程,使这些打印的语句交错。
解决这个问题有两种基本方法。第一种方法(也许更明显)是为每个函数生成一个单独的线程,然后等待每个函数完成:
注意两件事:
我们调用 spawn 为 spawn(sleepus),而不是 spawn(sleepus()) 。前者通过函数 spawn 驱动 sleepus 来运行,后者会立即运行 sleappus() 并将其结果传递给 spawn,这不是我们想要的。
我在 main 函数/线程中使用 join() 等待子线程结束。因为我很懒,所以我使用 unwrap 来处理可能发生的任何错误。
另一种方法是创建一个辅助线程,然后调用主线程中的一个函数:
这样更有效率(产生线程的时间更少,用于保存线程的内存更少) ,并且没有真正的缺点。我建议走这条路。
问题:如果我们没有在第二个 spawn 版本中调用 join 的话,该程序的行为是什么?如果我们不调用第一个 spawn 版本的 join 会怎么样?
但是这根本不是一个异步解决问题的方法!我们有两个线程正在处理的操作系统,它们既同步运行又对睡眠进行阻塞调用。让我们建立一些直觉,来了解如何让我们的两个任务(打印“睡眠”和打印“中断”)在一个线程中更好地协同工作。
我们将从最高层次的抽象开始,逐步深入到理解细节。让我们用异步方式重写我们的应用程序。将以下内容添加到 Cargo.toml:
现在我们可以重写我们的应用程序如下:
让我们从头到尾来看看这些变化:
我们从 async_std::task 获取它们,而不是从 std::thread 获取 sleep 和 spawn
现在 sleepus 和 interruptus 前带上了 async
在调用 sleepus 后,我们有一个.await。请注意,这不是 .await() 方法调用,而是新的语法
在 main 函数中有一个新属性 #[async_std::main]
我们现在传入 spawn(sleepus()) 而不是 spawn(sleepus),而是传入函数本身,然后立即运行该函数并将其结果传递给 spawn
现在,对 interruptus() 的之后调用 .await
我们使用.await 语法来代替 JoinHandle 上的 join()
练习:在您自己的机器上运行此代码,并确保所有代码都按预期编译和运行。然后尝试撤销上面列出的一些更改,看看是什么生成了编译器错误,以及是什么产生了不正确的运行时行为。
这看起来像是一大堆更改。 但实际上,我们的代码在结构上与以前的版本几乎相同,这是对 async/.await 语法的真实证明。 一切都按照我们希望的方式在表面下工作: 一个单独的操作系统线程进行非阻塞调用。
让我们来分析一下这些修改的实际含义。
将 async 添加到函数定义的开头可以做三件事:
它允许你在内部使用 .await 语法,我们稍后会解释它的含义。
它修改了函数的返回类型。 async fn foo ()-> Bar 实际上返回 impl std::Future::Future<Output = Bar> 。
自动将结果值包装在新的Future中。 稍后我们会证明这一点更好。
让我们稍微解释一下第二点。在标准库中定义了一个叫做 Future 的特性。它有一个关联的 Output 类型。这个 trait 的意思是: 我保证,当我完成的时候,我会给你一个类型 Output 的值。例如,您可以设想一个异步 HTTP 客户端,它看起来像这样:
发出该请求将需要一些非阻塞 I/O。 在这些情况发生时,我们不想阻塞调用线程。 但是我们确实希望以某种方式获得最终的响应。
稍后我们将更直接地讨论 Future 值。目前,我们将继续使用高级 async/.await 语法。
练习:通过修改其结果类型,将sleepus的签名重写为不使用async关键字。请注意,当您获得正确的类型时,代码将不会编译。注意你得到的错误信息。
async fn sleepus() 的结果类型是隐含的单元值 ()。因此,我们 Future 的 Output 应该是单元的。这意味着我们需要把我们的签名写成:
然而,只有这样的改变,我们才能得到下面的错误消息:
第一条消息非常直接:您只能在异步函数或块内使用.await语法。 我们还没有看到异步块,但这听起来像是:
第二个错误消息是第一个错误消息的结果: async 关键字导致返回类型为 impl Future。如果没有这个关键字,我们的 for 循环计算结果为 () ,这不是 impl Future。
练习:修正编译器错误,在圆角函数中引入异步块。不要在函数签名中添加异步,继续使用 impl Future。
用异步块包装整个函数体解决了这个问题:
也许我们不需要所有这些 async/.await 垃圾。 如果我们取消对 sleepus中的 .await 用法的调用,该怎么办? 也许令人惊讶的是,它编译了,尽管确实给了我们不祥的警告:
我们创建了 Future 的值,但并没有使用它。当然,如果你看看我们程序的输出,你就会明白编译器的意思:
我们所有的 Sleepus 消息都会立即打印输出。 有趣! 问题在于,对 sleep 的调用实际上不再使我们当前的线程进入睡眠状态。 相反,它生成一个实现Future的值。当这个承诺最终实现的时候,我们知道延迟已经发生了。但在我们的案例中,我们只是简单地忽略了 Future,因此从来没有真正地延迟过。
为了了解 .await 语法的作用,我们将直接使用 Future 值来实现我们的功能。 让我们从去掉 async 块开始。
如果我们删除 async 块,我们会得到这样的代码:
这给出了一个我们之前看到的错误消息:
这是有意义的: for 循环的计算结果为 () ,而 () 未实现 Future。解决这个问题的一种方法是在 for 循环之后添加一个表达式,计算结果为实现 Future 的内容。我们已经知道这样一件事: sleep。
练习:调整sleepus函数,使其编译。
练习:将 for 循环后的 sleep 调用替换为 ready 调用。
要进一步去除洋葱皮,让我们更加努力,而不是使用现成的函数。相反,我们将定义我们自己的结构来实现 Future。我打算叫 DoNothing。
练习:该代码无法编译。 如果不查看下面的内容或询问编译器,您认为它会抱怨什么?
这里的问题是 DoNothing 没有提供 Future 实现。我们将做一些编译器驱动的开发,并让 rustc 告诉我们如何修复我们的程序。我们的第一个错误消息是:
因此,让我们添加一个 trait 实现:
失败的原因是:
我们还真的不了解 Pin<&mut Self> 或 Context,但我们确实了解Output。 并且由于我们之前是从 Ready 调用返回 (),所以我们在这里做同样的事情。
哇,编译! 当然,由于 unimplemented!() 调用,它在运行时失败:
现在让我们尝试实现 poll。我们需要返回一个类型为 Poll <self::output> 或 Poll <()> 的值。让我们来看看 Poll 的定义:
使用一些基本的推理,我们可以看到 Ready 意味着“我们的 Future
已经完成,然后是输出” ,而 Pending 意味着“还没有完成”。鉴于我们的NoNothing 想要立即返回 (),我们可以在这里使用 Ready 。
练习:实现 poll 的工作版本。
恭喜,您刚刚实现了您的第一个 Future 结构体!
async
difference记住上面我们说过让一个函数异步做第三件事:
自动将结果值包装在一个新的 Future 中。稍后我们将更好地证明这一点。
稍后,让我们更好地证明这一点。
让我们将sleepus的定义简化为:
编译和运行都很好。让我们尝试切换回 async 方式来编写签名:
我们获得了一个错误:
您可以看到,当您有一个异步函数或块时,结果将自动包装在 Future 中。因此,我们不返回 DoNothing,而是返回一个 impl Future<Output = DoNothing> 。我们的类型需要 Output = ()。
练习:尝试猜测您需要向这个函数添加什么以使它能够编译。
解决这个问题非常容易:您只需将.await附加到DoNothing上:
这使我们对 .await 所做的事情有了更多的直觉:它从某种程度上从DoNothing Future 中提取 () 输出。 但是,我们仍然真的不知道它是如何实现的。 让我们建立一个更复杂的 Future 来靠近。
我们将构建一个新的 Future 实现,它是:
睡眠一定的时间
然后打印一条信息
我们对 SleepPrint 的实现策略是用我们自己的 Future 封装一个现有的 sleep Future。由于我们不知道 sleep 调用结果的确切类型(这只是一个隐含的 future ),因此我们将使用一个参数:
我们可以在sleepus函数中使用以下命令调用此函数:
当然,我们现在得到一个关于缺少的 Future 实现的编译器错误。所以让我们一起努力。我们的 impl 开始于:
这表示,如果 SleepPrint 包含的 sleep 值是输出类型为 () 的Future,则它是 Future。 当然,对于 sleep 函数而言,这是正确的。很好。 我们需要定义输出:
然后我们需要一个 poll 函数:
好了,现在最重要的是。我们拥有基本的 future,我们需要用它做些什么。我们唯一能做的就是调用 poll。poll 需要一个 &mut Context,幸运的是我们已经提供了这个上下文。该上下文包含当前正在运行的任务的信息,因此当任务准备好时,它可以被唤醒(通过 Waker)。
目前,让我们做我们可以合理做的唯一一件事:
有两种可能。如果 poll 返回 Pending,则表示 sleep 还没有完成。在这种情况下,我们希望我们的 future 也表明它还没有完成。为了实现这个功能,我们只需要传播 Pending 值:
但是如果 sleep 已经完成,我们就会收到一个 Ready(()) 变量。在这种情况下,终于是时候打印我们的消息,然后传播 Ready:
就这样,我们已经从一个简单的 future 演变成了一个更复杂的 future。 但这是临时的。
SleepPrint是临时的:它对在Future结束后要执行的特定操作进行硬编码。让我们开始游戏,并对两个不同的Future的动作进行排序。 我们将定义一个具有三个字段的新结构:
第一个 future 运行
第二个 future 运行
使用一个 bool 告诉我们是否完成了第一个 future
由于 Pin 的东西会变得有点复杂,现在是时候使用帮助框架来简化我们的实现并避免不安全的阻塞了。因此,在 Cargo.toml 中添加以下内容:
现在,我们可以定义一个TwoFutures结构,该结构允许我们将第一个和第二个Future投影到固定的指针中:
在 sleepus 中使用这个非常简单:
现在,我们只需要定义我们的 Future 实现。 容易吧? 我们要确保 Fut1 和 Fut2 均为 future。 我们的输出将是 Fut2 的输出。 (如果需要,您还可以返回第一和第二个输出)。要使所有这些工作:
为了使用固定的指针,我们将获得一个新值,该值将投影所有指针:
这样一来,我们就可以直接与我们的三个领域进行互动。 我们要做的第一件事是检查第一个 Future 是否已经完成。 如果没有,我们将对其进行 poll。 如果 poll 测验为Ready,则我们将忽略输出,并指出第一个Future已完成:
接下来,如果第一个 future 已经完成,我们希望对第二个进行 poll。如果第一个 future 没有完成,那么我们说我们正在等待:
就这样,我们一起创造了两个 future,一个更大、更宏伟、更光明的 future。
练习:立刻去掉异步块的用法,让编译器的错误引导你。
您收到的错误消息 () 不是 future。 相反,您需要在调用 println! 之后返回 Future 值。 我们可以使用方便的 async_std::future:: ready:
在 Rust 1.39 和 async/.await 语法之前,基本上这就是异步代码的工作方式。 这远非完美。 除了明显的正确步骤外,它实际上不是循环。 您可以递归调用 sleepus,但是这会创建一个编译器不太喜欢的无限类型。
但是幸运的是,我们现在终于建立了足够的背景知识,可以轻松地解释 .await 语法在做什么,and_then到底在做什么,但不必大惊小怪!
练习:重写上面的 sleepus 函数以使用 .await 代替 and_then。
重写真的很简单,函数的主体变成了非正确的步骤,超扁平化:
然后,我们还需要更改函数的签名以使用异步,或者将所有内容包装到异步块中。你说了算。
除了在此处明显改善了可读性之外,.await 还具有一些巨大的可用性改进。 一个突出的问题是它与循环绑定的容易程度。 对于较早的 future,这确实是一个痛苦。 另外,将多个await调用连在一起非常容易,例如:
不仅如此,还与 ? 完美的错误处理运算符。 上面的示例更有可能是:
最后一个谜团还没有解开,那就是 main 上那个奇怪的属性究竟是怎么回事:
我们的 sleepus 函数和 interruptus 函数实际上什么都不做。他们提供 Future 有关工作方法的说明。有些东西必须真正地执行这些动作。执行这些操作的是一个执行者。async-std 库提供了一个执行器,tokio 也是如此。 为了运行任何 Future,您需要一个执行程序。
上面的属性自动用 async-std 的执行程序包装 main 函数。然而,属性方法完全是可选的。相反,您可以使用 async std::task::block on。
练习:重写 main 以避免使用该属性。您需要将其从 async fn main 重写到 fn main。
由于我们在 main 主体中使用 .await,当我们简单地删除异步限定符时,会出现错误。 因此,我们需要在 main 内部使用一个异步块(或定义一个单独的辅助异步函数)。 综上所述:
每个执行者都有能力管理多个任务。 每个任务都在努力产生单个Future的输出。 就像线程一样,您可以生成其他任务以使并发运行。 这正是我们实现所需交互的方式!
一句话警告。 Future 和 async/.await 实现了一种协作并发形式。 相比之下,操作系统线程提供了抢占式并发。 重要的区别在于,在协作并发中,您必须协作。 如果您的一项任务导致了延迟,例如通过使用 std::thread::sleep或通过执行大量 CPU 计算,则不会中断该任务。
这样做的结果是您应确保在任务内部不执行阻塞调用。 而且,如果您要执行占用大量 CPU 的任务,则可能值得为其生成 OS 线程,或者至少确保执行者不会饿死您的其他任务。
我认为.await表面下的行为并不能说明什么,但我认为准确了解此处发生的情况很有用。 特别是,了解Future值与将Future值的输出实际链接在一起之间的区别是,正确使用 async/.await 的核心。 幸运的是,编译器错误和警告在引导您朝正确的方向方面做得很好。
在下一课中,我们可以开始使用我们新获得的 Future 知识和 async/.await 语法来构建一些异步应用程序。 我们将致力于使用Tokio 0.2编写一些异步 I / O,包括网络代码。
修改 main 函数以调用 spawn 两次而不是一次。
修改main函数以获得非交互行为,该程序在 Interruptus 之前多次打印Sleepus。
我们仍在使 println! 打印用阻止 I/O。 再次打开“unstable”功能,然后尝试使用 async_std::println。 您会收到一条难看的错误消息,直到您摆脱了错误为止。 尝试了解为什么会发生这种情况。
编写一个函数foo,以使以下断言得以通过:assert_eq!(42, async_std::task::block_on(async { foo().await.await }));
我们仍然会收到关于for循环中未使用的Future值的警告,但之后没有一个警告:该函数正在返回一个警告。但是不会得到以后的警告: 这个值是从函数返回的。但是,当然,睡0毫秒只是一种冗长的什么都不做的方式。如果有一个更明确地什么都不做的“虚拟” Future,那就好了。幸运的是,。
这将涉及使用。 我将不在这里描述。 固定事件的具体细节并不能很好地启发 Future 主题。 如果您希望对代码的这一部分感到满意,那么您就不会错过太多。
接下来的一点是包裹令人眼花缭乱的固定指针部分。 我们需要将Pin<mut Self> 投影到 Pin&mut Fut> 中,以便我们可以处理底层的 sleep Future。我们可以使用一个来使它更漂亮一些,但是我们需要做一些不安全的map:
注意: 在本文中,我们不打算深入探讨 Waker 是如何工作的。如果你想要一个现实生活中的例子来说明调用 Waker,我建议你阅读我在 。
像这样把两个任意的 future 粘在一起是很好的。但是第二个future取决于第一个future的结果就更好了。要做到这一点,我们需要一个类似 and_then 的函数(Monads FTW 给我的 Haskell 伙伴)。我不打算在这里用实现的血淋淋的细节来烦你,但是如果你感兴趣的话,。假设你有这个方法,我们可以自己编写这个函数:
这里有一些可以带回家做的练习,你可以。
修改 main 函数以便根本不调用 spawn。相反,。您需要添加一个使用 async_std::prelude::* ; 并将“unstable”特性添加到 Cargo.toml 中的 async-std 依赖项中。