go-ethereum 源码笔记(trie 模块-MPT 的实现)

默克尔树是区块链的基础结构,在比特币里它用来存放区块内的所有交易,用一个数字指纹(hash)来表示整个交易集合,在以太坊中,提出了一种新的数据结构叫做 Merkle Patricia Tree(常用翻译:默克尔帕特里夏树,缩写是 MPT),在以太坊中它用来组织管理账户,交易,收据等数据。从名字或许可以看出来它是一个组合名字,它实际上是默克尔树和帕特里夏树两种数据结构的结合,在阅读 trie 模块之前,读者应该对默克尔树,帕特里夏树这两个数据结构有一定了解,这些是阅读源代码的基础知识,这里只做概述。下面我们先分别介绍这两种数据结构。

Merkle Tree 默克尔树

默克尔树即哈希树,它是一种树形数据结构,由一组叶结点,一组中间节点和一个根节点构成。最下面的叶结点包含基础数据,每个中间节点是它的子节点的哈希值,根节点是他的子节点的哈希值,表示默克尔树的根部。它的作用是对大容量的数据进行快速比对。对于一个数据集,我们可以将其分成多个小的数据集,存在叶子节点上,默克尔树的根节点存储的哈希值就表示这个数据集的哈希值,当更新、添加或删除树节点时,都需要更新节点的哈希值,根节点的哈希值也会有所不同,这样可以用根节点到数据节点这一路径的数据来证明数据的正确性,而这个数据量相比于整个数据集来说是很小的。

举两个实际的例子,你就能很好的理解默克尔树的作用。

Dynano 数据库

在 Amazon 的 Dynamo 数据库中,大量使用到默克尔树。场景是这样的:为了保证 Dynamo 集群中的数据存储的持久性,一台机器上的数据在其他机器上存有备份,也就是副本。副本经常需要同步,保证数据的一致,这时候需要对数据进行比对,找到不一致的地方。网络传输时间是这个过程的瓶颈,我们需要尽可能减少数据传输量。比对两台机器的数据,传输不一致的数据当然是最佳选择,如何比对?首先应该想到的是应该比对哈希值,避免之间传输数据带来的巨大传输量,其次,用二分法能更快的找到差异数据,如果能想到这两点,我们就和 Merkle 想的一样了。在每台机器对每个区间的数据构建默克尔树,比对数据时,从根节点开始比较,如果一致,表明两个副本一致,否则就遍历这颗二叉树,定位到差异数据的复杂度是 $O(log(n))$。

BitTorrent

BitTorrent 是一种中心索引式的 P2P 文件分析通信协议。进行 P2P 下载时,先从中心索引服务器获取一个扩展名为 torrent 的索引文件,torrent 文件包含了共享文件的信息,包括文件名,大小,文件的 Hash 和指向 Tracker 的 URL,当我们使用 BitTorrent 下载文件时有几个步骤:

  1. 客户端 A 根据下载的种子文件得出资源的目标服务器地址,然后与目标服务器地址建立连接,进行数据块下载。
  2. 当一个数据块下载完成后,客户端 A 会计算该数据块的摘要,然后用这个摘要与先前种子文件里的摘要进行对比,如果一致,表示这个数据块下载成功,否则,重新下载。
  3. 当一个数据块下载成功,客户端 A 会与其他下载资源的客户端进行通信,告知它们它已经成功下载了一个数据库,其他客户端需要下载这个数据块时,会去客户端 A 下载。

如果不采用默克尔树,可以采取 hash list 的方式,即将文件分成一个一个的数据块后,分别对其取 hash,这带来一个问题是,torrent 文件应该是越小越好,否则会对种子文件服务器造成很大的负载压力,要想要 torrent 文件很小,那么数据块就应该分得足够大,但数据块足够大的话,客户端下载完一个数据块后校验发现数据块无效,重新传输代价又很大。引入默克尔树可以解决这个问题,将所有数据块构造成默克尔树之后,种子文件可以只存放根节点的哈希值。当出现传输错误时可以使用二分查找快速找到出错的数据块,然后重新传输,

从上述的两个例子可以看到引入默克尔树的意义,默克尔树将哈希表和二叉树的结合起来,由根节点,中间节点和叶子节点组成,叶子节点存的是数据或哈希值,中间节点是它的两个孩子的哈希值,根节点也是它的两个孩子的哈希值,它表示这一组数据的指纹。根据哈希表和二叉树的特点我们可以知道,叶子节点数据的任何变动,都可以通过父节点体现,而通过二叉树,可以很快定位到变更的数据。

因此,默克尔树的典型应用场景有:

  • 快速比较大量数据:如果两个默克尔树的根节点的值相同,意味着它们代表的值相同
  • 可以快速定位到变更的数据,复杂度是 $O(log(n))$
  • 零知识证明

Patricia 帕特里夏树

要搞懂帕特里夏树,首先要先搞懂字典树。

trie 字典树

