Skip to content

25.5 协程简介

Rust 设计这个生成器,主要目的在于,基于生成器设计一套协程(Coroutine)的方案,从而方便编写大规模高性能异步程序。这个功能也是 Rust 设计组 2018 年要解决的主要问题之一,预计要到 2018 年年底才能正式稳定下来。到目前为止,依然只是一个实验性质的不稳定功能。

所谓协程,指的是一种用户态的非抢占式的多任务机制。它也可以实现多任务并行。跟线程相比,它的最大特点是它不是被内核调度的,而是由任务自己进行协作式的调度。协程的实现方案一般可以分为 stackful 以及 stackless 两种。Rust 的协程采用的是 stackless coroutine 的设计思路。

在 Rust 语言和标准库中,只引入了极少数的关键字、trait 和类型。async 和 await 关键字是目前许多语言都采用的主流方案,使用关键字而不是用宏来做 API,有助于社区的统一性,避免不同的异步方案使用完全不一样的用户 API。引入关键字使用的是 edition 方案,所以不会造成代码不兼容问题。标准库中只有极少数必须的类型,这也是 Rust 一贯的设计思路。但凡是可以在第三方库中实现的,一律在第三方库中实现,哪怕这个库本来就是官方核心组维护的,这样做可以让这个库的版本升级更灵活,有助于标准库的稳定性。

Rust 的协程设计,核心是 async 和 await 两个关键字,以及 Future 这个 trait:


rust
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output>
    ......
}

Future 这个 trait 代表的是异步执行。它里面最重要的一个方法是 poll,意思是,查看当前 Future 的状态,这个方法的返回类型如下:


rust
pub enum Poll<T> {
    Ready(T),
    Pending,
}

对于一个实现了 Future Trait 的类型,每次调用这个 poll 方法,其实就是查看一下这个对象当前的状态是什么,该状态可以为正在执行或者已经执行完毕。

Future 可以组合,一个 Future 可以由其他的一个或者多个 Future 包装而成。跟我们已经见过的迭代器 Iterator 很像。比如,我们可以实现一个新的 Future,它的结果是多个 Future 按顺序执行得到的。或者,实现一个 Future,它的结果是两个子 Future 中先返回的那个。Future 组合的方式可以非常灵活。

然后我们还需要一个调度器 Executor,标准库中有一个 Executor 的 trait。它的具体实现可以由第三方库来实现。它应该有一个主事件循环,不断调用最外层每个收到了事件通知的 Future 的 poll 方法,外层的 Future 的 poll 方法被调用时,它就会调用内层 Future 的 poll 方法,不断嵌套。如果这个 Future 处于 Pending 状态,那么这个 Future 就应该设置好自己需要监听的事件信息,然后马上返回,放弃占用 CPU。等到合适的事件发生时,调度器则应该再次调用这个 Future 的 poll 方法,驱动这个 Future 从上次退出的那里继续往下执行。

大家可以看到,Future 跟 Generator 一样,具备同样的特性,也就是说可以在某个地方主动中断执行,待下一次再进来的时候,刚好可以从上次退出的地方恢复执行。这就是为什么 Rust 的 Future 最终是基于 Generator 实现的。在 Rust 里面,Generator 是 Future 的基础设施。

关于这个 Future trait,另外一个需要注意的点是,它的 self 参数是Pin<&mut Self>类型,而 Generator 的 resume 方法的 self 参数是&mut Self 类型。这个 Pin 类型,也是一个智能指针,它的目的主要就是保证它所指向的对象无法被 move。而&mut Self 类型是无法保证这一点的。举个例子,大家还记得 std::mem::swap 方法吗?


rust
pub fn swap<T>(x: &mut T, y: &mut T)

对于任意两个 T 类型的对象,如果我们拥有指向它们的&mut T 型指针,就可以把它们互换位置。这个操作就相当于把这两个对象都 move 到了其他地方。如果这两个对象存在自引用的现象,那么这个 swap 操作就可以造成它们内部出现野指针。而Pin<&mut T>类型就不存在这样的问题。Pin<&mut T>还实现了 DerefMut trait,所以它依然有权调用那些需要&mut Self 的成员方法。正因为 PinMut 保证了指向的对象不可 move,所以这个 poll 方法就可以不用 unsafe 修饰了。

