Skip to content

25.4 Rust 异步的基石:协程(Coroutine)

在学习理解 async/await 语法之前,我们需要了解其背后真正的驱动力是一种被称为 协程(Coroutine) 的底层机制。

你可以把协程想象成厨房里那个“高效厨师”的工作流程。是一种用户态的、协作式的多任务机制。它也可以实现多任务并行。跟线程相比,它的最大特点是它不是被内核调度的,而是由任务自己进行协作式的调度。协程的实现方案一般可以分为 stackful 以及 stackless 两种。Rust 的协程采用的是 stackless coroutine 的设计思路。

  • 用户态:意味着它的创建和切换比操作系统线程(Thread)轻量得多,你可以轻松创建成千上万个。
  • 协作式:意味着一个任务(比如“烧水”)必须主动“让出”CPU(比如在等待水开时),其他任务(比如“切菜”)才能运行。它不像线程那样由操作系统强制切换(抢占式)。

这一机制使 Rust 能在单线程中管理大量异步任务:任务暂停时不占用线程资源,唤醒时凭借状态恢复继续执行,从而实现高效的非阻塞编程。

因为 协程 本身仍是实验性特性,所以你需要使用 Nightly Rust 来运行本章节的代码。

第一步:什么是协程?

前置知识:了解普通函数如何一次性执行完毕,以及 Iterator 如何一次产出一个值。

协程就像一列观光列车,它可以在任何一个指定的“景点”站(yield 点)暂停,当你再次呼唤它时,它会从上次暂停的站点继续出发。

专业术语:协程(Coroutine)

白话解释:一个可以被暂停和恢复的特殊代码块。它能使用 yield 关键字“产出”一个值并暂停自己,等待下一次被唤醒。与旧的 Generator 概念相比,Coroutine 的设计更加通用。

Coroutine trait 是这么定义的:

rust
// rust-lib/core/src/ops/coroutine.rs
/// [RFC 2033]: https://github.com/rust-lang/rfcs/pull/2033
/// [unstable book]: ../../unstable-book/language-features/coroutines.html
#[lang = "coroutine"]
#[unstable(feature = "coroutine_trait", issue = "43122")]
#[fundamental]
#[must_use = "coroutines are lazy and do nothing unless resumed"]
pub trait Coroutine<R = ()> {
    type Yield;
    type Return;

    #[lang = "coroutine_resume"]
    fn resume(self: Pin<&mut Self>, arg: R) -> CoroutineState<Self::Yield, Self::Return>;
}

一个简单的协程示例

要运行此代码,你需要在 nightly 工具链下,并开启 coroutinescoroutine_trait 特性。

rust
// main.rs
#![feature(stmt_expr_attributes, coroutines, coroutine_trait)] // 开启最新的协程特性

use std::ops::{Coroutine, CoroutineState};
use std::pin::Pin;

fn main() {
    // 创建一个协程闭包
    // 注意:现在它被称为 coroutine
    let mut coroutine = #[coroutine] || {
        println!("观光列车出发!");
        yield 1; // 在第一个景点暂停,产出值 1
        println!("前往下一个景点...");
        yield 2; // 在第二个景点暂停,产出值 2
        println!("旅途结束!");
        // Coroutine 没有显式的 return,最后一个表达式的值就是返回值
        "终点站纪念品"
    };

    // 唤醒协程。我们用 Pin 包裹它,并调用 resume。
    // resume 可以接受一个参数,这里我们传入 ()
    match Pin::new(&mut coroutine).resume(()) {
        CoroutineState::Yielded(value) => {
            println!("在景点 {} 下车游玩", value);
        }
        _ => panic!("出错了"),
    }

    // 再次唤醒
    match Pin::new(&mut coroutine).resume(()) {
        CoroutineState::Yielded(value) => {
            println!("在景点 {} 下车游玩", value);
        }
        _ => panic!("出错了"),
    }

    // 最后一次唤醒
    match Pin::new(&mut coroutine).resume(()) {
        CoroutineState::Complete(retval) => {
            println!("收到最终纪念品: {}", retval);
        }
        _ => panic!("出错了"),
    }
}

输出:

观光列车出发!
在景点 1 下车游玩
前往下一个景点...
在景点 2 下车游玩
旅途结束!
收到最终纪念品: 终点站纪念品

Coroutine::resume 现在可以接受一个参数,这使得协程之间的双向通信成为可能(尽管在 async/await 的场景下,这个参数通常是 ())。

第二步:协程如何“记忆”?

生成器是如何记住上次执行到哪儿,以及所有局部变量的值的?

一句话总结:编译器将你的生成器代码,自动转换成了一个“状态机(State Machine)”对象。

专业术语:状态机(State Machine)