也称前缀树,单词查找树,键树。它用来保存关联数组,其中的键通常是字符串,键不直接保存在节点中,而是由节点在树中的位置决定。一个节点的所有子孙都有相同的前缀,也就是这个节点对应的字符串。并不是所有节点都有对应的值,只有叶子节点和部分内部节点所对应的键才有相关的值。

trie 的思想是用空间换时间,利用字符串的公共前缀来减少无谓的字符串比较以达到提高查询效率的目的。如下图就是一个 trie 树。

trie

应用范围

  • 字符串检索

Trie 树可以用来查询字符串,判断一个字符串是否存在于某个字符串集合。

  • 词频统计

实现词频统计时,节点结构有一个字段来记录该节点表示的单词的个数。

  • 字符串排序

一次插入字符串到 Trie 树的过程就是一次排序的过程,先序输出 Trie 树的所有关键字就可以得到有序的字符串。

  • 前缀匹配

在搜索引擎中,使用 Trie 前缀可以找到所有以某个字符串为前缀的字符串集合。

Trie 的插入和查询效率都是 $O(m)$,其中 m 是待插入/查询的字符串的长度。相比于哈希查找,Trie 树中不同的关键字不会产生冲突,而且查询共同前缀的 key 时很高效,如果是使用哈希查找,需要遍历整个哈希表,时间效率为 $O(n)$,n 为哈希表的数据总数;但 Trie 的缺点是空间消耗比较大,如果是直接查找,效率是 $O(m)$,并且有 m 次 IO 的开销,对于磁盘的压力也很大,而哈希表的查找效率是 $O(1)$。

实例

Trie 在 redisearch 中有实现,用来实现搜索提示。

弄清原理后,可以尝试自己动手实现一下,在 leetcode 上有相关题目。

PatriciaTrie 帕特里夏树

又叫做基数树,压缩前缀树或紧凑前缀树(compact prefix tree),是一种更节省空间的前缀树。它与 Trie 的区别是,如果某个节点只有一个子树,那么这个子树跟父节点合并,这样可以缩短 Trie 里不必要的深度,节约存储空间,加快搜索节点的速度。

帕特里夏树的诞生是因为 Trie 有一定的缺陷,Trie 树会给每个字符串分配一个节点,如果有很多很长的,又没有公共节点的字符串就会使得 Trie 退化成一个数组,如果以太坊直接使用的是 Trie 话,有可能造成严重的存储空间的浪费,导致 Dos 攻击。

Merkle Patricia Tree 默克尔-帕特里夏树

Merkle Patricia Tree 默克尔-帕特里夏树是一种融合了默克尔树和前缀树两种结构优点的,经过改良的数据结构,在以太坊中用来组织交易信息、账户状态及其变更、收据相关的数据,为我们提供一个高效地、易更新的、且代表整个状态树的『指纹』。

每一个以太坊的区块头包含3颗 MPT 树,分别是:

  • 交易树 txTrie
    用来存储交易数据。路径是 rlp(transactionIndex),其中 transactionIndex 是挖矿结束后得到的交易索引,在交易执行完后才生成,在挖矿结束后不会再更新。
  • 状态树 stateTrie
    世界状态存储的地方,存储了所有账户的信息,包括余额,交易次数,EVM 指令(合约数据)等等,每次交易执行,stateTrie 都会变化,这部分内容我们将在 go-ethereum 源码笔记(core 模块-状态管理) 这一篇进行探讨。路径是 sha3(ethereumAddress),值是 rlp(ethereumAccount),其中 ethereumAccount 是一个列表 [nonce,balance,storageRoot,codeHash],而 storageRoot 是另一个独立的帕特里树,每个账户都有一个独立的帕特里夏树,它用来存储所有的合约数据。参考 Wiki Patricia Tree
  • 收据树 receiptTrie
    路径是 rlp(transactionIndex),其中 transactionIndex 是挖矿结束后得到的交易索引,在交易执行完后生成,在挖矿结束后不会再更新。

Geth 的 MPT

MPT 在以太坊中作为键值存储的数据结构,使得插入,查找,删除的复杂度为 O(log(n)),并且能获得默克尔树的全部特性。

根据以太坊官方博客所描述的,MPT 使得轻节点可以实现以下的查询:

  • 这笔交易被包含在特定的区块中了吗?
  • 这笔交易在过去的30天中,发出 X 类型事件的所有实例(例如,一个众筹合约完成了它的目标)
  • 目前我的账户余额是多少
  • 这个账户是否存在
  • 假设在这个合约中运行这笔交易,它的输出是什么

其实第1种情况可以由交易树处理,第2种情况可以由收据树处理,第3,4,5中情况可以由状态树来处理,计算前4个查询任务相当简单,服务器找到对象,获取默克尔分支,将分支回复给客户端。
对于第5种查询任务,利用状态树实现,这种方式称为默克尔状态转换证明。轻节点想要得到一个可信的结果,这个问题相当于,如果在根 S 的状态树上运行交易 T,其结果状态树将是根为 S’,log 为 L,输出为 O’。为推断这个证明,服务器创建一个假的区块,状态设为 S,请求这笔交易时假装是轻客户端,请求这笔交易时,需要客户端确定一个账户的余额,然后这个假的轻客户端会发出一个余额查询请求,服务器会回应这些请求,也会将这些请求中的数据合并以一个证明的方式发送给轻节点,轻节点这时会进行验证,将服务器提供的证明作为数据库使用,如果客户端验证的结果和服务器提供的是一样的,客户端就接受这个证明。看起来这种方式跟默克尔证明没有本质区别,都很好的利用了默克尔树的特性。

