藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

最近我维护的 yamux 库被提了一个 issue,说是只写不读的时候,yamux stream 会卡死,然后我使用了两个 PR 才解决掉这个问题:第一个 PR 是修复这个 bug;第二个 PR 是修复的时候造成了性能下降,需要修复这个性能下降问题,性能下降的原因是修复导致了数据传输过程中的多次复制,造成了性能损失。让我们来逐个分析问题,这里的修复方法也是非常经典的 Future waker 使用及如何正确实现一个 Future 的教学案例,值得一篇单独的 blog 记录它(当然,代码里也详细描述了修改的逻辑)。

修复 Stream 卡死问题

分析问题

首先分析问题的原因,其实 issue 里有相关的描述:yamux 协议通过 window update 消息来实现读写平衡:当自身的 send window 为 0 时,正确实现的 stream 将无法正常写入,需要等待对端发出一个 window update 消息,更新自己的 send window 值之后,才能唤醒并继续执行写入操作。

而修复前的实现中,window update 消息由 stream 自行处理,但案例是 stream 只写不读的情况下,这样就会导致 window update 消息虽然对端已经发出,本端 session 也已经接收并且转发给了 stream,但只写不读导致了 stream 无法更新 send window,即出现了类似死锁的情况,整个场面都僵住了。

修复方案其实很明显了,核心就是让 send window 能在只写不读的情况下,正常被 window update 消息所更新,这样就解决了。虽然思路很清晰,但修复方案其实考虑了不短的时间。

修复方案

无论是 yamux rust 的另一个实现还是 go 版本的实现,这里都是将 stream 的状态共享给了 session,并且使用了锁的操作,由 session 在接到 window update 消息的同时立刻更新对应 stream 的共享状态,这是很正统的思路,并且实现上没有问题。但是,本人有一点洁癖,以及这个库的思路整体上比较排除同步锁的加入,同时共享状态锁就意味着,即使非极端场景,锁的开销也是不可避免的,它将变成常驻开销。如果有其他方案,这个共享状态是不愿意被引入的,至少我不愿意。我也考虑过 AtomicPtr 的方案,但一样有个问题,内存同步的开销也将变成常态化,甚至比锁的方案会更加复杂,不是个很好的选择。

