--- 摄于 2017 年 9 月 藏川线前段
上一讲,我们以极其精简的方式回顾了异步系统在 Rust 生态里面的发展,以及 Future 是怎么最后被定义成这样的。接下来,我们从简单到复杂的一些例子去简单了解 Future 是怎么工作的。了解了它是如何工作的之后,我们就能理解甚至手动实现自己的 Future 和 Runtime。
Rust 中的 Future 与其他语言不太一样,它是 Lazy 的,换言之,如果没有 Runtime 驱动,它的构建仅仅只是创建了一个结构体,并不会做任何多余的事情。首先通过感性认识来了解,runtime 到底对 Future 做了些啥。
struct Hello;
impl Future for Hello {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("hello");
Poll::Ready(())
}
}
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(Hello{});
}
output:
hello
这个一个正常能跑的 Future,但没啥意义,它的异步不存在任何可以被暂停的点,单纯只是输出一个日志而已。
将 poll 的实现改成:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("hello");
cx.waker().wake_by_ref();
Poll::Ready(())
}
并不会出现两次 “hello” 输出,说明该 Future 在 Ready 后就被 Drop 掉了。
将 poll 的实现改成:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("hello");
Poll::Pending(())
}
这个 Future 将在输出一次 “hello” 之后卡死,没有任何后续输出。
再将 poll 的实现改成:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("hello");
cx.waker().wake_by_ref();
Poll::Pending(())
}
这个 Future 将会被一直执行,类似于 loop {},区别在于,loop 将会死循环,无法让出任何执行间隙,而这个 Future 由于每次都需要返回 Pending 状态,它存在调用间隙,能够主动让出执行权。
虽然上面两种都是不好的实现,但让我们知道了一些东西:
很多同学一定会将关注点放在 context 上面,因为从最简单的 Future 程序来看,其他的东西并不重要,都是正常执行的代码而已,关键是 Waker 是什么。
Waker 是由 Runtime 自行实现的东西,它里面是一个虚表,完全的一点都没有类型系统,全是指针:
pub const fn new(
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ())
) -> RawWakerVTable;
pub const fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker;
pub unsafe fn from_raw(waker: RawWaker) -> Waker
这一堆乱七八糟的东西是啥,它其实是 Future 系统对 Waker 构建的约束,Waker 必须能够实现:
这几个构建 vtable 的函数指针实际上就是 Waker 的功能需求,然后再将它封装成 Waker。这些东西实际上不需要用户掌握,但是,如果能够理解它们,对用户来说也有好处——能够相对精准地理解和控制 Future 是怎么工作的。
在上一讲,我们提到了 Runtime 的一般架构:Executor + React(Driver),那么所谓的 wake 可以在这样的架构上怎么实现呢?这里提供一个最简单的例子。
static QUEUE: Queue
fn wake(task: impl Future + Send + 'static) {
QUEUE.push(task)
}
// executor
fn exec() {
while let Some(task) = QUEUE.pop() {
// ... do something
task.poll(....);
// .. do something
}
}
提到唤醒任务,我们很自然就会想到,直接将 Future 塞进执行队列,然后让 Executor 去执行,这就是一种最简单的 wake 实现方式。当然,真正的 Production-Ready Runtime 策略不会这么简单,有些优化是必须要做的,比如将任务队列分级用来避免激烈竞争;多队列间的 work-steal 调度;Future 上需要挂上状态避免重复提交等等问题。
这一讲,大致上讲了些 Future 运行时 Waker 是什么的,以及它在 Runtime 里面可以是怎样的实现。当理解了这些,也就能大体上理解这么多异步 Runtime 到底在做什么,或者说提供了些什么。
下一讲,说说最简单的 channel 实现。
请登录后评论
评论区
加载更多