接下来我们换个角度,根据默克尔树和帕特里夏树两种结构的特性来说明 MPT 在以太坊中起到的作用。

默克尔树的作用

轻节点扩展

MPT 实现了默克尔树,所以能提供一个默克尔证明这个功能,而通过默克尔证明我们可以实现区块链的轻节点扩展。

轻节点

在以太坊中,全节点是指维护整个区块链数据的节点,这包括整个区块链中的区块头,所有交易信息,账户状态,交易收据信息。随着时间推移,整个区块链数据量会非常大,这使得在 PC 或移动设备上运行全节点的可能性很小。中本聪的论文中描述了这种情况,并利用默克尔树来解决这个问题。我们可以运行一种节点,这个节点只维护区块链中所有的区块头信息,这种节点就叫做轻节点。

默克尔证明

默克尔证明是指一个轻节点向一个全节点发起一次证明请求,询问完整的默克尔树中是否存在一个指定的节点(这里指的是树的节点),全节点会向轻节点返回一个默克尔证明路径,由轻节点进行计算,验证其存在性。这里利用了哈希函数的特性保证了这个验证是真实可靠的。

当我们需要验证某笔交易确实存在于区块链时,轻节点不需要维护区块的信息,当它需要验证时,可以向全节点请求一条默克尔树的路径,该路径实际上是从根节点到叶子节点的所有节点的哈希值列表,轻节点收到这条路径后,一层层地计算根哈希,再与本地的根哈希匹配,如果匹配成功了则说明这笔交易是存在的。哈希操作是不可逆的,所以不存在一个所谓的恶意节点可以伪造一条假的路径使得计算的根哈希与轻节点的根哈希是一致的。有人可能会有疑问,为什么不直接向网络中的某个全节点请求交易数据是否在区块链中,如果这样做的话,就不能保证安全性了,区块链的美妙之处在于,上面存的数据是全网的共识,除非受到 51% 攻击,否则区块链的数据是完全可信的,这与中心化的数据库不同。如果是直接向网络中某节点或某些节点请求数据,可能这些节点都是恶意节点,不能保证安全性。需要说明的是,轻节点尽管没有参与维护整个区块链数据,它也是去中心化的,因为它所拿到的区块头数据是经过了全网共识的,所以有足够的安全保障。

帕特里树的作用

  • 存储任意长度的 key-value 键值对数据
  • 快速索引根据哈希存储的数据集
  • 引入了几种节点类型来提高效率,包括空节点,叶子节点,扩展节点,分支节点

MPT 在 geth 中的实现细节

数据结构

在以太坊中,MPT 有四种类型的树节点,目的是压缩整体树高、降低操作复杂度。树节点可以分为以下四类:

  • 空节点
  • 分支节点
  • 叶子节点
  • 扩展节点

这些是黄皮书中的定义,在 geth 的实现中,上述概念有不同的结构与之对应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type node interface {
fstring(string) string
cache() (hashNode, bool)
canUnload(cachegen, cachelimit uint16) bool
}
type (
fullNode struct {
Children [17]node
flags nodeFlag
}
shortNode struct {
Key []byte
Val node
flags nodeFlag
}
hashNode []byte
valueNode []byte
)
...
type nodeFlag struct {
hash hashNode
gen uint16
dirty bool
}

下图是一个世界状态的例子(摘自 Ethereum block architecture):

worldstatetrie

fullNode

fullNode 是一个可以携带多个子节点的节点。它有一个 node 数组类型的成员变量 Children,数组的前16个空位分别对应十六进制的0-9a-f,对于每个子节点,根据其 key 值的十六进制表示一一对应,Children 数组的第17位,fullNode 用来存储数据。

对应黄皮书中的分支节点。

shortNode

shortNode 是一个仅有一个子节点的节点。成员变量 Val 指向一个子节点,成员变量 Key 是一个由任意长度的字符串,这体现了压缩前缀树的特点,通过合并只有一个子节点的父节点和其子节点来缩短 Trie 的深度。

对应黄皮书里的扩展节点和叶子节点,通过 shortNode.Val 的类型来对应。

valueNode

valueNode 在 MPT 结构中存储真正的数据。充当 MPT 的叶子节点,不带子节点。

valueNode 是一个字节数组,但是它实现了 fstring(string) string, cache() (hashNode, bool), canUnload(cachegen, cachelimit uint16) bool 这三个接口(实际上 fullNode,shortNode,hashNode 也实现了这三个接口),因此可以作为 fullNode,shortNode 中的 node 使用。valueNode 可以承接数据,携带的的是数据的 RLP 哈希值,长度为 32 byte,RLP 编码的值存在 LevelDB 里。

