藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

CITA系列之用户接口——Jsonrpc

写在最前面

  1. 这系列文章,仅代表我个人观点。
  2. CITA 代码上来说还在剧烈变动中,架构方面的变动可能性不大了(不排除有可能),但是具体实现上可能会有一些大的重构调整。
  3. 关于投资方面的事情,可以直接联系公司,我对这方面的邮件,一概无视。

这系列文章思路是按照模块从外部往内部讲,forever 模块、 evm 虚拟机实现、数据存储只会涉及接口,具体实现不会详细分析。之后,再联系几个模块讲解一些跨模块的功能,例如数据同步机制(涉及 network、chain、executor)、预执行机制(涉及bft、executor)。好,下面开始主题,Jsonrpc 模块。

Jsonrpc

CITA 的架构中,负责对外交互的模块有两个,一个是 Jsonrpc,一个是 Network,分别负责与人交互,与 peers 交互,这篇的主题是与人交互的 Jsonrpc。

顺便一提,CITA 项目将 Rust panic 的默认行为从线程退出改成了进程退出,于是就有了 forever 模块,这个模块类似于 goever,用来将意外死去的进程拉起来。

代码分享

Jsonrpc 相对来说比较简单,提供了两个对外接口,分别是 WebSocket 和 Http,默认都是开启的,如果需要关闭,可以通过配置文件进行调整。

这个模块的核心功能:

  1. 提供对外接口(核心数线程 + 1)
  2. 将 request 分类转发给对应的模块
  3. 监听 MQ 消息,将 response 返回给用户

这是一个无状态的模块,依赖 Rust 社区的 tokio/future/mio 这一套异步工具,值得期待的是,2018 年这套工具应该会完成稳定的任务,之后网络应用这种 IO 库应该会好写很多。

对外接口

tokio 系列经历了几次重构,目前 Jsonrpc 用的 hyper 库依赖的并不是最新的版本(最新版本有一个运行时,可以提供线程池及 work-steal 模式的任务分配),而是上一个版本的方案,依赖 tokio-core 作为异步的执行器,所以在 http 接口上,启动了与服务器核心数一致的线程,每个线程一个 core 对外部请求进行分发处理:

代码地址: https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L202

let threads: usize = config
            .http_config
            .thread_number
            .unwrap_or_else(num_cpus::get);

for i in 0..threads {
    let addr = addr.clone().parse().unwrap();
    let tx = tx_relay.clone();
    let timeout = http_config.timeout;
    let http_responses = Arc::clone(&http_responses);
    let allow_origin = http_config.allow_origin.clone();
    let _ = thread::Builder::new()
        .name(format!("worker{}", i))
        .spawn(move || {
            let core = Core::new().unwrap();
            let handle = core.handle();
            let timeout = Duration::from_secs(timeout);
            let listener = http_server::listener(&addr, &handle).unwrap();
            Server::start(core, listener, tx, http_responses, timeout, &allow_origin);
        })
        .unwrap();
}

相对来说,WebSocket 只用了一个线程对外进行监听:

代码地址:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L187

处理的逻辑:

将接到的请求,转成 reqlib::Request + topic 格式,会生成一个 uuid 作为 index 保存对应的 jsonrpc id /版本号/以及用户的 channel(socket),并插入全局 Response 里,待超时或者 MQ 中返回对应的 Response 内容后,从中取出,返回用户,看代码的时候,可以重点关注一下这个全局 Response:

/// 类型 Arc<Mutex<HashMap<Uuid, TransferType>>>
let responses = Arc::new(Mutex::new(HashMap::with_capacity(backlog_capacity)));

处理请求代码:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/http_server.rs#L66

将 request 分类转发给对应的模块

代码:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L163

fn forward_service(
    topic: String,
    req: reqlib::Request,
    new_tx_request_buffer: &mut Vec<reqlib::Request>,
    time_stamp: &mut SystemTime,
    tx_pub: &Sender<(String, Vec<u8>)>,
    config: &NewTxFlowConfig,
) {
    if RoutingKey::from(&topic) != routing_key!(Jsonrpc >> RequestNewTx) {
        let data: Message = req.into();
        tx_pub.send((topic, data.try_into().unwrap())).unwrap();
    } else {
        new_tx_request_buffer.push(req);
        trace!(
            "New tx is pushed and has {} new tx and buffer time cost is {:?}",
            new_tx_request_buffer.len(),
            time_stamp.elapsed().unwrap()
        );
        if new_tx_request_buffer.len() > config.count_per_batch
            || time_stamp.elapsed().unwrap().subsec_nanos() > config.buffer_duration
        {
            batch_forward_new_tx(new_tx_request_buffer, time_stamp, tx_pub);
        }
    }
}

接到解析好的 Request 之后,如果 topic 是交易信息,会缓存在 buffer 中,超时或者达到一定量之后向 Auth 模块统一转发,而其他的请求信息就直接转发给对应的模块。

监听 MQ 消息,将 response 返回给用户

代码:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L231

pub fn handle(&mut self, key: &str, body: &[u8]) {
   let mut msg = Message::try_from(body).unwrap();
   trace!("get msg from routint_key {}", key);

   match RoutingKey::from(key) {
       routing_key!(Auth >> Response)
       | routing_key!(Chain >> Response)
       | routing_key!(Executor >> Response)
       | routing_key!(Net >> Response) => {
           let content = msg.take_response().unwrap();
           trace!("from response request_id {:?}", content.request_id);
           let value = { self.responses.lock().remove(&content.request_id) };
           if let Some(val) = value {
               match val {
                   TransferType::HTTP((req_info, sender)) => {
                       let _ = sender.send(Output::from(content, req_info.id, req_info.jsonrpc));
                   }
                   TransferType::WEBSOCKET((req_info, sender)) => {
                       let _ = sender.send(
                           serde_json::to_string(&Output::from(content, req_info.id, req_info.jsonrpc)).unwrap(),
                       );
                   }
               }
           } else {
               warn!("receive lost request_id {:?}", content.request_id);
           }
       }
       _ => {
           warn!("receive unexpect key {}", key);
       }
   }
}

主要就是监听 MQ 信息,根据 ws 或者 http 分别构建 json 数据并返回。

总结

这个模块的逻辑其实非常简单,类似于一个消息接收/分发的机制,它不需要知道消息是什么,只是根据 topic 去转发消息,然后本身有超时控制和辨别请求格式的功能。

下一篇应该是 NetWork,先把交互的两个模块讲完,相对于内部处理,网络这一块大部分人都经常接触,比较容易理解。

顺便一说,在 https://github.com/driftluo/cita-cli 这里,我在写一个对 cita 的命令行工具,可以理解为 client,有兴趣可以一起来写哦。

评论区

加载更多

登录后评论