一般情况下,实现任务调度以及为通过各种异步操作实现 Future trait 并不是最终用户关注的问题,这些应该都已经被网络开发框架完成,比如 tokio。大部分用户需要关注的是如何利用这些框架完成业务逻辑。用户此时用得最多的是 async 和 await 关键字。在最新的 nightly 版本中,只有 async 关键字的实现已完成,await 关键字还存在争议,它目前依然是使用宏来实现的。一个基本的使用示例如下所示:


rust
async fn async_fn(x: u8) -> u8 {
    let msg = await!(read_from_network());
    let result = await!(calculate(msg, x));
    result
}

在这个示例中,假设 read_from_network()以及 calculate()函数都是异步的。最外层的 async_fn 函数当然也是异步的。当代码执行到 await!(read_from_network())里面的时候,发现异步操作还没有完成,它会直接退出当前这个函数,把 CPU 让给其他任务执行。当这个数据从网络上传输完成了,调度器会再次调用这个函数,它会从上次中断的地方恢复执行。所以用 async/await 的语法写代码,异步代码的逻辑在源码组织上跟同步代码的逻辑差别并不大。这里面状态保存和恢复这些琐碎的事情,都由编译器帮我们完成了。

下面给大家解释一下 async 和 await 分别做了什么事情。

async 关键字可以修饰函数、闭包以及代码块。对于函数:


rust
async fn f1(arg: u8) -> u8 {}

实际上等同于:


rust
fn f1(arg: u8) -> impl Future<Output = u8> {}

这两种写法实际上是一模一样的。凡是被 async 修饰的函数,返回的都是一个实现了 Future trait 的类型。由 async 修饰的闭包也是一样的。async 代码块同样类似。它相当于创建了一个语句块表达式,这个表达式的返回类型是 impl Future。async 关键字不仅对函数签名做了一个改变,而且对函数体也自动做了一个包装,被 async 关键字包起来的部分,会自动产生一个 Generator,并把这个 Generator 包装成一个满足 Future 约束的结构体。在函数体中用户需要返回的是 Future::Output 类型。

对于 await 这个宏,我们可以在标准库中看到它的实现:


rust
macro_rules! await {
    ($e:expr) => { {
        let mut pinned = $e;
        let mut pinned = unsafe { $crate::mem::PinMut::new_unchecked(&mut pinned) };
        loop {
            match $crate::future::poll_in_task_cx(&mut pinned) {
                $crate::task::Poll::Pending => yield,
                $crate::task::Poll::Ready(x) => break x,
            }
        }
    } }
}

从语法上来讲,await 一定只能在 async 函数或代码块中出现,所以它实际上是被包在一个 Generator 里面的。await 这个宏的逻辑也很简单,await!(another_future)所做的事情就是,先构造一个 PinMut 指针指向another_future,然后调用another_future的 poll 方法。如果其处于 Pending 状态则 yield,暂时退出这个 Future。每当调度器恢复它的执行时,它都会继续调用 poll 方法,直到处于 Ready 状态,这时候这个 await 就算是执行完毕了,继续执行后面的语句。

下面我们看一下,如果基于 async/await 写程序,看起来会是什么样子:


rust
async fn fetch_rust_lang(client: hyper::Client) -> io::Result<String> {
    let response = await!(client.get("https://www.rust-lang.org"))?;
    if !response.status().is_success() {
        return Err(io::Error::new(io::ErrorKind::Other, "request failed"))
    }
    let body = await!(response.body().concat())?;
    let string = String::from_utf8(body)?;
    Ok(string)
}

可以看到,使用 async/await 来写异步逻辑,一方面可以保证高效率,另一方面,代码流程还是跟普通的同步逻辑类似,比较符合直觉。当然,我们也可以不用这个语法,通过 Future 的各种 adapter 的组合来完成同样的功能,这就跟上文中的对比一样,其与 async/await 的区别,就与手写 Iterator 和 Generator 之间的区别一样。

Released under the MIT License