hashNode

hashNode 是 fullNode 或 shortNode 对象的 RLP 编码的32 byte 的哈希值,表明该节点还没有载入内存。遍历 MPT 时有时会遇到一个 hashNode,表明原来的 node 需要动态加载,hashNode 以 nodeFlag 结构体的成员 hash 的形式存在,如果 fullNode 或 shortNode 的成员变量发生变化,那么就需要更新它的 hashNode,在增删改的过程结束了都会调用 trie.Hash(),整个 MPT 自底向上变量,对所有清空的 hashNode 重新赋值,最终得到根节点的 hashNode,也就是整个 MPT 结构的哈希值。

结合之前讲过的基础知识,可以看到 fullNode 体现了 Trie 的特点,shortNode 实现了 PatriciaTrie 的特性(当然也实现了 Trie 的特性),hashNode 既实现了 MPT 节点的动态加载,也实现了默克尔树的功能。

源码分析

Trie 的基本操作

我们首先将 Trie 看做一个黑盒,看看它具体暴露了什么接口,对全局有个总体的把握之后再深入其对应的细节。

初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func New(root common.Hash, db *Database) (*Trie, error) {
if db == nil {
panic("trie.New called without a database")
}
trie := &Trie{
db: db,
originalRoot: root,
}
if (root != common.Hash{}) && root != emptyRoot {
rootnode, err := trie.resolveHash(root[:], nil)
if err != nil {
return nil, err
}
trie.root = rootnode
}
return trie, nil
}

通过调用 New 函数进行初始化,参数 root 是一个哈希值,如果哈希值不为空,说明数据中已存在 Trie 结构,调用 trie.resolveHash 方法来加载整个 Trie 树,如果 root 为空,新建 Trie 返回。

下面我们看看 Trie 结构的增删改查的过程,需要注意的是,MPT 是一棵树,这些数据结构的操作都是递归调用的过程,因此参数的上下文应该是当前这棵树下的含义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
if len(key) == 0 {
if v, ok := n.(valueNode); ok {
return !bytes.Equal(v, value.(valueNode)), value, nil
}
return true, value, nil
}
switch n := n.(type) {
case *shortNode:
matchlen := prefixLen(key, n.Key)
if matchlen == len(n.Key) {
dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
if !dirty || err != nil {
return false, n, err
}
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
branch := &fullNode{flags: t.newFlag()}
var err error
_, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
if err != nil {
return false, nil, err
}
_, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
if err != nil {
return false, nil, err
}
if matchlen == 0 {
return true, branch, nil
}
return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil

case *fullNode:
dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
if !dirty || err != nil {
return false, n, err
}
n = n.copy()
n.flags = t.newFlag()
n.Children[key[0]] = nn
return true, n, nil

case nil:
return true, &shortNode{key, value, t.newFlag()}, nil

case hashNode:
rn, err := t.resolveHash(n, prefix)
if err != nil {
return false, nil, err
}
dirty, nn, err := t.insert(rn, prefix, key, value)
if !dirty || err != nil {
return false, rn, err
}
return true, nn, nil

default:
panic(fmt.Sprintf("%T: invalid node: %v", n, n))
}
}

参数 node 是当前插入的根节点,prefix 是当前已经处理的 key(即 Patricia 树中节点共有的前缀),key 是待处理的部分,完整的 key 应该是 prefix + key,value 是需要插入的值。返回的名为 dirty 的布尔值在 commit 阶段指明该树是否需要重新进行哈希计算。

如果当前根节点类型为 shortNode(当前节点为叶子节点或扩展节点),首先通过 prefixLen 方法计算公共前缀。

如果公共前缀长度等于 key 的长度,那么说明插入的 key 和待插入的树的 key 一样,这时分两种情况。

  • 如果 value 也一样,那么返回 false,即 dirty 为 false。
  • 如果 value 不一样,实际上这是一个 update 的操作,这时返回的 dirty 为 true。

如果公共前缀不完全匹配,又分两种情况。

  • 如果匹配长度为 0 的话,返回的是一个 fullNode,这个 fullNode 有两个 shortNode 类型的子节点,一个子节点即当前的根节点,另一个为以要插入的值为 node 的 shortNode;
  • 匹配长度大于零,这时公共节点提取出来形成一个独立的 shortNode(扩展节点),这个 shortNode 的 Val 是 fullNode,fullNode 的两个子节点,一个即当前的根节点,另一个为以要插入的值为 node 的 shortNode

这部分逻辑是当前根节点类型为 shortNode 的情况,可以看到这里实际上是帕特里夏树的体现,对于 Trie 进行空间使用率的优化,如果一个父节点只有一个子节点,那么这个父节点将与其子节点合并,这样可以缩短搜索深度。

对于当前根节点类型为 fullNode 的情况,直接往对应的孩子节点调用 insert 方法,insert 方法会返回插入后的根节点,然后将该孩子指向这个返回的根节点即可。