白话解释:一个能够记住自己当前“状态”的对象。对于生成器来说,“状态”包括两个方面:

  1. 执行到哪了:是在第一个 yield 之前,还是在第一个和第二个 yield 之间?
  2. 变量的值是多少:在暂停时,所有需要跨越 yield 使用的局部变量的值都需要被保存下来。

让我们看一个经典的斐波那契例子:

rust
let mut fib = || {
    let mut current = 0; // 需要跨越 yield
    let mut next = 1;    // 需要跨越 yield

    loop {
        // `new_next` 只在单次循环内有效,不需要跨越 yield
        let new_next = current + next;
        current = next;
        next = new_next;
        yield current; // 暂停点
    }
};

编译器分析后会得出结论:

  • currentnext 的值在每次 yield 之后,下一次 resume 时还需要使用,所以它们必须被提升为状态机结构体的成员变量
  • new_next 在每次循环开始时创建,在 yield 之前就完成了它的使命,所以它只是 resume 方法内部的一个普通局部变量,不需要被保存。

深入底层:一个手工模拟的状态机

假设我们有这样一个协程:

rust
let mut coroutine = || {
    let mut x = 10;
    yield x;
    x += 1;
    yield x;
};

编译器生成的(概念上的)状态机可能长这样:

rust
// 这是一个概念性的表示,用于解释原理

// 状态机用一个 enum 来表示当前进行到哪一步
enum MyCoroutineState {
    // 初始状态,保存着需要跨越 yield 的变量 x
    Start { x: i32 },
    // 第一次 yield 后的状态,同样保存着 x
    Yielded1 { x: i32 },
    // 最终完成状态
    Done,
}

// 整个协程就是一个包裹了状态的结构体
struct MyCoroutine {
    state: MyCoroutineState,
}

impl Coroutine for MyCoroutine {
    type Yield = i32;
    type Return = (); // 协程最后没有返回值

    fn resume(self: Pin<&mut Self>, _arg: ()) -> CoroutineState<i32, ()> {
        // 使用 mem::replace 可以安全地取出当前状态并留下一个占位符,
        // 避免在匹配期间对 self 的可变借用冲突。
        let current_state = std::mem::replace(&mut self.state, MyCoroutineState::Done);

        match current_state {
            // 1. 如果是初始状态
            MyCoroutineState::Start { x } => {
                println!("第一次唤醒,从头开始");
                // 更新下一次的状态为 Yielded1,并保存更新后的 x
                self.state = MyCoroutineState::Yielded1 { x: x };
                // 产出值并暂停
                return CoroutineState::Yielded(x);
            }
            // 2. 如果是第一次 yield 后的状态
            MyCoroutineState::Yielded1 { mut x } => {
                println!("第二次唤醒,从上个 yield 点继续");
                x += 1;
                // 更新状态为 Done,因为后面没有 yield 了
                self.state = MyCoroutineState::Done;
                // 产出值并暂停
                return CoroutineState::Yielded(x);
            }
            // 3. 如果已经完成
            MyCoroutineState::Done => {
                println!("协程已完成,再次唤醒会 panic");
                panic!("Coroutine resumed after completion");
            }
        }
    }
}

这个手工模拟完美地展示了:

  1. 编译转换:编译器将协程转换为状态机结构体,例如 MyCoroutine
  2. 状态转移:通过枚举 enum 中的不同变体来记录执行阶段(如初始、第一次 yield 后、完成等)。
  3. 数据保存:需要记忆的变量(x)被存储在 enum 变体的字段中。
  4. 恢复执行resume 方法通过 match 当前枚举状态,恢复变量值并跳转到正确的代码逻辑继续执行,实现 “未变变量继续可用” 的承诺。

第三步(进阶):自引用问题

这是生成器依然是实验性功能的核心原因,也是理解 Pin 的关键。 看下面这个例子,它在 nightly 下也无法编译:

rust
// 无法编译!
let _g = || {
    let local = [1, 2, 3];
    let ptr = &local[0]; // ptr 借用了同一个协程状态机内部的 local

    yield; // <--- 暂停点

    println!("{}", *ptr); // 恢复后,需要使用 ptr
};

编译时,你会得到一个类似这样的错误: error: borrow may still be in use when coroutine yields

为什么? 因为编译器生成的状态机是自引用的:

rust
// 概念上的结构体
struct SelfRefCoroutine<'a> {
    local: [i32; 3],
    // ptr 指向了同一个结构体内的 local,'a 必须等于'self
    ptr: &'a i32,
    state: i32,
}

ptr 这个字段是一个引用,它指向了同一个结构体内部local 字段。这种结构被称为 “自引用结构体” (Self-Referential Struct)

为什么自引用是危险的

