藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

CITA 的命令行工具(二)

Rust社区

异步的发展

众所周知,Rust 1.0 发布在 2015 年,虽然在 1.0 之前开发了将近十年时间,破坏性的各种实验进行了无数次,但是在 1.0 发布的时候,本身标准库并不支持异步系统。于是 Rust 社区开始了对异步系统长达几年的实验,实验内容就是实现各种异步框架,让 Rust 能够支持异步操作。

这个实验从一开始就分为两派:

官方也在做实验:

国内也有几位大佬做了这方面的尝试:

以上是 Rust 社区对异步的实验探索产物,近期(个人乐观预测到明年三月左右,悲观的话。。。)async 关键字的使用应该能够登录 Rust stable 版本,整个 2018 年,社区都在期待这件事完成,可惜还是延时了。

http 库的发展

hyper

作为网络方面的基础库 HTTP 库的发展也让人十分揪心,在 2018 年之前,整个社区的 http 基础库就只有一个, seanmonstar 带头在社区实现的 hyper ,这个库在 0.10 版本以下是使用的同步逻辑,在 0.11 的时候跟进 tokio 进行了异步的尝试,在 tokio 重构后,在 0.12 的时候再次跟进 tokio 的重构方案,慢慢稳定下来,但是这个库的发展速度相对来说比较慢。

actix

2018 年上半年,由 fafhrd91 带头(也可以说一己之力)在社区中实现了一个 actor 模型的框架 actix,再基于 actix,实现了 actix-web ,这位大佬真真以一己之力把 actix 周边的基础库(生态)开发出来,进而吸引到一堆大佬加入其中,硬生生在 hyper 一统江湖的局面中杀出了一个 actix,相对与 hyper 来说,这个库的发展快得多,支持的特性也多,如果熟悉 actor 模型的话,这个库也是一个很好的选择。actix 并没有同步的版本,从开始就在 tokio 和 futures 的上层进行工作。

顺便一说,这位大佬,也是 aiohttp 的核心开发者之一,佩服。在这个库的贡献者名单里面,我发现了不少 Python 社区的名人,比如 flask 的作者,大佬果然无论到哪里都是大佬。

cita-tool 的选择及方案调整

接下来,到了重要的抉择时刻,说到底,命令行工具的核心还是 http 客户端的选择,考虑到熟悉程度以及 actor 模型到底能吸引(排斥)多少人的问题,cita-tool 最后选择了 tokio + hyper 的一套核心方案,然而,就这套方案依然重构了几次,就为了能够让自己不难受。

初始方案

受到之前写 InfluxDBClient-rs 库的影响,习惯性地将真正的执行器(client)存在一个对外核心结构体中,第一版的 cita-tool 中 Client 写成了如下模型:

#[derive(Debug)]
pub struct Client {
    id: u64,
    run_time: Runtime,
    chain_id: Option<u32>,
    sha3_private_key: Option<Sha3PrivKey>,
    #[cfg(feature = "blake2b_hash")]
    blake2b_private_key: Option<Blake2bPrivKey>,
    debug: bool,
}

impl Client {
    fn run(
        &self,
        reqs: JoinAll<
            Vec<Box<dyn Future<Item = hyper::Chunk, Error = ToolError> + 'static + Send>>,
        >,
    ) -> Result<Vec<JsonRpcResponse>, ToolError> {
        let responses = self.run_time.borrow_mut().block_on(reqs)?;
        Ok(responses
            .into_iter()
            .map(|response| {
                serde_json::from_slice::<JsonRpcResponse>(&response)
                    .map_err(ToolError::SerdeJson)
                    .unwrap()
            })
            .collect::<Vec<JsonRpcResponse>>())
	}
}

这个 Runtime 之前还经历过 Handle 的版本,无论是 Runtime 还是 Handle 实际上都是一个东西,就是 tokio 库提供的 event loop,可以简单地理解为是真正的对外请求的执行环境。所有其他的操作都只是为了构造一个 future(task) 任务,然后扔进去,等待返回执行结果。接下来,因为忙着把功能覆盖完全,对这个版本的改动仅仅是因为 Runtime 执行阶段需要 mutable,而对其进行了 refcell 的简单包装,变成下面这样,本质上,并没有很大的变化:

#[derive(Debug)]
pub struct Client {
    id: AtomicUsize,
    url: Uri,
    run_time: RefCell<Runtime>,
    chain_id: Option<u32>,
    sha3_private_key: Option<Sha3PrivKey>,
    #[cfg(feature = "blake2b_hash")]
    blake2b_private_key: Option<Blake2bPrivKey>,
    debug: bool,
}

上面这种结构的好处就是容易写,非常符合同步写法的习惯,坏处就很多了:

全异步的尝试

为了解决上诉几个问题,我研究了几个异步库的实现,发现它们都和 hyper 类似,所有对 runtime 的管理都交给用户去完成,自己库中只做生成 future 的部分。这个思路很棒,于是我吭哧吭哧的对 cita-tool 进行改写,写成类似下面的代码:

pub struct Client {
    id: AtomicUsize,
    url: Uri,
    chain_id: Option<u32>,
    sha3_private_key: Option<Sha3PrivKey>,
    #[cfg(feature = "blake2b_hash")]
    blake2b_private_key: Option<Blake2bPrivKey>,
    debug: bool,
}

impl Client {
    fn task1(&self) -> Box<dyn Future<Item = RpcResponse, Error = ToolError> + 'static + Send>> {}
    fn task2(&self) -> Box<dyn Future<Item = RpcResponse, Error = ToolError> + 'static + Send>> {}
}

但是,在将库改动到最后三分之一的时候,突然遇到一个巨大的坑,大致逻辑是 task1 任务的生成过程依赖 task2 任务的完成,这个问题就麻烦了,用代码简单示例一下,如下:

fn task1(&self) -> Box<dyn Future<Item = RpcResponse, Error = ToolError> + 'static + Send>> {
    let task = self.task2
        .and_then(|response| {
            do_something();
            let task = xxx;
            tokio::spawn(task);
        });
    Box::new(task)
}

这种样式的代码在目前是肯定无法通过编译器检查的,这里有一个很神奇的问题,就是自引用(经群友指出,单看示例代码,并不是自引用问题,而是生命期问题,关键我也找不到代码了。。。emm 意思到位了就行)。官方也一直在解决这个问题,如果要对这个问题进行比较详细的了解,可以看 withoutboard 大佬的博客,可以看到官方的一些解决思路,以及 Pin 的演进过程。简单的解释一下,就是 self 的引用无法证明自己还在原地(没有被移动),如果移动了,这里就有一个悬垂指针的安全问题,继而编译器是不给通过的。

不管是 yeild 还是异步的语言支持,实际上都在这里卡住了,现在官方和社区正在紧张开发中,期待ing。

因为这个问题,重构到最后部分的代码,直接回滚,很遗憾,这部分重构并不能在 git 历史中找到,所以这里也是一段简化版的示意代码。那么有没有路能够绕开这个问题呢,其实在这个思路下还是有解的,就是把各种任务拆开,不要依赖结构体的存储,一方面这个改动思路几乎相当于重写整个库,另一方面我发现假定按照这个思路去做,可能会出现一个很神奇的结构体:

Box<Box<Box<Box<dyn Future<Item=...>>>>>

无限 Box 下去是夸张的说法,实际上到底能有几层还真不太确定,因为有几个任务确实存在相互依赖的问题。

于是这个方向上的尝试被我自己放弃了。

重新思考

之后呢,我开始重新思考这个问题的解决方案,山不转水转嘛,核心问题是要把这个 runtime 共享出去,做到既不浪费资源也不影响用户在使用该结构时被限制创建 runtime 。那么我们建个任务队列怎么样,runtime 从任务队列中取任务执行,然后返回对应的结果给需求方,这样全局共享一个 runtime,只需要在创建的时候将其扔到其他线程去就可以了,本身 Client 只需要握住任务队列的 sender 端就行了,并且同时解决了 Client 不能 Clone 的问题,这也是一个好思路。

接下来,目前版本的实现如下代码:

use futures::{future::join_all, future::JoinAll, sync, Future, Stream};