对于当前根节点类型为 nil 的情况,即要插入的 Trie 是空的,直接返回一个 shortNode,Val 字段即为要插入的 node。

对于当前节点为 hashNode 的情况,表明要插入的当前根节点还在数据库中,没有加载到内存中,首先调用 t.resolveHash(n, prefix) 方法进行加载,在调用 insert 方法进行插入。

以上就是插入一个新节点的全部逻辑,基本上概括了帕特里夏树的特点,删,改,查的操作实际上大同小异,因此接下来的讲解会简略一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
func (t *Trie) Delete(key []byte) {
if err := t.TryDelete(key); err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
}
}

func (t *Trie) TryDelete(key []byte) error {
k := keybytesToHex(key)
_, n, err := t.delete(t.root, nil, k)
if err != nil {
return err
}
t.root = n
return nil
}

func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) {
switch n := n.(type) {
case *shortNode:
matchlen := prefixLen(key, n.Key)
if matchlen < len(n.Key) {
return false, n, nil
}
if matchlen == len(key) {
return true, nil, nil
}
dirty, child, err := t.delete(n.Val, append(prefix, key[:len(n.Key)]...), key[len(n.Key):])
if !dirty || err != nil {
return false, n, err
}
switch child := child.(type) {
case *shortNode:
return true, &shortNode{concat(n.Key, child.Key...), child.Val, t.newFlag()}, nil
default:
return true, &shortNode{n.Key, child, t.newFlag()}, nil
}

case *fullNode:
dirty, nn, err := t.delete(n.Children[key[0]], append(prefix, key[0]), key[1:])
if !dirty || err != nil {
return false, n, err
}
n = n.copy()
n.flags = t.newFlag()
n.Children[key[0]] = nn

pos := -1
for i, cld := range n.Children {
if cld != nil {
if pos == -1 {
pos = i
} else {
pos = -2
break
}
}
}
if pos >= 0 {
if pos != 16 {
cnode, err := t.resolve(n.Children[pos], prefix)
if err != nil {
return false, nil, err
}
if cnode, ok := cnode.(*shortNode); ok {
k := append([]byte{byte(pos)}, cnode.Key...)
return true, &shortNode{k, cnode.Val, t.newFlag()}, nil
}
}
return true, &shortNode{[]byte{byte(pos)}, n.Children[pos], t.newFlag()}, nil
}
return true, n, nil

case valueNode:
return true, nil, nil

case nil:
return false, nil, nil

case hashNode:
rn, err := t.resolveHash(n, prefix)
if err != nil {
return false, nil, err
}
dirty, nn, err := t.delete(rn, prefix, key)
if !dirty || err != nil {
return false, rn, err
}
return true, nn, nil

default:
panic(fmt.Sprintf("%T: invalid node: %v (%v)", n, n, key))
}
}

删除的真正逻辑在 delete(n node, prefix, key []byte) (bool, node, error) 这个方法里,其中参数 key 是通过 keybytesToHex 获得的 hex 编码。删除的过程也是递归调用,其中 node,prefix,key 参数意义与插入时的参数一致。

对于 shortNode 情况,首先进行 key 的匹配,如果匹配的长度小于根节点的 key 的长度,意味着要删除的节点不在这颗树上,返回 false 以及根节点,不做任何操作;如果匹配的长度正好等于 key 的长度,意味着完全匹配,返回 true 以及 nil(表明该节点已经被删除)。

如果匹配的长度等于根节点的 key 的长度,并且小于参数 key 的长度,那么意味着要删除的节点是该树的子节点,需要删除的是根树的一颗子树,那么继续递归调用。

对于删除 fullNode 的情况,如果 fullNode 删除后有两个及以上的子节点,删除后返回根节点即可,如果删除后只有一个子节点,那么需要将该根节点变成 shortNode 返回。

对于删除 hashNode 的情况,先加载节点到内存中再递归调用删除操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (t *Trie) Update(key, value []byte) {
if err := t.TryUpdate(key, value); err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
}
}

func (t *Trie) TryUpdate(key, value []byte) error {
k := keybytesToHex(key)
if len(value) != 0 {
_, n, err := t.insert(t.root, nil, k, valueNode(value))
if err != nil {
return err
}
t.root = n
} else {
_, n, err := t.delete(t.root, nil, k)
if err != nil {
return err
}
t.root = n
}
return nil
}

更新的操作,如果 value 长度不为零,即 value 不为空的话,直接调用插入操作,如果为空,则删除根节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (t *Trie) Get(key []byte) []byte {
res, err := t.TryGet(key)
if err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
}
return res
}

func (t *Trie) TryGet(key []byte) ([]byte, error) {
key = keybytesToHex(key)
value, newroot, didResolve, err := t.tryGet(t.root, key, 0)
if err == nil && didResolve {
t.root = newroot
}
return value, err
}

