< 返回版块

yfaming 发表于 2019-11-25 22:56

Tags:async

不要自作主张地返回 Poll::Pending。

只有在你调用的方法返回了 Poll::Pending 时,才能够返回 Poll::Pending。 不然,也许程序会通过编译,但运行结果不会是你想要的。如果你的 Future 只是对现有 Future 的简单包装,一般不会犯这种错误,但情况复杂了,可能会忘。

我们日常使用的 Future,如 TcpStream/channel/timer 之类,其实是与底层的 reactor 相关联的,它们在返回 Poll::Pending 的时候,同时会在 reactor 注册事件,所以才会收到通知,最终 wakeup 相应的 Future。我们自己随便返回 Poll::Pending 的时候,显然不会收到通知的。

可能是由于 Rust 选择的抽象方式的原因,我们一般只关注 Future 本身,忽略与 reactor 的联系。其他语言的话,貌似 reactor 或者叫 event loop 才是最核心的概念。

async/await 是高层 primitive,而 Poll 是低层 primitive。高层使用低层,很方便,反过来,比较麻烦,得避免。

有个感觉,async/await 普及开来的话,Poll 只有较底层的库才关心,日常只会与 async/await 打交道。 (另外,我们就算与 Poll 打交道时,其实几乎也不关心 Context 或者 Waker,感觉它们更底层。)

await 与 tokio::sync::Mutex 一起使用,需要注意死锁问题。

从某种意义上说 await 也是有「锁」的语义,至少都需要「等待」。只是 Mutex 等待别人 unlock,而 await 在等待事件发生。(tokio::sync::Mutext 的 lock() 方法返回的就是 Future,需要你 await 的。)

举个例子,lock 之后,然后 await。如果被 await 的 Future,也需要 lock,然后才能返回 Poll::Ready。这时实际上就死锁了,两个 Future 会无限等待下去。

所以呢,不要认为,我反正只用了一把锁,所以死锁是不可能的。(推广一下,即使不用锁,但相互 await 的关系搞乱了,同样可能死锁的)。

有个感觉,在持有锁时去 await 可能就是代码有问题的一个标志,bad smell。即使不死锁,await 的事件常常是 IO 之类,耗时很长或不可控,相当于锁的临界区太大了。搞不好即使没有死锁,你的程序也从 concurrent 的变成 stop-and-wait 的了。

「Future 组合」的问题。

这里不仅仅指 Future,还有 Stream/Sink,以及 tokio 提供的 AsyncRead/AsyncWrite 等 trait,它们共同点是,都会返回 Poll。

情况可能是这样,你有一个 struct XXX,它拥有多个 Future/Stream/Sink/AsyncRead/AsyncWrite 对象,你需要把它们组合在一起,并且需要把 struct XXX 实现为一个 Future,然后交给 tokio::spawn() 来运行。 这里一般两种办法,一是手动实现 Future,也就是 impl Future for XXX {} 然后实现 poll 方法;另一种是为 struct XXX 定义一个 async fn foo(),然后 tokio::spawn(xxx.foo())。

对于手动 impl Future for XXX {}。相当于选择了底层 primitive 这条路。由于 Future::poll 不是 async 方法,当然你不能使用 await,只能用 poll/poll_next/poll_ready/start_send/poll_flush/poll_close/poll_read/poll_write/poll_shutdown 这些方法了,它们是 Future/Stream/Sink/AsyncRead/AsyncWrite 的方法。 这里需要注意的点,除了上面提到的「永远不要自作主张去返回 Poll::Pending」外,还需要知道,一个 Future 在 poll() 返回 Poll::Pending 后,以后如果对应事件发生,会再次 wakeup 这个 Future,从而再次调用 poll() 方法。需要注意,你的 poll() 方法用到的局部变量,可能需要存下来,作为你的 struct XXX 的字段。打个比方,你的 struc XXX 需要实现的是类 proxy 的功能,它从一个 Stream 取数据,然后发到另一个 Sink。首先要调用 Stream::poll_next() 取数据,取到之后,Sink 的发送是分三步的,依次是 poll_ready/start_send/poll_flush,其中 poll_ready 和 poll_flush 返回的类型是 Poll。取完数据,然后调用 poll_ready,只有返回 Poll::Ready,才能继续前进。如果返回 Poll::Pending,当然不能直接返回,只能先把取到的数据存下来,基本上意味着得给 struct XXX 加个 Option 字段,调用 poll() 时,需要先检查 Option 是不是 Some(T)。反正呢,手动 impl Future,得把 poll() 方法写成无状态的,局部变量需要跨越多次 poll() 调用而存活的话,得存在外部(比如作为 struct 的字段)。这一点跟 web 应用挺像的,应用是无状态的,所有信息都在外部的存储里面。另外,为什么 async/await 为什么很受期待,因为编译器会把这些状态自动给你放到生成的 Generator struct 里面,写代码时直接用局部变量就行,不需要给 struct 定义这些额外的字段。async/await 另一好处,如果你自己实现的 poll() 比较复杂,调用多个 poll()/poll_xxx() 系函数的话,因为有多个可能返回 Poll::Pending 的地方,当等待的事件发生,那些地方从 Pending 变成 Ready 时,你的 poll() 函数会被调用多次,实际上可能会有重复的代码执行。比如,如果你的 poll() 要调用 Stream 的 poll_ready/start_send/poll_flush 方法,实现的时候你可能直接就依次调用了,那么第一次进入你的 poll() 时, poll_ready() 会返回 Poll::Pending,第二次进入你的 poll() 时,poll_flush() 会返回 Poll::Pending(注意 start_send() 返回的是 Result 不是 Poll)。但这样的话,其实 poll_ready() 其实被额外调用了一次。当然你可以自己定义一些状态来避免,但代码就没那么直观了。但是呢,如果用 await,编译器貌似生成代码的时候,就会根据 await 点生成多个状态,每次检查状态来决定怎么调用,不会造成额外的函数调用的,这也体现了 zero-cost abstraction 了。

