藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

异步(五)

上一讲的代码都相对比较简单,这一讲将会有极大的突破,代码会复杂很多,也算进入了 Future 最关键的地方——如何正确手动实现自己想要的 Future。

channel 是什么

我们得从调度单元开始讲起,更准确地说,异步的基本单元是 Future 这个 task,而同步,它的基本单元是线程,这又回到了第二讲时候提出的绿色线程的概念。channel 它在同步环境中是用来跨线程通信用的,而在异步环境中,它是用来跨 Future(task) 通信用的,合并起来,channel 是跨调度单元的通信工具(同理其实可以推理到 lock/semaphore)。

接着,channel 和 queue 的核心区别:是否存在唤醒(调度单元的)机制。channel 实际上是由 queue + 唤醒机制组合而成的。

在同步环境中,唤醒机制是

在异步环境中,唤醒机制是:

不能混用的原因也很简单,因为如果同步 channel 在异步环境中将线程 block 住了,就意味着这个线程的异步执行器被卡死了,这样的操作会导致异步环境陷入异常状态,我们之前说过,异步需要让线程一直工作,并频繁将执行到 pending 状态的 task(Future) 切换到后台继续执行可以执行的 task(Future)。

mpsc channel

我们直接跳过了 oneshot channel,因为它是一个一次性操作,在塞数据的时候,不需要考虑如果塞不进去怎么办的问题,它可以简单认为塞不进去就是 channel 被关闭了。

ok,我们现在开始考虑怎么构建一个异步版本的 mpsc channel,需要一些什么东西:

unbound

先从简单的 unbound channel 开始,unbound 意味着 sender 端不需要考虑数据满的问题,也就省略了唤醒 sender 因无法塞数据而导致的 pending 状态。

struct Inner<T> {
    // High level marker channel status
	// The lower bit expresses the number of items in the Queue
    state: AtomicUsize,
    message_queue: Queue<T>,
    recv_task: AtomicWaker,
}

发送的时候:

fn send(&self, msg: T) -> Result {
    let mut curr = self.state.load(SeqCst)
    let state = decode(curr);
    loop {
        if state.is_open {
            return Err
        }

        state.num + 1
        let next = encode(state);
        // CAS
        match self.state.compare_exchange(curr, next, SeqCst, SeqCst) {
            Ok(_) => break,
            Err(actual) => curr = actual,
        }
    }
    self.queue.push(msg);
    self.recv_task.wake();
    Ok(())
}

接收的时候:

fn next_message(&self) -> Poll<Option<T>> {
    match self.queue.pop() {
        Some(msg) => {
            // same as sender add num with cas
            self.dec_num_on_state();
            Poll::Ready(Some(msg))
        }
        None => {
            if self.state.is_open {
                Poll::Read(None)
            } else {
                Poll::Pending
            }
        }
    }
}

impl<T> Stream for Inner<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                Poll::Ready(msg)
            }
            Poll::Pending => {
                // There are no messages to read, in this case, park.
                self.recv_task.register(cx.waker());
                // Check queue again after parking to prevent race condition:
                // a message could be added to the queue after previous `next_message`
                // before `register` call.
                self.next_message()
            }
        }
    }
}

这里引入了几个东西:

在上述代码中,我们可以看到:

这就是最基本的自唤醒式 Future 了,在不考虑清理的难度下,上述结构就可以正常工作了,但 mpsc channel 需要有正常的 drop 过程,还需要记录一下 sender 的数量,这就是另外的话题了,略过不表。

bound

相对与 unbound channel 来说,bound channel 更复杂:

相应的,代码需要做一些调整:

struct SenderTask {
    task: Option<Waker>,
    is_parked: bool,
}

struct Inner<T> {
    buffer: usize,
    state: AtomicUsize,
    message_queue: Queue<T>,
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,
    recv_task: AtomicWaker,
}

struct Sender<T> {
    inner: Arc<Inner<T>>,
    sender_task: Arc<Mutex<SenderTask>>,
    maybe_parked: AtomicBool,
}

发送的时候:

fn send(&self, msg: T) -> Result {
	let num = self.inc_num_messages();
    let park_self= num > self.inner.buffer;
    
    if park_self {
        self.park()
    }
    self.inner.queue.push(msg);
    self.inner.recv_task.wake();
    Ok(())
}

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
    let state = decode_state(self.inner.state.load(SeqCst));
    if !state.is_open {
        return Poll::Ready(Err(Disconnect));
    }

    self.poll_unparked(Some(cx)).map(Ok)
}

fn park(&self) {
    {
        let mut sender = self.sender_task.lock();
        sender.task = None;
        sender.is_parked = true;
    }

    // Send handle over queue
    let t = self.sender_task.clone();
    self.inner.parked_queue.push(t);

    // Check to make sure we weren't closed after we sent our task on the
    // queue
    let state = decode_state(self.inner.state.load(SeqCst));
    self.maybe_parked.store(state.is_open, Relaxed);
}

fn poll_unparked(&self, cx: Option<&mut Context<'_>>) -> Poll<()> {
    // First check the `maybe_parked` variable. This avoids acquiring the
    // lock in most cases
    if self.maybe_parked.load(Relaxed) {
        // Get a lock on the task handle
        let mut task = self.sender_task.lock();

        if !task.is_parked {
            self.maybe_parked.store(false, Relaxed);
            return Poll::Ready(());
        }

        // At this point, an unpark request is pending, so there will be an
        // unpark sometime in the future. We just need to make sure that
        // the correct task will be notified.
        //
        // Update the task in case the `Sender` has been moved to another
        // task
        task.task = cx.map(|cx| cx.waker().clone());

        Poll::Pending
    } else {
        Poll::Ready(())
    }
}

接收的时候:

fn unpark_one(&mut self) {
    if let Some(task) = self.parked_queue.pop() {
        let task = task.lock();
        task.is_parked = false;

        if let Some(task) = task.task.take() {
            task.wake();
        }
    }
}

fn next_message(&self) -> Poll<Option<T>> {
    match self.queue.pop() {
        Some(msg) => {
            // same as sender add num with cas
            self.dec_num_on_state();
            self.park_one();
            Poll::Ready(Some(msg))
        }
        None => {
            if self.state.is_open {
                Poll::Read(None)
            } else {
                Poll::Pending
            }
        }
    }
}

我们从上面的代码可以看到,最大的改动点在于,多了一个 Queue,用来保存 SenderTask 这个结构,并且每个 Sender 对应的 SenderTask 都是独立的。在使用 Sender 的时候,要求先调用 poll_ready 接口,只有这样,在 Pending 状态时,Sender 端对应的 Waker 才能被正确注册,Receiver 端执行时,才能在 SenderTask 中找到 Waker 进行 wake 操作。这个对唤醒的顺序也是先进先出的公平性原则,避免多个 Sender 端都进入 Pending 时饿死了某一个 Sender。

最后

发现只要有代码,就不想多写内容了,因为代码可见。感觉文章还是不能存在太多代码,但没有代码又无法直观感受到细微处的变化,因为很有可能就是一个顺序的问题导致实现上存在问题,非常纠结。

评论区

加载更多

登录后评论