func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
switch n := (origNode).(type) {
case nil:
return nil, nil, false, nil
case valueNode:
return n, n, false, nil
case *shortNode:
if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
return nil, n, false, nil
}
value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
if err == nil && didResolve {
n = n.copy()
n.Val = newnode
n.flags.gen = t.cachegen
}
return value, n, didResolve, err
case *fullNode:
value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
if err == nil && didResolve {
n = n.copy()
n.flags.gen = t.cachegen
n.Children[key[pos]] = newnode
}
return value, n, didResolve, err
case hashNode:
child, err := t.resolveHash(n, key[:pos])
if err != nil {
return nil, n, true, err
}
value, newnode, _, err := t.tryGet(child, key, pos)
return value, newnode, true, err
default:
panic(fmt.Sprintf("%T: invalid node: %v", origNode, origNode))
}
}

查询的操作,实际上最终查的还是 valueNode,根据 key,根据 pos 指示的 key 位置跟根节点的 Key 属性值进行匹配,递归调用即可,与上述的操作很类似。

提交

提交阶段会将内存中的所有 trie 写入到数据库中。在这期间会根据 dirty 值做一件非常重要的事情,如果 dirty 置为 true 了,表明其代表的父节点有改动需要提交,hashNode 的成员 hash 设为空,需重新进行计算得到 hashNode 的哈希值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
if t.db == nil {
panic("commit called on trie with nil database")
}
hash, cached, err := t.hashRoot(t.db, onleaf)
if err != nil {
return common.Hash{}, err
}
t.root = cached
t.cachegen++
return common.BytesToHash(hash.(hashNode)), nil
}

func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) {
if t.root == nil {
return hashNode(emptyRoot.Bytes()), nil, nil
}
h := newHasher(t.cachegen, t.cachelimit, onleaf)
defer returnHasherToPool(h)
return h.hash(t.root, db, true)
}

func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
if hash, dirty := n.cache(); hash != nil {
if db == nil {
return hash, n, nil
}
if n.canUnload(h.cachegen, h.cachelimit) {
cacheUnloadCounter.Inc(1)
return hash, hash, nil
}
if !dirty {
return hash, n, nil
}
}
collapsed, cached, err := h.hashChildren(n, db)
if err != nil {
return hashNode{}, n, err
}
hashed, err := h.store(collapsed, db, force)
if err != nil {
return hashNode{}, n, err
}
cachedHash, _ := hashed.(hashNode)
switch cn := cached.(type) {
case *shortNode:
cn.flags.hash = cachedHash
if db != nil {
cn.flags.dirty = false
}
case *fullNode:
cn.flags.hash = cachedHash
if db != nil {
cn.flags.dirty = false
}
}
return hashed, cached, nil
}

func (h *hasher) hashChildren(original node, db *Database) (node, node, error) {
var err error

switch n := original.(type) {
case *shortNode:
collapsed, cached := n.copy(), n.copy()
collapsed.Key = hexToCompact(n.Key)
cached.Key = common.CopyBytes(n.Key)

if _, ok := n.Val.(valueNode); !ok {
collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
if err != nil {
return original, original, err
}
}
return collapsed, cached, nil

case *fullNode:
collapsed, cached := n.copy(), n.copy()

for i := 0; i < 16; i++ {
if n.Children[i] != nil {
collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
if err != nil {
return original, original, err
}
}
}
cached.Children[16] = n.Children[16]
return collapsed, cached, nil

default:
return n, original, nil
}
}

提交 MPT 时,会调用 Trie.hashRoot 从根节点开始折叠,而 hashRoot 会调用 trie/hasher.go 里的 hash 方法,hash 方法和 hashChildren 方法会递归调用遍历整个 MPT,大致逻辑是:如果 node 没有子节点,直接返回;如果 node 是 shortNode,将 valueNode 作为参数调用 hash 方法;如果 node 是 fullNode,对其每个子节点调用 hash 方法。最终将这个 node 作为参数调用 store 方法,获得 RLP 编码,并进行哈希计算,获取哈希值,也就是 MPT 根节点的哈希值。

Trie 的序列化和反序列化

上一篇文章 go-ethereum 源码笔记(rlp 模块-序列化与反序列化) 介绍了 geth 里是怎么进行序列化和反序列化的,而通过上文我们知道了 MPT 有一个折叠和动态加载节点的过程,这个过程需要通过 rlp 模块进行序列化和反序列化,与 LevelDB 进行交互,我们来具体看看这个过程是怎么实现的。

store
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (h *hasher) store(n node, db *Database, force bool) (node, error) {
if _, isHash := n.(hashNode); n == nil || isHash {
return n, nil
}
h.tmp.Reset()
if err := rlp.Encode(&h.tmp, n); err != nil {
panic("encode error: " + err.Error())
}
if len(h.tmp) < 32 && !force {
return n, nil
}
hash, _ := n.cache()
if hash == nil {
hash = h.makeHashNode(h.tmp)
}

if db != nil {
hash := common.BytesToHash(hash)

db.lock.Lock()
db.insert(hash, h.tmp, n)
db.lock.Unlock()

if h.onleaf != nil {
switch n := n.(type) {
case *shortNode:
if child, ok := n.Val.(valueNode); ok {
h.onleaf(child, hash)
}
case *fullNode:
for i := 0; i < 16; i++ {
if child, ok := n.Children[i].(valueNode); ok {
h.onleaf(child, hash)
}
}
}
}
}
return hash, nil
}