上面还提到使用底层 primitive(poll) 时,避免使用高层 primitive(async/await)。但也许被 rustc 虐得不行了,难保会有些诡异的想法,又或者 struct XXX 刚好有个字段,它有个你很想用的方法,只是它是 async 的。毕竟, async func() -> T 函数实际的类型,其实是 impl Future<Output=T> 的不知名类型。你这么来一下, let a = func(); a 就是一个 Future,你就可以调用 poll() 了。这样又能回到手动 impl Future 这条路了。这看起来挺不错,不过这里的局部变量 a 作为一个临时状态,其实是需要作为 struct XXX 的字段存起来的,痛苦的点在于给字段声明什么类型。具体类型是不知道的,只能是 Box<dyn Future<Output=xxx>>,但接下来类型系统或者 borrow checker 就会发飙,也许会报错说不满足 'static, Send 或者 UnPin,又或生命周期有问题,或者借用有问题,或者 Pin 抱住你腿不让走,各种乱七八糟的都出来了,直面 Rust 黑暗内幕挺痛苦的。。。

对于为 struct XXX 定义一个 async fn foo(),然后 tokio::spawn(xxx.foo()) 这种情况。如果在 async fn foo() 里面调用其他 async 函数时都是顺序的,那一切都挺简单,反正就 await 就可以了。而如果要协调 async 函数之间的行为,可能会遇到问题。比如,使用 select! 宏的场景,你的 struct 需要处理两个数据源,一个是 channel,一个是 TcpStream,谁收到数据就处理谁,不能一个阻塞另一个。可能一开始本能就写了两个 async 方法,handle_channel 和 handle_stream,然后放到 select! 处理,结果就杯具了,因为这两个方法都需要 &mut self,rustc 编译不过。我一开始的解法是,把这两个方法都改成 struct 的静态方法(不使用任何 self 系参数),以前需要用到的字段,都改为参数传进去;需要共享的字段,就传 Arc<Mutex>,总算编译通过了。结果一运行就发现不对,排查下来,原因就是上面的 await + Mutex 导致的死锁,真是欲哭无泪。折腾很久,最后的办法是,完全不使用 struct 直接三个 async 函数,最外层的 async 充当了 struct 的作用,它调的两个 async 函数职责变化不大。当然不这么做,而是仍然在原来基础上改其实也是可以的。但最关键的点在于,需要把 TcpStream,split 成 ReadHalf 和 WriteHalf,这样你的借用会简单很多。前面死锁,就是因为没有 split,而是直接 Arc<Mutex> 传过去,才引发的。有个感觉,实际中可能经常需要把 TcpStream split() 成 ReadHalf/WriteHalf,这样的话用起来其实和 channel 有点像。拆分之后能简化借用关系,但 ReadHalf/WriteHalf 持有的是 TcpStream 的引用,也许生命周期方面会有些问题,但还没遇到过。不过这里提到的问题可能与 async/await 关系不大,主要还是怎样与 borrow checker 和谐相处的问题。&self 和 &mut self 是粒度比较粗的借用。有时候方法里面只用到了对象的几个字段而已,但 &self 和 &mut self 却借走了整个对象,算是「借用扩大化」,多个方法一起被调用时,就会有干扰。相反如果极端一点,完全不用 struct,struct 的字段在代码里以单独的变量存在,那么就可以「用多少借多少」,最大程度上避免了「借用冲突」,我的解法算是这种极端的一种尝试。应该说完全不用 struct 也是做不到的。但什么时候分,什么时候合,这个还挺值得注意的。Rust 的 struct 不像其他语言的 class,不能随便啥都往里面扔,否则会被 rustc 教做人。设计 struct 的最佳实践是什么,还没看到好的总结,不过呢提前知道坑可能在哪里,等你掉进去时就不会那么震惊了。知乎有篇文章讲到了这个问题,感觉讲得挺好的: https://zhuanlan.zhihu.com/p/26393679

评论区

写评论
AlephAlpha 2020-01-22 10:45

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> 注意这个 Pin

对以下内容的回复:

Qingluan 2019-12-15 23:56

最近遇到一个问题, 再 tokio 0.2.4 中 用 Timeout 这个module在文档中显示已经实现 Future, 但是我现在需要在另外一个 poll 中调用 不管是 poll(cx) / poll() 都显示找不到方法 "no method named poll found for type tokio::time::Timeout<XX> in the current scope"

作者 yfaming 2019-11-26 11:58

这个地方说的不太清楚,具体是这样: future_a 调用 lock(),然后 await future_b。 但 future_b 也需要调用 lock()。 这样一来,future_b 拿不到锁,无法完成,于是 futre_a 的 await 无法结束,因此不会释放锁,因此 future_b 拿不到锁。。这样就死锁了

对以下内容的回复:

Ryan-Git 2019-11-26 10:41

"lock 之后,然后 await,如果被 await 的 Future,也需要 lock" 这个不是重入锁的场景吗?不能叫死锁吧

Nalleyer 2019-11-26 10:40

好文,顶一下慢慢学习

frostRed 2019-11-26 10:27

写得很好,难得的干货。把文档里面和 reddit 上经常提的几个要注意的点都讲到了。读起来很有代入感。

1 共 6 条评论, 1 页