Skip to content

30.1 异步管道

异步管道是最常用的一种管道类型。它的特点是:发送端和接收端之间存在一个缓冲区,发送端发送数据的时候,是先将这个数据扔到缓冲区,再由接收端自己去取。因此,每次发送,立马就返回了,发送端不用管数据什么时候被接收端处理。

我们先用一个简单示例来演示一下管道的基本用法:


rust
use std::thread;
use std::sync::mpsc::channel;

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move|| {
        for i in 0..10 {
            tx.send(i).unwrap();
        }
    });

    while let Ok(r) = rx.recv() {
        println!("received {}", r);
    }
}

在这个示例中,我们首先创建了一个管道。channel 函数的签名是这样的:


rust
pub fn channel<T>() -> (Sender<T>, Receiver<T>)

它返回了一个 tuple,里面包括一个发送者(Sender)和一个接收者(Receiver)。

接下来我们创建一个子线程,然后将这个发送者 move 进入了子线程中。

子线程中的发送者不断循环调用 send 方法,发送数据。在主线程中,我们使用接收者不断调用 recv 方法接收数据。

我们可以注意到,channel()是一个泛型函数,Sender 和 Receiver 都是泛型类型,且一组发送者和接收者必定是同样的类型参数,因此保证了发送和接收端都是同样的类型。因为 Rust 中的类型推导功能的存在,使我们可以在调用 channel 的时候不指定具体类型参数,而通过后续的方法调用,推导出正确的类型参数。

Sender 和 Receiver 的泛型参数必须满足 T:Send 约束。这个条件是显而易见的:被发送的消息会从一个线程转移到另外一个线程,这个约束是为了满足线程安全。如果用户指定的泛型参数没有满足条件,在编译的时候会发生错误,提醒我们修复 bug。

发送者调用 send 方法,接收者调用 recv 方法,返回类型都是 Result 类型,用于错误处理,因为它们都有可能调用失败。当发送者已经被销毁的时候,接收者调用 recv 则会返回错误;同样,当接收者已经销毁的时候,发送者调用 send 也会返回错误。

在管道的接收端,如果调用 recv 方法的时候还没有数据,它会进入等待状态阻塞当前线程,直到接收到数据才继续往下执行。

管道还可以是多发送端单接收端。做法很简单,只需将发送端 Sender 复制多份即可。复制方式是调用 Sender 类型的 clone()方法。这个库不支持多接收端的设计,因此 Receiver 类型没有 clone()方法。在上例的基础上我们稍做改动,创建多个线程,每个线程发送一个数据到接收端。代码如下:


rust
use std::thread;
use std::sync::mpsc::channel;

fn main() {
    let (tx, rx) = channel();

    for i in 0..10 {
        let tx = tx.clone();  // 复制一个新的 tx,将这个复制的变量 move 进入子线程
        thread::spawn(move|| {
            tx.send(i).unwrap();
        });
    }
    drop(tx);

    while let Ok(r) = rx.recv() {
        println!("received {}", r);
    }
}

以上代码编译执行,可以发现它打印的结果与前面的例子不同了。在前面示例中,这些数字呈顺序排列,因为发送端是按顺序发送的,接收端会保持同样的顺序。但在这个示例中,这些数字呈乱序排列,因为它们来自不同的线程,哪个先执行哪个后执行并不是确定的,取决于操作系统的调度。

目前我们用的这个管道是“异步”的,标准库还提供了另外一种“同步”管道供我们使用。同步管道和异步管道在接收端是一样的逻辑,区别在于发送端。

Released under the MIT License