前面有提到 store 方法,它通过调用 rlp.Encode 对节点进行 RLP 编码,然后计算哈希值,通过 db.insert 将其插入数据库,需注意的是分别对 shortNode,fullNode 调用的 onleaf方法,它用来存储外部的 MPT,如果没有猜错的话,它应该是用来存储账户相关内容的,待验证。

resolve
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) {
cacheMissCounter.Inc(1)

hash := common.BytesToHash(n)
if node := t.db.node(hash, t.cachegen); node != nil {
return node, nil
}
return nil, &MissingNodeError{NodeHash: hash, Path: prefix}
}

func (db *Database) node(hash common.Hash, cachegen uint16) node {
db.lock.RLock()
node := db.nodes[hash]
db.lock.RUnlock()

if node != nil {
return node.obj(hash, cachegen)
}
enc, err := db.diskdb.Get(hash[:])
if err != nil || enc == nil {
return nil
}
return mustDecodeNode(hash[:], enc, cachegen)
}

func mustDecodeNode(hash, buf []byte, cachegen uint16) node {
n, err := decodeNode(hash, buf, cachegen)
if err != nil {
panic(fmt.Sprintf("node %x: %v", hash, err))
}
return n
}

func decodeNode(hash, buf []byte, cachegen uint16) (node, error) {
if len(buf) == 0 {
return nil, io.ErrUnexpectedEOF
}
elems, _, err := rlp.SplitList(buf)
if err != nil {
return nil, fmt.Errorf("decode error: %v", err)
}
switch c, _ := rlp.CountValues(elems); c {
case 2:
n, err := decodeShort(hash, elems, cachegen)
return n, wrapError(err, "short")
case 17:
n, err := decodeFull(hash, elems, cachegen)
return n, wrapError(err, "full")
default:
return nil, fmt.Errorf("invalid number of list elements: %v", c)
}
}

前面提到,在遍历的过程中,如果遇到了 hashNode,需要动态加载这个节点,这个方法就是 resolve。resolve 方法会调用 trie/database.gonode 方法,先从内存里拿,否则从硬盘里拿,接着调用 trie/node.gomustDecodeNodemustDecodeNodedecodeNode 的简单封装,在这里可以看到调用 rlp 模块进行反序列化的过程,根据 RLP 的 list 的长度来判断这个编码是什么节点,如果是2那么就是 shortNode,如果是17就是 fullNode,根据节点的不同调用相应的 decode 方法。

默克尔证明

前面已经介绍了默克尔证明的原理,而且我们现在也有了 MPT 的基础,这里我们直接看默克尔证明的代码。默克尔证明的相关逻辑主要在 trie/proof.go 里。

proof
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.Putter) error {
key = keybytesToHex(key)
nodes := []node{}
tn := t.root
for len(key) > 0 && tn != nil {
switch n := tn.(type) {
case *shortNode:
if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
tn = nil
} else {
tn = n.Val
key = key[len(n.Key):]
}
nodes = append(nodes, n)
case *fullNode:
tn = n.Children[key[0]]
key = key[1:]
nodes = append(nodes, n)
case hashNode:
var err error
tn, err = t.resolveHash(n, nil)
if err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
return err
}
default:
panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
}
}
hasher := newHasher(0, 0, nil)
for i, n := range nodes {
n, _, _ = hasher.hashChildren(n, nil)
hn, _ := hasher.store(n, nil, false)
if hash, ok := hn.(hashNode); ok || i == 0 {
if fromLevel > 0 {
fromLevel--
} else {
enc, _ := rlp.EncodeToBytes(n)
if !ok {
hash = crypto.Keccak256(enc)
}
proofDb.Put(hash, enc)
}
}
}
return nil
}

Prove 方法用来获取指定 key 的默克尔证明。这个方法会遍历整个 MPT,获取从根节点到叶子节点这条路径上的所有节点的 hash 值列表。对于 key 不匹配的情况,则会返回一个最长匹配的列表。

VerifyProof
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func VerifyProof(rootHash common.Hash, key []byte, proofDb DatabaseReader) (value []byte, nodes int, err error) {
key = keybytesToHex(key)
wantHash := rootHash
for i := 0; ; i++ {
buf, _ := proofDb.Get(wantHash[:])
if buf == nil {
return nil, i, fmt.Errorf("proof node %d (hash %064x) missing", i, wantHash)
}
n, err := decodeNode(wantHash[:], buf, 0)
if err != nil {
return nil, i, fmt.Errorf("bad proof node %d: %v", i, err)
}
keyrest, cld := get(n, key)
switch cld := cld.(type) {
case nil:
return nil, i, nil
case hashNode:
key = keyrest
copy(wantHash[:], cld)
case valueNode:
return cld, i + 1, nil
}
}
}

