CITA系列之执行器——Executor
写在前面
在之前,chain 和 executor 是一个模块,后来,由于想要支持多执行器并行,将 executor 单独拆出来,目前还有一些冗余代码、一些需要改动的地方,而且,并行计算还没有开始支持。cita 与 以太坊 的架构上有一些不同,以太坊的共识,是直接共识执行结果,而 cita 采用的是复制状态机的模型,共识的是交易的顺序,共识开始(预执行)或者完成后,才进行交易执行,状态更新等操作。
Executor
executor 总共做了两件事:
- 监听 MQ 消息并做想要的响应
- executor 状态机,执行交易,更新状态,并把结果发送给 chain
- 超时处理
- gRPC 接口,用于对接外部执行器,比如 go 合约执行器
- 维护全局状态树并持久化,包括 account 信息、状态信息
go 合约的实现类似于 fabric ,用 gRPC 留了一个接口,链上注册函数,执行过程留给外部执行器,等待返回结果并存储,理论上这个模式的执行器可以支持任何语言的合约执行,前提是有对应语言的外部执行器。
executor 的状态机状态如下:
#[derive(Debug, Clone)]
pub enum Stage {
/// Exeuting block
ExecutingBlock,
/// Executing proposal
ExecutingProposal,
/// Finish executing proposal and wait
WaitFinalized,
/// Finalized
Idle,
}
注释比较清晰了,这里涉及一个预执行 proposal 的状态,整个流程就是 ExecutingProposal -> WaitFinalized(ExecutingBlock) -> Idle -> ExecutingProposal(ExecutingBlock) 这样的逻辑,内部还有一个标志位 is_interrupted 用来控制交易是否继续执行。
代码分享
gRPC server
代码:https://github.com/cryptape/cita/blob/develop/cita-executor/src/main.rs#L159
thread::spawn(move || loop {
if server.is_none() {
server = vm_grpc_server(
grpc_ext.grpc_port,
Arc::clone(&service_map),
Arc::clone(&grpc_ext.ext),
);
} else {
thread::sleep(Duration::new(8, 0));
}
});
尝试建立 gRPC server ,发送交易到外部及获取结果在 executor 执行交易处
消息分发及处理
代码:https://github.com/cryptape/cita/blob/develop/cita-executor/src/executor_instance.rs#L82
这里监听 MQ 消息分为几类:
- 返回(响应)查询结果:
Auth >> MiscellaneousReq:auth 启动的时候需要查询创世块的 chain id 信息Chain >> Request:jsonrpc 的查询请求 -> chain 接到后发现自己没有这个数据 -> 给 executor,拿到数据后发送信息个 jsonrpc
- 同步 chain 状态:
Chain >> RichStatus:executor 会将执行结果缓存起来,根据 chain 落盘情况删除缓存的执行结果
- 接收 block,缓存,发送信号给执行线程:
Consensus >> BlockWithProof:接到 bft 发来的共识块,检验高度、hash、时间后,如果状态不在 Idle,则先判断 block 与 proposal block 是否相同,如果不相同,则将标志位is_interrupted设置为 true。后面操作一样,都是将 block 缓存在block_map里面,并发送高度给执行交易线程Net >> SyncResponse:接到 network 发来的同步块,检验高度、hash、块的状态,插入block_map,发送高度给执行线程Consensus >> SignedProposal || Net >> SignedProposal:接到 bft 发来的已经签名的 proposal,检验高度、hash、时间后,如果状态为ExecutingProposal,判断block_map中的 proposal 是否与目前接到的一致,不一致的时候,设置标志位is_interrupted为 True,并将 proposal 插入block_map,发送高度给执行线程;如果状态为WaitFinalized,判断block_map中的 proposal 是否与目前接到的一致,不一致的时候,将 proposal 插入block_map,发送高度给执行线程;如果状态为 Idle,将 proposal 插入block_map,发送高度给执行线程
交易执行
代码:https://github.com/cryptape/cita/blob/develop/cita-executor/src/executor_instance.rs#L189
整体思路是,取出 block_map 中对应高度的块,分为三类:
- 共识块
- 如果
is_interrupted为 true,设置状态为ExecutingBlock,进入执行交易流程; - 如果
is_interrupted为 false,状态为WaitFinalized,close_block有值(预执行结果),设置状态为ExecutingBlock,进入finalize_proposal流程,替换 proof、proposer、更新全局配置、状态写入磁盘、缓存执行结果并发送给 chain、更新缓存的最近 255 块 hash; - 如果
is_interrupted为 false,状态为WaitFinalized,close_block无值(预执行结果),执行共识块交易,完成后直接进入finalaze_block流程; - 如果状态为
Idle,执行共识块交易,完成后直接进入finalaze_block流程;
- 如果
- 同步块
- 如果同步块有对应的 proof,那么进入同步块处理阶段,一个循环,将
block_map中连续的同步块都一个个取出来,验证 proof 、hash、高度等信息,成功后,执行同步块交易,完成后直接进入finalaze_block流程,最后根据高度,丢弃block_map中已经完成的块;如果中途有一个失败,则直接清空block_map;
- 如果同步块有对应的 proof,那么进入同步块处理阶段,一个循环,将
- proposal 块
- 这里只有两种可能,一个是
Idle,一个是WaitFinalized,如果是空闲,那就直接去执行 proposal 的块交易,如果是等待 共识块的状态,这必然是前面的 proposal 块与后来的 proposal 块不一致,要根据新的 proposal 块进行执行;
- 这里只有两种可能,一个是
这一块因为存在预执行,所以状态显得比较复杂。块的来源从上面看是有三个的,预执行就是在共识未达成之前,取一个已经签名的 proposal 开始执行,并每执行 255 个交易检查一下是否需要放弃当前块,去执行新的 proposal 或者是 共识块,executor 认为,同一高度后来的 proposal 是趋向于正确的块,打断这一块的代码在如下地址:
https://github.com/cryptape/cita/blob/develop/cita-executor/core/src/libexecutor/block.rs#L421
接到的每个块想要执行首先要经过一系列的验证后才能进入执行阶段,尤其重要的是这个 proof,是共识的证明,如果是共识块,结构体是 block_with_proof,即自带 proof。而同步块的 proof 是在后一个块上,如果是最后一个块,则会把 proof 作为 u64::Max 块发送过来,即要证明当前块有效,必须有下一个块到来,延时确认。这一块同步逻辑会单独作为一篇的主题进行讲解。
总结
目前的 executor,维护了全局状态,导致 executor 和 chain 的状态需要实时同步,并不是单纯的执行器,理论上执行器应该是个类似于 actor 样的东西,接收数据,按照既定逻辑执行,然后返回值,无状态的执行器。这方面的考虑也在进行中,也许后期会直接把状态给剥离掉,变成单纯的执行器。
看到这里的朋友,可以看出其实每个模块里面的一些思路都是类似的,消息分发,超时机制等等,这方面可以抽一个微服务框架出来,最近也在考虑做这个事情,抽离一些不必要的重复逻辑,让每个模块专注于处理自己的事情,框架负责稳定性和消息分发。
好,下一篇应该是 chain,那我们下周再会。

评论区
加载更多