pub struct Client {
    id: AtomicUsize,
    url: Uri,
    sender: sync::mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>,
    chain_id: Option<U256>,
    private_key: Option<PrivateKey>,
    debug: bool,
}

impl Client {
    /// Create a client for CITA
    pub fn new() -> Self {
        let (sender, receiver) = sync::mpsc::unbounded();

        ::std::thread::spawn(move || {
            let task = receiver
                .map_err(|_| ::std::io::Error::new(::std::io::ErrorKind::Other, ""))
                .for_each(|item| {
                    tokio::spawn(item);
                    Ok(())
                }).map_err(|_| ());

            tokio::run(task);
        });

        Client {
            id: AtomicUsize::new(0),
            url: "http://127.0.0.1:1337".parse().unwrap(),
            sender,
            chain_id: None,
            private_key: None,
            debug: false,
        }
    }
    
    fn run(
        &self,
        reqs: JoinAll<
            Vec<Box<dyn Future<Item = JsonRpcResponse, Error = ToolError> + 'static + Send>>,
        >,
    ) -> Result<Vec<JsonRpcResponse>, ToolError> {
        let (tx, rx) = sync::oneshot::channel::<Result<Vec<JsonRpcResponse>, ToolError>>();
        let req = reqs
            .then(move |res| tx.send(res))
            .map(|_| ())
            .map_err(|_| ());
        self.sender
            .unbounded_send(Box::new(req))
            .map_err(|e| ToolError::Customize(e.to_string()))?;
        rx.wait().map_err(|e| ToolError::Customize(e.to_string()))?
    }

    fn debug_request<'a, T: Iterator<Item = &'a JsonRpcParams>>(params: T) {
        params.for_each(|param| {
            println!("<--{}", param);
        });
	}
}

impl Clone for Client {
    fn clone(&self) -> Self {
        Client {
            id: AtomicUsize::new(self.id.load(Ordering::Relaxed)),
            url: self.url.clone(),
            sender: self.sender.clone(),
            chain_id: None,
            private_key: self.private_key,
            debug: self.debug,
        }
    }
}

如上实现,在初始化的时候,直接起一个线程作为 runtime 执行器,利用 channel 作为任务队列,在 run 函数里面,创建一个一次性 channel,将 futures 任务的返回值通过这个一次性 channel 传回来,然后顺便实现 Client 的 clone 函数,这样,很自然地共享了 runtime,并且可以多个 client 同时发送任务过去,并不会因为多个任务的问题导致返回错乱,这样一封装,资源浪费问题、影响用户体验问题都解决了。这个并不是最佳实践,只是对于这个场景的一种实现手段,说穿了其实不值一提,权当一个实现思路吧。

实现时候的坑

实现上诉功能的时候,又踩了一个 futures 0.1 的坑,这里稍微介绍一下 futures 这个库的发展:

0.1 版本的 futures,核心是两个 trait,可以简单地像下面这样去理解和类比(只是帮助理解,并不正确):

这两个 trait 可以进行相互转换,这个没毛病,但是这里有一个很神奇的实现,就是 sync::mpsc::UnboundedReceiver 这个结构,实现了 Stream ,但是你如果直接当成 stream 去用的时候,会报一个神奇的错误,说这个类型 () 没有实现 IntoFuture 所以所有有关的方法都不能使用,很气!当时考虑是自己 newtype 包装一下然后实现 stream 还是怎么去做。。。最后发现有个 map_err 可以把这个问题规避掉,就写成了上面代码的模样,先 map_err 把 () 转成 io::err ,然后就可以愉快得使用了。

再简单说一下 tokio

tokio 现在的 runtime 分为单线程和线程池两个实现,runtime 有两个方法,也可以通过类比的方式简单理解:

tokio 的整个任务分配是 work-stealing base 调度器,就目前来说,性能还是很棒的。

小结

这一篇大致讲解了 cita-tool 库核心结构的演化过程,一步步通过解决问题思路的转变,将问题解决掉,最后实现了一个相对来说还可以的方案。这个库的重构并不止是这个结构体,还有 trait 方案的演变,鉴于这一篇已经太长了,留待后面叙述。

评论区

加载更多

登录后评论