Rust 的一个基本假设是:任何值默认都是可以被移动(Move)的。移动操作在 Rust 中非常常见且廉价(通常只是内存拷贝)。 举个例子,大家还记得 std::mem::swap 方法吗?

rust
pub const fn swap<T>(x: &mut T, y: &mut T)

对于任意两个 T 类型的对象,如果我们拥有指向它们的 &mut T 型指针,就可以把它们互换位置。这个操作就相当于把这两个对象都 move 到了其他地方。如果这两个对象存在自引用的现象,那么这个 swap 操作就可以造成它们内部出现野指针。

  1. 初始状态SelfRefCoroutine 对象位于内存地址 0x1000。它的 local 字段在 0x1000ptr 字段存储的地址也是 0x1000。一切正常。
  2. 移动对象:现在,我们将整个对象移动到一个新的内存位置,比如 0x2000(这可能在函数返回、集合扩容等情况下发生)。
  3. 灾难发生
    • local 字段的数据被拷贝到了新地址 0x2000
    • 但是 ptr 字段里存储的地址值 0x1000 也被原封不动地拷贝了过去!
    • 现在,ptr 依然指向旧的、已经被释放或无效的内存地址 0x1000。它成了一个悬垂指针(Dangling Pointer)
    • 当代码恢复执行,试图解引用 *ptr 时,就会访问非法内存,导致程序崩溃或未定义行为。

为了从根本上杜绝这种危险,Rust 的借用检查器会禁止创建这种可能在移动后失效的自引用结构体。

第四步:解决方案 Pin —— 精准的“固定”

既然问题出在“移动”上,那如果不让它移动,不就安全了吗?这就是 Pin 的作用。

Pin 的设计思想

Rust 设计组意识到,一个协程(或 Future)在它第一次被 resume/poll 之前,是可以安全移动的。因为此时,内部的自引用指针(如 ptr)可能还没有被初始化,或者即使初始化了,整个数据块一起移动也不会出错。

危险只发生在第一次 resume/poll 之后。一旦协程开始执行并暂停,内部的自引用关系就建立起来了。此时如果再移动它,就会导致内存不安全。

所以,Pin 的作用可以更精确地描述为:

Pin 是一个契约,它保证一旦一个值(比如协程状态机)被 Pin 住,它的内存地址将永久固定,直到被销毁。这使得在它被首次 poll/resume 后,其内部的自引用关系是安全的。

Future::pollCoroutine::resumeself 参数被设计为 Pin<&mut Self>,就是强制执行了这个契约:

  • 你不能直接调用 my_future.poll()
  • 你必须先把它 Pin 住,然后才能通过 Pin 后的引用去调用 poll
  • 一旦被 Pin,这个 Future 就不能再被移动了。

这个设计巧妙地平衡了灵活性(创建时可移动)和安全性(运行时不可移动),并且使得 pollresume 方法的 API 本身可以是安全的(即方法签名上不需要 unsafe 关键字),因为 Pin 类型已经从类型系统层面提供了安全保证。

这是 Rust 内存安全与零成本抽象哲学的完美体现。 Pin 的存在,就是为了让协程(以及其上的 async/await)这种包含自引用的高级抽象能够安全地运作。

总结:async/await、协程与 Future 的完整关系链

现在,我们可以描绘出这幅现代化、准确的全景图了。

  1. async fn:你编写一个 async 函数。
  2. 编译器转换:编译器接收到 async 代码,将其转换为一个实现了实验性 Coroutine trait 的匿名状态机
  3. .await 的角色:你在代码中写的每一个 .await,在这个状态机内部就对应一个 yield 点。
  4. Pin 是什么? 是一个安全保障,通过“钉住”内存地址,防止 Future 状态机因移动而产生悬垂指针,确保了内存安全。
  5. CoroutineFuture:这是关键的一步!async/await 在稳定版 Rust 中可用,但 Coroutine 仍然是实验性的。这是如何做到的?
    • 编译器生成的协程状态机,会被一个非常小的**适配器(Adapter)**包裹起来。
    • 这个适配器实现了稳定版的 Future trait
    • 适配器的 poll 方法的内部逻辑很简单:就是去调用被包裹的协程的 resume 方法。
      • 如果 resume 返回 CoroutineState::Yielded(...),那么 poll 就返回 Poll::Pending
      • 如果 resume 返回 CoroutineState::Complete(...),那么 poll 就返回 Poll::Ready(...)

另外,协程本身并不是直接面向广大用户的接口。用户真正需要的是完成异步任务。作为开发者,我们只需要和 async/await 以及 Future 打交道。而 Coroutine 则是那个隐藏在幕后、驱动一切的、最核心的引擎。

参考资料:

Released under the MIT License