CITA系列之核心存储——Chain
ps:非常抱歉,这一期晚了整整一天。造成这一切的原因是,我在捣鼓(重装)系统,期间碰到了一些坑坑坎坎,还要把环境恢复到可工作状态,着实费了不少力气,不过现在已经搞定了,干净的新系统还是很棒的。
上一篇 Executor,大致说了 Executor 中维护着一个全局状态,或者说全局配置,Executor 根据接到的块对全局状态进行修改,然后向前推演,而 Chain 在拆出 Executor 之后,只维护了一个状态,就是全局高度,同时负责对 block 的序列化存储,这个模块已经相对来说非常简单了,即不存在状态机,也没有复杂的任务。
Chain
Chain 的任务非常简单:
- 接受 MQ 消息,大致分为以下任务:
- 查询请求,如果可以解决,直接返回,不可以解决,转发给 Executor(查询的对象不在 block 上)
- 新块到来,验证后缓存在 block_map 里
- 执行结果到来,从 block_map 里拿出对应的块,验证后存盘
- 超时检查,超过预定时间,块执行结果未到来,广播 chain 当前状态,通知 network、bft 做对应的处理
- 垃圾回收,为了应对存储,chain 缓存了一部分 block 信息在内存中,每过一段时间回收处理一次
Chain 主要的事情就上面这些,对比起 Executor 等模块来说,相当简单。
代码分享
MQ 消息的处理分别用两个线程去处理,一个只读,一个写入:
查询数据:
thread::spawn(move || loop {
if let Ok((key, msg)) = rx.recv() {
forward.dispatch_msg(&key, &msg);
}
});
写入块,包括超时处理:
thread::spawn(move || {
let mut timeout_factor = 0u8;
loop {
if let Ok(einfo) = write_receiver
.recv_timeout(Duration::new(18 * (2u64.pow(timeout_factor as u32)), 0))
{
block_processor.set_executed_result(einfo);
timeout_factor = 0;
} else {
// Here will be these status:
// 1. Executor process restarts, lost cached block information.
// 2. Executor encountered an invalid block and cleared the block map.
// 3. Bft restarted, lost chain status information, unable to consensus, unable to generate block.
//
// This will trigger:
// 1. Network retransmits block information or initiates a synchronization request,
// and then the executor will receive a block message
// 2. Bft will receive the latest status of chain
block_processor.reset_max_store_height();
block_processor.broadcast_current_status();
if timeout_factor < 6 {
timeout_factor += 1
}
}
}
});
垃圾回收:
//garbage collect
let mut i: u32 = 0;
loop {
thread::sleep(time::Duration::from_millis(10_000));
if i > 100 {
chain.collect_garbage();
i = 0;
}
i += 1;
}
出块完成后,将当前块的交易 hash 发给 auth,同时广播当前状态:
代码:https://github.com/cryptape/cita/blob/develop/cita-chain/core/src/libchain/chain.rs#L581
总结
看到这里的朋友,对 cita 的绝大部分地方都可以说有一个轮廓了,再加上这个模块除了存储管理,剩下的东西实在不多,就略略指了一个方向,剩下的东西可以自行研究。
下一篇:Auth

评论区
加载更多