藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

升级网络过程中遇到的 bug

ckb hardfork 已经过去将近两个月了,现在是时候将之前耽搁的网络库升级工作继续执行了,但,没想到的是,这次升级过程中碰到了一些 bug,导致了几天的 debug 工作,也因为这个,重新梳理了一下之前的代码,async 写法与 poll 实现的一些区别。

遇到问题

这次升级有个很麻烦的地方,需要将所有相关内容全部改完,可编译之后,才能开始验证修改是否符合预期。这次的修改内容也很麻烦,最大的变动是 引入 async trait 的宏实现,用户端的所有代码都需要进行不小的变动,虽然我相对比较熟悉,但依然难免会碰到一些问题,而且由于 poll 与 async fn 写法和语义上的一些差别,可能会导致行为出现些许变化。

这次遇到问题的地方是,在初始可编译之后,启动 ckb 的测试,发现 network 模块 的单元测试无法通过,经过一番测试,最后得出以下现象:

那么这些测试到底有什么联系,导致一起执行的时候,出现问题呢?

static RT: once_cell::sync::OnceCell<tokio::runtime::Runtime> =
once_cell::sync::OnceCell::new();

let rt = RT.get_or_init(|| {
    let num_threads = ::std::cmp::max(num_cpus::get(), 4);
    tokio::runtime::Builder::new_multi_thread()
    .worker_threads(num_threads)
    .enable_all()
    .build()
    .unwrap()
});

这些测试共用了一个 runtime,在运行时,所有网络启动都在其中。升级之前,这样使用没问题,升级之后就出问题了,暂且不论原因,将这段代码改掉,变成每个测试使用自己的 runtime,得出结论:

开始深入 debug

在找到最容易复现问题的测试之后,替换 tentacle 库到本地代码库,加入一些日志,看看效果。

首先说明,test_bootnode_mode_inbound_eviction 这个测试很简单,就是启动了 6 个网络,然后依次连上,判断最后的连接数和逐出规则是否一致,执行错误的 panic 信息只有类似 node session number is {}, not {} 这样的期待的连接数是什么,当前的连接数是什么。

在 tentacle 中加入日志后发现:在没有通过的测试中,tentacle 接到的 dail 命令只有 3-4 次,而测试中明确使用了 5 次 dail 命令。这个现象有几种可能性:

priority channel

priority channel 是一个非常简单的实现,相较于普通的 channel,它在最底层多了一个 queue 用来实现优先队列的效果,发送的时候指定 flag 就可以让消息分别入不同的 queue,Receiver 端的 核心代码 是实现优先队列的优先出列,其他与正常的 channel 一致,包括 poll 的过程中,pending 时插入 waker 并再次调用出队的处理。

我花了不少时间去确认 channel 实现的问题,也写了不少测试,再加上升级前后,channel 相关的代码并没有做修改,初步确认,这里是没有问题的。

select 宏

从 poll 改到 async fn,在命令接收处,有一些微妙的改动,主要在 select 宏,它的作用是并发 await 多个 future,当其中多个 future 同时被 wake 时,随机执行其中一个,然后跳出,展开大致如下:

loop {
    let output = {
        let futures = (...);
        poll_fn(|cx| {
            let start = random();
            let mut is_pending = false;
            for i in 0..branch {
                match branch {
                    0 => {
                        match futures[0].poll {
                            Poll::ready(output) => return output
                            Poll::pending => {
                                is_pending = true
                                continue
                            }
                        }
                    }
                    1 => {
                        match futures[0].poll {
                            Poll::ready(output) => return output
                            Poll::pending => {
                                is_pending = true
                                continue
                            }
                        }
                    }
                    ...
                }
            }
            if is_pending {
                Pending
            } else {
		Ready(output)
            }
        }).await
    };
    match output {
        do user branch or break
    }
}

而原有 poll 实现,区别在于:

有没有可能:由于 channel 未被 poll 导致 waker 未注册,进而无法被唤醒,这是一种猜想,于是,我手动在 async fn 中还原了之前的行为:

loop {
    let output = {
        let output = (None, None);
        let futures = (...);
        poll_fn(|cx| {
            let mut is_pending = false;
            match futures[0].poll {
                Poll::ready(output) => output.0 = Some(output)
                Poll::pending => {
                    is_pending = true
                }
            }

            match futures[0].poll {
                Poll::ready(output) => output.1 = Some(output)
                Poll::pending => {
                    is_pending &= true
                }
            }     
            if is_pending {
                Pending
            } else {
		Ready(output)
            }
        }).await
    };
    match output {
        do user branch or break
    }
}

经过一番测试,发现这样的修改并没有什么效果,也就是说,select 宏并没有什么问题。

budget 模型

事情逐渐陷入僵局,并没有什么新的线索可以让我定位到问题,现在已知现象:

那么,hang 住的时候,测试进程在做什么呢,有什么办法可以看一下,于是,我使用了 lldb attach,得出以下信息(已裁剪):

frame #9: 0x0000557a19c24f81 ckb_network-04aa2da82c55f2f4`_$LT$core..pin..Pin$LT$P$GT$$u20$as$u20$core..future..future..Future$GT$:
:poll::hb4a7aa45bff2457c(self=Pin<&mut core::pin::Pin<alloc::boxed::Box<(dyn core::future::future::Future<Output=core::option::Option<(
)>> + core::marker::Send), alloc::alloc::Global>>> @ 0x00007fcfb11ed210, cx=0x00007fcfb11ee378) at future.rs:124:9
    frame #10: 0x0000557a19c23f52 ckb_network-04aa2da82c55f2f4`_$LT$$RF$mut$u20$F$u20$as$u20$core..future..future..Future$GT$::poll::h5
