--- 摄于 2017 年 9 月 藏川线前段
上一讲的代码都相对比较简单,这一讲将会有极大的突破,代码会复杂很多,也算进入了 Future 最关键的地方——如何正确手动实现自己想要的 Future。
我们得从调度单元开始讲起,更准确地说,异步的基本单元是 Future 这个 task,而同步,它的基本单元是线程,这又回到了第二讲时候提出的绿色线程的概念。channel 它在同步环境中是用来跨线程通信用的,而在异步环境中,它是用来跨 Future(task) 通信用的,合并起来,channel 是跨调度单元的通信工具(同理其实可以推理到 lock/semaphore)。
接着,channel 和 queue 的核心区别:是否存在唤醒(调度单元的)机制。channel 实际上是由 queue + 唤醒机制组合而成的。
在同步环境中,唤醒机制是
在异步环境中,唤醒机制是:
不能混用的原因也很简单,因为如果同步 channel 在异步环境中将线程 block 住了,就意味着这个线程的异步执行器被卡死了,这样的操作会导致异步环境陷入异常状态,我们之前说过,异步需要让线程一直工作,并频繁将执行到 pending 状态的 task(Future) 切换到后台继续执行可以执行的 task(Future)。
我们直接跳过了 oneshot channel,因为它是一个一次性操作,在塞数据的时候,不需要考虑如果塞不进去怎么办的问题,它可以简单认为塞不进去就是 channel 被关闭了。
ok,我们现在开始考虑怎么构建一个异步版本的 mpsc channel,需要一些什么东西:
先从简单的 unbound channel 开始,unbound 意味着 sender 端不需要考虑数据满的问题,也就省略了唤醒 sender 因无法塞数据而导致的 pending 状态。
struct Inner<T> {
// High level marker channel status
// The lower bit expresses the number of items in the Queue
state: AtomicUsize,
message_queue: Queue<T>,
recv_task: AtomicWaker,
}
发送的时候:
fn send(&self, msg: T) -> Result {
let mut curr = self.state.load(SeqCst)
let state = decode(curr);
loop {
if state.is_open {
return Err
}
state.num + 1
let next = encode(state);
// CAS
match self.state.compare_exchange(curr, next, SeqCst, SeqCst) {
Ok(_) => break,
Err(actual) => curr = actual,
}
}
self.queue.push(msg);
self.recv_task.wake();
Ok(())
}
接收的时候:
fn next_message(&self) -> Poll<Option<T>> {
match self.queue.pop() {
Some(msg) => {
// same as sender add num with cas
self.dec_num_on_state();
Poll::Ready(Some(msg))
}
None => {
if self.state.is_open {
Poll::Read(None)
} else {
Poll::Pending
}
}
}
}
impl<T> Stream for Inner<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Try to read a message off of the message queue.
match self.next_message() {
Poll::Ready(msg) => {
Poll::Ready(msg)
}
Poll::Pending => {
// There are no messages to read, in this case, park.
self.recv_task.register(cx.waker());
// Check queue again after parking to prevent race condition:
// a message could be added to the queue after previous `next_message`
// before `register` call.
self.next_message()
}
}
}
}
这里引入了几个东西:
在上述代码中,我们可以看到:
这就是最基本的自唤醒式 Future 了,在不考虑清理的难度下,上述结构就可以正常工作了,但 mpsc channel 需要有正常的 drop 过程,还需要记录一下 sender 的数量,这就是另外的话题了,略过不表。
相对与 unbound channel 来说,bound channel 更复杂:
相应的,代码需要做一些调整:
struct SenderTask {
task: Option<Waker>,
is_parked: bool,
}
struct Inner<T> {
buffer: usize,
state: AtomicUsize,
message_queue: Queue<T>,
parked_queue: Queue<Arc<Mutex<SenderTask>>>,
recv_task: AtomicWaker,
}
struct Sender<T> {
inner: Arc<Inner<T>>,
sender_task: Arc<Mutex<SenderTask>>,
maybe_parked: AtomicBool,
}
发送的时候:
fn send(&self, msg: T) -> Result {
let num = self.inc_num_messages();
let park_self= num > self.inner.buffer;
if park_self {
self.park()
}
self.inner.queue.push(msg);
self.inner.recv_task.wake();
Ok(())
}
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
return Poll::Ready(Err(Disconnect));
}
self.poll_unparked(Some(cx)).map(Ok)
}
fn park(&self) {
{
let mut sender = self.sender_task.lock();
sender.task = None;
sender.is_parked = true;
}
// Send handle over queue
let t = self.sender_task.clone();
self.inner.parked_queue.push(t);
// Check to make sure we weren't closed after we sent our task on the
// queue
let state = decode_state(self.inner.state.load(SeqCst));
self.maybe_parked.store(state.is_open, Relaxed);
}
fn poll_unparked(&self, cx: Option<&mut Context<'_>>) -> Poll<()> {
// First check the `maybe_parked` variable. This avoids acquiring the
// lock in most cases
if self.maybe_parked.load(Relaxed) {
// Get a lock on the task handle
let mut task = self.sender_task.lock();
if !task.is_parked {
self.maybe_parked.store(false, Relaxed);
return Poll::Ready(());
}
// At this point, an unpark request is pending, so there will be an
// unpark sometime in the future. We just need to make sure that
// the correct task will be notified.
//
// Update the task in case the `Sender` has been moved to another
// task
task.task = cx.map(|cx| cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(())
}
}
接收的时候:
fn unpark_one(&mut self) {
if let Some(task) = self.parked_queue.pop() {
let task = task.lock();
task.is_parked = false;
if let Some(task) = task.task.take() {
task.wake();
}
}
}
fn next_message(&self) -> Poll<Option<T>> {
match self.queue.pop() {
Some(msg) => {
// same as sender add num with cas
self.dec_num_on_state();
self.park_one();
Poll::Ready(Some(msg))
}
None => {
if self.state.is_open {
Poll::Read(None)
} else {
Poll::Pending
}
}
}
}
我们从上面的代码可以看到,最大的改动点在于,多了一个 Queue,用来保存 SenderTask
这个结构,并且每个 Sender 对应的 SenderTask
都是独立的。在使用 Sender 的时候,要求先调用 poll_ready
接口,只有这样,在 Pending 状态时,Sender 端对应的 Waker 才能被正确注册,Receiver 端执行时,才能在 SenderTask
中找到 Waker 进行 wake
操作。这个对唤醒的顺序也是先进先出的公平性原则,避免多个 Sender 端都进入 Pending 时饿死了某一个 Sender。
发现只要有代码,就不想多写内容了,因为代码可见。感觉文章还是不能存在太多代码,但没有代码又无法直观感受到细微处的变化,因为很有可能就是一个顺序的问题导致实现上存在问题,非常纠结。
请登录后评论
评论区
加载更多