func get(tn node, key []byte) ([]byte, node) {
for {
switch n := tn.(type) {
case *shortNode:
if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
return nil, nil
}
tn = n.Val
key = key[len(n.Key):]
case *fullNode:
tn = n.Children[key[0]]
key = key[1:]
case hashNode:
return key, n
case nil:
return key, nil
case valueNode:
return nil, n
default:
panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
}
}
}

VerifyProof 方法以 roothash,key,proof 证明为参数,验证 key 是否存在于 MPT 的某条路径里,如果 key 确实存在,返回这个节点的索引 i,err 字段是 nil,否则返回 error。

MPT 安全性

哈希计算

MPT 存储的 key-value 值没有长度限制,虽然说使用的是 Patricia Trie,仍然会有整棵树深度越来越深的问题,这可能会导致查询节点需要多次 IO,影响效率;导致 Dos 攻击。
以太坊中里,与 MPT 交互时,key 还会进行一次 sha3 的哈希计算,这样一来,key 的长度是固定的32字节,可以避免树里出现很深的路径,同时也实现了 key 的加密存储。

这部分代码在 trie/secure_trie.gotrie/hasher.gocrypto/sha3 里面,没有什么特别的,基本上就是对增删改查的操作做了一层加 keccak256 算法的封装。

编码

前文介绍过分支节点,以太坊想要在效率和存储空间上达到一个平衡,所以 fullNode 只有17个孩子节点,这个值是与16进制有关的,MPT 的 key 值实际上有三种编码方法,Raw 编码,Hex 编码,HP 编码,Raw 编码即原始的字节数组,这种方式的问题是它的一个字节的范围很大,fullNode 的子树这么多会影响检索效率,当树节点需要存储到数据库时,会根据16进制来进行编码,Hex 编码和 HP 编码没本质区别,可以理解为 Hex 编码是存在于内存的中间形式,在以太坊的黄皮书了介绍的是 Hex Prefix Encoding,即 HP 编码。我们一一来看。

Raw 编码(keybytes encoding)

原生的 key 字节数组,不做修改,这种方式是 MPT 对外提供 API 的默认方式,如果数据需要插入到树里,Raw 编码需要转换为 Hex 编码。

Hex 编码(hex encoding)

Hex 编码用于对内存里的树节点 key 进行编码,当树节点需要持久化到数据库时,Hex 编码被转换为 HP 编码。

具体来说,这个编码的每个字节包含 key 的半个字节,尾部加一个 byte 的『终结符』16,表示这是 hex 格式。节点被加载到内存时 key 使用的是这种编码,因为方便访问。

1
2
3
4
5
6
7
8
9
10
func keybytesToHex(str []byte) []byte {
l := len(str)*2 + 1
var nibbles = make([]byte, l)
for i, b := range str {
nibbles[i*2] = b / 16
nibbles[i*2+1] = b % 16
}
nibbles[l-1] = 16
return nibbles
}

以上是 Raw 编码转换为 Hex 编码的代码。

HP 编码(compact encoding)

全称是 Hex Prefix 编码,hex 编码解决了 key 是 keybytes 形式的数据插入 MPT 的问题,但这种方式有数据冗余的问题,对于 shortNode,目前 hex 格式下的 key,长度会是原来 keybytes 格式下的两倍,这一点对于节点的哈希计算影响很大,compact 编码用于对 hex 格式进行优化。compact encoding 的主要思路是将 Hex 格式字符串先恢复到 keybytes 格式,同时加入当前编码的标记位,表示奇偶不同长度的 hex 格式。

具体来说,compact 编码首先会将 hex 尾部标记的 byte 去掉,然后将原来 hex 编码的包含的 key 的半个字节(称为 nibble)一一合并为1 byte,最后如果 hex 格式编码有效长度为奇数,在头部增加 0011xxxx,其中 xxxx 为第一个 nibble,否则在头部增加 00100000。节点在写入数据库时使用的是 compact 编码,因为可以节约磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func hexToCompact(hex []byte) []byte {
terminator := byte(0)
if hasTerm(hex) {
terminator = 1
hex = hex[:len(hex)-1]
}
buf := make([]byte, len(hex)/2+1)
buf[0] = terminator << 5
if len(hex)&1 == 1 {
buf[0] |= 1 << 4
buf[0] |= hex[0]
hex = hex[1:]
}
decodeNibbles(hex, buf[1:])
return buf
}

如果该节点有 value,则 teerminator 为1,其中 buf[0] = terminator << 5 表示,如果是叶子节点,buf[0] 为 00100000,否则为 00000000,接下来的 if len(hex)&1 == 1 if 判断表示 hex 长度为奇数的情况,这时将 buf[0] 赋值为 0011xxxx,其中 xxxx 为 hex[0] 的值,即第一个 nibble 的值,最后调用 decodeNibbles,将两个两个的 nibble 字节合并为一个字节。以上就是 HP 编码的整个过程。

Reference