64490e2ea91a696(self=Pin<&mut &mut core::pin::Pin<alloc::boxed::Box<(dyn core::future::future::Future<Output=core::option::Option<()>> 
+ core::marker::Send), alloc::alloc::Global>>> @ 0x00007fcfb11ed280, cx=0x00007fcfb11ee378) at future.rs:112:9
    frame #11: 0x0000557a19df3a77 ckb_network-04aa2da82c55f2f4`tentacle::protocol_handle_stream::ServiceProtocolStream$LT$T$GT$::run::_
$u7b$$u7b$closure$u7d$$u7d$::_$u7b$$u7b$closure$u7d$$u7d$::h85acdd3a2cd6f117(cx=0x00007fcfb11ee378) at select.rs:507:49
    frame #12: 0x0000557a19ea2218 ckb_network-04aa2da82c55f2f4`_$LT$tokio..future..poll_fn..PollFn$LT$F$GT$$u20$as$u20$core..future..fu
ture..Future$GT$::poll::h0e40a1f1e60786ef(self=Pin<&mut tokio::future::poll_fn::PollFn<tentacle::protocol_handle_stream::{impl#1}::run:
:{async_fn#0}::{closure_env#0}<alloc::boxed::Box<(dyn tentacle::traits::ServiceProtocol + core::marker::Send + core::marker::Unpin), al
loc::alloc::Global>>>> @ 0x00007fcfb11ed640, cx=0x00007fcfb11ee378) at poll_fn.rs:38:9
    frame #13: 0x0000557a19df318c ckb_network-04aa2da82c55f2f4`tentacle::protocol_handle_stream::ServiceProtocolStream$LT$T$GT$::run::_
$u7b$$u7b$closure$u7d$$u7d$::hd941c36c86009bb0((null)=ResumeTy @ 0x00007fcfb11edc30) at protocol_handle_stream.rs:235:13
    frame #14: 0x0000557a19de8a5b ckb_network-04aa2da82c55f2f4`_$LT$core..future..from_generator..GenFuture$LT$T$GT$$u20$as$u20$core..f
uture..future..Future$GT$::poll::h8531003019b6f92a(self=Pin<&mut core::future::from_generator::GenFuture<tentacle::protocol_handle_stre
am::{impl#1}::run::{async_fn_env#0}<alloc::boxed::Box<(dyn tentacle::traits::ServiceProtocol + core::marker::Send + core::marker::Unpin
), alloc::alloc::Global>>>> @ 0x00007fcfb11edce8, cx=0x00007fcfb11ee378) at mod.rs:91:19
    frame #15: 0x0000557a19e2ddf4 ckb_network-04aa2da82c55f2f4`tentacle::service::Service$LT$T$GT$::init_proto_handles::_$u7b$$u7b$clos
ure$u7d$$u7d$::h04ec03d46791223a((null)=ResumeTy @ 0x00007fcfb11ee1f0) at service.rs:802:41

大部分线程都在 tentacle::protocol_handle_stream 的 async fn 中运转,这很不正常,并且这很可能意味着,没有执行器能够执行其他的 future,与之前的猜测一致,有其他异常导致 tentacle 无法接到用户命令。

出现这种问题怎么解决它,这是一个经典的协作式运行时饿死问题,当 worker 被占用并无法跳出任务时,整个程序将处于僵死状态,tokio 引入了 budget 模型 将内部的核心实现都插入这个计算,以类似 CPU 时间片的方式来分配 worker 的连续执行能力,到达上限时,当前 Future 被强制让出执行权,用来更和谐地与其他任务协作。但是,这不是一套强制的抢占式模型,用户不依赖 tokio 内部 Future 自行实现的 Future 并没有这个逻辑,恰巧,tentacle::protocol_handle_stream 中并没有这套逻辑,于是,我引入它,经过测试,引入该系统后,ckb network 的单元测试 100% 能通过了。

这说明,我修复了这个 bug,但这并不是结束点,我还没有找到引起这个问题的原因,这样强制修复并不完整,如果就此结束,那隐患就埋下了。

再次回到 hang 住的地方

再次在本地复现 hang 住的场景,并加入日志,想看看程序是在干什么。经过一番查看,该程序是 100% cpu 满转执行中,一致无法被打断,加入 budget 之后,因为强制 pending 可以被让出,最后定位到,是 ckb 升级过程中,ping 协议的 poll 方法实现错误:

/// Behave like `Stream::poll_next`, but nothing output
/// if ready with Some, it will continue poll immediately
/// if ready with None, it will don't try to call the function again
#[inline]
async fn poll(&mut self, _context: ProtocolContextMutRef<'_>) -> Option<()> {
    Some(())
}

一直返回 Some 意味着该 future 一直处于 ready 状态,可以一直执行,所以,所有 worker 都被卡死在这里了。然后我修复了这段逻辑。

最后

到这,我终于找到了导致异常的源头问题,这次修复过程也就圆满结束了。具体修复可以看看这个 PR。虽然真实问题是实现上导致的,但 tentacle 还是需要引入 budget 用来对用户的错误行为进行兜底。

值得一提的是,整个过程中,我还使用了 tokio-console 去定位可能存在的 wake 丢失问题,但并没有带来太大的收益,在切换单线程和多线程 runtime 的过程中还引起了一些误导,不过等完全知道这次 bug 的起因之后,再想想这些误导,其实严格意义上并不是错误的,只是,对我排查问题并没有带来太大的收益。

评论区

加载更多

登录后评论