这个 yamux 库实现的整体思路是,用 channel 隔离 session 和 stream 的状态,让它们各自维护自己的状态,并正常运行,所有的消息用 message(event) 的方式通过 channel 进行通知(我知道 channel 里面有同步锁,但不关键)。而要让它实现只写不读的情况下进行正常调度,就必须让读事件在 `poll_write 中得到体现,否则就只能回退到共享状态,而且读操作的错误不应该被写操作返回,除非是协议行为出错。

于是通过该思路提了第一个 PR,并在 PR 给问题提出者一些解答。但因为在 poll_write 中执行了 poll_read 的内部方法,它将 write 的 waker 注册到了 read 的 receiver 中,会导致如果真的读到了数据,reader 方无法正常被 waker 唤醒,因为这时候唤醒的是 writer 方,这时候就需要做一个代理,将扭曲的 waker 对象重新改回来。具体的代码是:

match Pin::new(&mut self.frame_receiver).as_mut().poll_next(cx) {
    Poll::Ready(Some(frame)) => {
        self.handle_frame(frame)?;
        should_wake_reader &= true;
    }
    Poll::Ready(None) => {
        self.state = StreamState::RemoteClosing;
        return Err(Error::SubStreamRemoteClosing);
    }
    Poll::Pending => break,
}

// poll by write and read something, then wake read
if should_wake_reader && self.read_buf.len() != buf_len {
    if let Some(waker) = self.readable_wake.take() {
        waker.wake();
    }
}

因为这种写法会有歧义(主要是理解上有一点困难),后续第二个 PR 中,将它改成了这样:

fn recv_frames_wake(&mut self, cx: &mut Context) -> Result<(), Error> {
    let buf_len = self.read_buf.len();
    let state = self.state;
    match self.recv_frames(cx) {
        Ok(should_wake_read) => {
            // if state change to RemoteClosing, wake read
            // if read buf len change, wake read
            if (self.state == StreamState::RemoteClosing && state != StreamState::RemoteClosing)
                || (should_wake_read && buf_len != self.read_buf.len())
            {
                if let Some(waker) = self.readable_wake.take() {
                    waker.wake();
                }
            }

            Ok(())
        }
        Err(e) => {
            // if state change to RemoteClosing, wake read
            if self.state == StreamState::RemoteClosing && state != StreamState::RemoteClosing {
                if let Some(waker) = self.readable_wake.take() {
                    waker.wake();
                }
            }

            Err(e)
        }
    }
}

核心是 writer 端调用的时候发现这时候执行了真正的 reader 端操作,就需要尝试 wake reader 端,如果不这么做,可能会导致 reader 端失去 wake 并无法被唤醒。

修复性能问题

在修复 bug 的 PR 上,简单直接地使用了原有的结构,导致数据在 yamux 层被复制了两遍,可以看一下代码:

pub struct StreamHandle {
	...
    read_buf: BytesMut,
    ...
}

impl StreamHandle {
    fn handle_data(&mut self, frame: Frame) -> Result<(), Error> {
        self.process_flags(frame.flags())?;
        let length = frame.length();
        if length > self.recv_window {
            return Err(Error::RecvWindowExceeded);
        }

        let (_, body) = frame.into_parts();
        if let Some(data) = body {
            self.read_buf.extend_from_slice(&data);
        }
        self.recv_window -= length;
        Ok(())
    }
}

重点是这个 extend_from_sliceread_buf: BytesMut 结构,它执行的是全数据复制。而修复之前,这里是 stream 只要 read_buf 存在数据,就不会尝试读取 session 的消息,代码相应是 self.read_buf = data,也就不存在这个问题。

修复性能的方案很简单,读到多个数据是不可避免的,这里的结构必须支持多个 BytesMut 同时存在的情况,即直接将 read_buf 的类型改成 Vec<BytesMut>,多个数据相比于复制两遍,变成了 push/iter/drain 的操作,显然便宜很多,代码变成了:

pub struct StreamHandle {
	...
    read_buf: Vec<BytesMut>,
    ...
}

impl StreamHandle {
    fn handle_data(&mut self, frame: Frame) -> Result<(), Error> {
        self.process_flags(frame.flags())?;
        let length = frame.length();
        if length > self.recv_window {
            return Err(Error::RecvWindowExceeded);
        }

        let (_, body) = frame.into_parts();
        if let Some(data) = body {
            // yamux allows empty data frame
            // but here we just drop it
            if length > 0 {
                self.read_buf.push(data);
            }
        }
        self.recv_window -= length;
        Ok(())
    }
}

相应的,真正读的操作,也会变得更复杂:

let mut offset = None;
let mut total_read = 0;
for (index, read_buf) in self.read_buf.iter_mut().enumerate() {
    let n = buf.remaining().min(read_buf.len());
    if n == 0 {
        break;
    }
    buf.put_slice(&read_buf.split_to(n));
    if read_buf.is_empty() {
        offset = Some(index);
    }
    total_read += n;
}
if let Some(offset) = offset {
    self.read_buf.drain(..=offset);
    // drain does not shrink the capacity, if the capacity is too large, shrink it
    if self.read_buf.capacity() > 24
        && self.read_buf.capacity() / (self.read_buf.len() + 1) > 4
    {
        self.read_buf.shrink_to_fit();
    }
}

最后

在整个修复方案上,是一个很经典的 Future waker 使用案例,希望能帮助大家理解 Future 的工作原理,以及认识到正确实现一个 Future 需要注意的地方。

评论区

加载更多

登录后评论