go-ethereum 源码笔记(core 模块-区块链操作)

区块链区块链,即区块组成的链,不妨先从区块谈起。这一篇我们将着眼于区块链的一些基本操作。在区块链中,区块存储有效信息,在阅读源代码之前,我们应该对区块头,区块体,区块链这些基本的数据结构有所了解。

数据结构

Block, Header, BlockChain 的数据结构请查阅 go-ethereum 源码笔记(概览)

区块链基本操作

创世区块

go-ethereum 源码笔记(cmd 模块-geth 命令) 这一篇,我们提到有一个 geth init 命令,它可以用来创建创世区块。如果我们将本地的 geth 节点连接测试网络或主网的话,我们不会再进行创世区块的创建,因为区块链已经存在了,这时候应该是从其他节点进行同步。而如果我们需要运行一个私有链的话,这时候就需要一个创建一个创世区块。这部分代码在 core/genesis.go 中。

数据结构

genesis.go 会定义创世区块的数据结构,提供创建,查询创世区块的方法。

首先看 Genesis 结构体,它定义了创世区块应包含的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Genesis struct {
Config *params.ChainConfig `json:"config"`
Nonce uint64 `json:"nonce"`
Timestamp uint64 `json:"timestamp"`
ExtraData []byte `json:"extraData"`
GasLimit uint64 `json:"gasLimit" gencodec:"required"`
Difficulty *big.Int `json:"difficulty" gencodec:"required"`
Mixhash common.Hash `json:"mixHash"`
Coinbase common.Address `json:"coinbase"`
Alloc GenesisAlloc `json:"alloc" gencodec:"required"`

Number uint64 `json:"number"`
GasUsed uint64 `json:"gasUsed"`
ParentHash common.Hash `json:"parentHash"`
}

伴随创世区块的还有创世账户。

1
2
3
4
5
6
7
8
9
type GenesisAlloc map[common.Address]GenesisAccount

type GenesisAccount struct {
Code []byte `json:"code,omitempty"`
Storage map[common.Hash]common.Hash `json:"storage,omitempty"`
Balance *big.Int `json:"balance" gencodec:"required"`
Nonce uint64 `json:"nonce,omitempty"`
PrivateKey []byte `json:"secretKey,omitempty"`
}

创建创世区块

SetupGenesisBlock 函数用来在数据库中写入创世区块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig, common.Hash, error) {
if genesis != nil && genesis.Config == nil {
return params.AllEthashProtocolChanges, common.Hash{}, errGenesisNoConfig
}

stored := rawdb.ReadCanonicalHash(db, 0)
if (stored == common.Hash{}) {
if genesis == nil {
log.Info("Writing default main-net genesis block")
genesis = DefaultGenesisBlock()
} else {
log.Info("Writing custom genesis block")
}
block, err := genesis.Commit(db)
return genesis.Config, block.Hash(), err
}

if genesis != nil {
hash := genesis.ToBlock(nil).Hash()
if hash != stored {
return genesis.Config, hash, &GenesisMismatchError{stored, hash}
}
}

newcfg := genesis.configOrDefault(stored)
storedcfg := rawdb.ReadChainConfig(db, stored)
if storedcfg == nil {
log.Warn("Found genesis block without chain config")
rawdb.WriteChainConfig(db, stored, newcfg)
return newcfg, stored, nil
}

if genesis == nil && stored != params.MainnetGenesisHash {
return storedcfg, stored, nil
}

height := rawdb.ReadHeaderNumber(db, rawdb.ReadHeadHeaderHash(db))
if height == nil {
return newcfg, stored, fmt.Errorf("missing block number for head header hash")
}
compatErr := storedcfg.CheckCompatible(newcfg, *height)
if compatErr != nil && *height != 0 && compatErr.RewindTo != 0 {
return newcfg, stored, compatErr
}
rawdb.WriteChainConfig(db, stored, newcfg)
return newcfg, stored, nil
}

SetupGenesisBlock 会根据创世区块返回一个区块链的配置。从 db 参数中拿到的区块里如果没有创世区块的话,首先提交一个新区块。接着通过调用 genesis.configOrDefault(stored) 拿到当前链的配置,测试兼容性后将配置写回 DB 中。最后返回区块链的配置信息。

Genesis 有一个 ToBlock 方法,它会根据 Genesis 的数据,使用基于内存的数据库,创建一个区块并返回(通过 types.NewBlock)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (g *Genesis) ToBlock(db ethdb.Database) *types.Block {
if db == nil {
db, _ = ethdb.NewMemDatabase()
}
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
for addr, account := range g.Alloc {
statedb.AddBalance(addr, account.Balance)
statedb.SetCode(addr, account.Code)
statedb.SetNonce(addr, account.Nonce)
for key, value := range account.Storage {
statedb.SetState(addr, key, value)
}
}
root := statedb.IntermediateRoot(false)
head := &types.Header{
Number: new(big.Int).SetUint64(g.Number),
Nonce: types.EncodeNonce(g.Nonce),
Time: new(big.Int).SetUint64(g.Timestamp),
ParentHash: g.ParentHash,
Extra: g.ExtraData,
GasLimit: g.GasLimit,
GasUsed: g.GasUsed,
Difficulty: g.Difficulty,
MixDigest: g.Mixhash,
Coinbase: g.Coinbase,
Root: root,
}
if g.GasLimit == 0 {
head.GasLimit = params.GenesisGasLimit
}
if g.Difficulty == nil {
head.Difficulty = params.GenesisDifficulty
}
statedb.Commit(false)
statedb.Database().TrieDB().Commit(root, true)

return types.NewBlock(head, nil, nil, nil)
}

Commit 方法将给定的 genesis 的区块和 state 写入数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (g *Genesis) Commit(db ethdb.Database) (*types.Block, error) {
block := g.ToBlock(db)
if block.Number().Sign() != 0 {
return nil, fmt.Errorf("can't commit genesis block with number > 0")
}
if err := WriteTd(db, block.Hash(), block.NumberU64(), g.Difficulty); err != nil {
return nil, err
}
if err := WriteBlock(db, block); err != nil {
return nil, err
}
if err := WriteBlockReceipts(db, block.Hash(), block.NumberU64(), nil); err != nil {
return nil, err
}
if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
return nil, err
}
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
return nil, err
}
if err := WriteHeadHeaderHash(db, block.Hash()); err != nil {
return nil, err
}
config := g.Config
if config == nil {
config = params.AllEthashProtocolChanges
}
return block, WriteChainConfig(db, block.Hash(), config)
}

使用 NewBlockChain 初始化区块链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
if cacheConfig == nil {
cacheConfig = &CacheConfig{
TrieNodeLimit: 256 * 1024 * 1024,
TrieTimeLimit: 5 * time.Minute,
}
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)

bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
if err != nil {
return nil, err
}
bc.genesisBlock = bc.GetBlockByNumber(0)
if bc.genesisBlock == nil {
return nil, ErrNoGenesis
}
if err := bc.loadLastState(); err != nil {
return nil, err
}
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
bc.SetHead(header.Number.Uint64() - 1)
log.Error("Chain rewind was successful, resuming normal operation")
}
}
}
go bc.update()
return bc, nil
}

BlockChain 的初始化需要 ethdb.Database, *CacheConfig, params.ChainConfigconsensus.Enginevm.Config 参数。它们分别表示 db 对象;缓存配置(在 core/blockchain.go 中定义);区块链配置(可通过 core/genesis.go 中的 SetupGenesisBlock 拿到);一致性引擎(可通过 core/blockchain.go 中的 CreateConsensusEngine 得到);虚拟机配置(通过 core/vm 定义)这些实参需要提前定义,以 eth 的 backend.go 为例,你可以在初始化 Ethereum 对象时看到这些参数是怎么初始化的,当然你也可以查看对应的测试代码学习 NewBlockChain 如何使用。

回到 NewBlockChain 的具体代码,首先判断是否有默认 cacheConfig,如果没有根据默认配置创建 cacheConfig,再通过 hashicorp 公司的 lru 模块创建 bodyCache, bodyRLPCache 等缓存对象(lru 是 last recently used 的缩写,常见数据结构,不了解的朋友请自行查阅相关资料),根据这些信息创建 BlockChain 对象,然后通过调用 BlockChainSetValidatorSetProcessor 方法创建验证器和处理器,接下来通过 NewHeaderChain 获得区块头,尝试判断创始区块是否存在,bc.loadLastState() 加载区块最新状态,最后检查当前状态,确保本地运行的区块链上没有非法的区块。接下来我们深入到 loadLastState 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (bc *BlockChain) loadLastState() error {
head := GetHeadBlockHash(bc.db)
if head == (common.Hash{}) {
log.Warn("Empty database, resetting chain")
return bc.Reset()
}
currentBlock := bc.GetBlockByHash(head)
if currentBlock == nil {
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
if err := bc.repair(&currentBlock); err != nil {
return err
}
}
bc.currentBlock.Store(currentBlock)

currentHeader := currentBlock.Header()
if head := GetHeadHeaderHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
currentHeader = header
}
}
bc.hc.SetCurrentHeader(currentHeader)

bc.currentFastBlock.Store(currentBlock)
if head := GetHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
}
}

// Issue a status log for the user
// ...

return nil
}

loadLastState 会从数据库中加载区块链状态,首先通过 GetHeadBlockHash 从数据库中取得当前区块头,如果当前区块不存在,即数据库为空的话,通过 Reset 将创始区块写入数据库以达到重置目的。如果当前区块不存在,同样通过 Reset 重置。接下来确认当前区块的世界状态是否正确,世界状态这是一个稍特别的概念,这个过程我们将在之后的文章中描述。如果有问题,则通过 repair 进行修复,repair 中是一个死循环,它会一直回溯当前区块,直到找到对应的世界状态。然后通过 bc.hc.SetCurrentHeader 设置当前区块头,并恢复快速同步区块。

NewBlockChain 调用 loadLastState 之后,会判断是否需要硬分叉,BadHashes 是手工配置的区块 hash 值,根据这些值我们可以决定是否以及如何进行硬分叉。最后以 goroutine 的方式调用 bc.update()

1
2
3
4
5
6
7
8
9
10
11
func (bc *BlockChain) update() {
futureTimer := time.Tick(5 * time.Second)
for {
select {
case <-futureTimer:
bc.procFutureBlocks()
case <-bc.quit:
return
}
}
}

update() 的作用是定时处理 Future 区块,简单地来说就是定时调用 procFutureBlocks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
if block, exist := bc.futureBlocks.Peek(hash); exist {
blocks = append(blocks, block.(*types.Block))
}
}
if len(blocks) > 0 {
types.BlockBy(types.Number).Sort(blocks)
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
}
}
}

procFutureBlocks 可以从 futureBlocks 拿到需要插入的区块,最终会调用 InsertChain 将区块插入到区块链中。

插入区块

1
2
3
4
5
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
n, events, logs, err := bc.insertChain(chain)
bc.PostChainEvents(events, logs)
return n, err
}

InsertChain 将尝试将给定的区块插入到规范的区块链中,或者创建一个分支,插入后,会通过 PostChainEvents 触发所有事件。下面我们看看 insertChain 的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())

return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
}
}
//...
var (
stats = insertStats{startTime: mclock.Now()}
events = make([]interface{}, 0, len(chain))
lastCanon *types.Block
coalescedLogs []*types.Log
)
headers := make([]*types.Header, len(chain))
seals := make([]bool, len(chain))

for i, block := range chain {
headers[i] = block.Header()
seals[i] = true
}
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)

for i, block := range chain {
// ...
// Wait for the block's verification to complete ...
bstart := time.Now()

err := <-results
if err == nil {
err = bc.Validator().ValidateBody(block)
}
switch {
case err == ErrKnownBlock:
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
stats.ignored++
continue
}

case err == consensus.ErrFutureBlock:
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue

case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
bc.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue

case err == consensus.ErrPrunedAncestor:
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
if localTd.Cmp(externTd) > 0 {
if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
return i, events, coalescedLogs, err
}
continue
}
var winner []*types.Block

parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
for !bc.HasState(parent.Root()) {
winner = append(winner, parent)
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
}
for j := 0; j < len(winner)/2; j++ {
winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
}
bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChain(winner)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs

if err != nil {
return i, events, coalescedLogs, err
}

case err != nil:
bc.reportBlock(block, nil, err)
return i, events, coalescedLogs, err
}
var parent *types.Block
if i == 0 {
parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
} else {
parent = chain[i-1]
}
state, err := state.New(parent.Root(), bc.stateCache)
if err != nil {
return i, events, coalescedLogs, err
}
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err
}
err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
if err != nil {
bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err
}
proctime := time.Since(bstart)

status, err := bc.WriteBlockWithState(block, receipts, state)
if err != nil {
return i, events, coalescedLogs, err
}
switch status {
case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

coalescedLogs = append(coalescedLogs, logs...)
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block

bc.gcproc += proctime

case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))

blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainSideEvent{block})
}
stats.processed++
stats.usedGas += usedGas
stats.report(chain, i, bc.stateCache.TrieDB().Size())
}
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
return 0, events, coalescedLogs, nil
}

首先做一个健康检查,确保要插入的链是有序且相互连接的。接下来通过 bc.engine.VerifyHeaders 调用一致性引擎来验证区块头是有效的。进入 for i, block := range chain 循环后,接收 results 这个 chan,可以获得一致性引擎获得区块头的结果,如果是已经插入的区块,跳过;如果是未来的区块,时间距离不是很长,加入到 futureBlocks 中,否则返回一条错误信息;如果没能找到该区块祖先,但在 futureBlocks 能找到,也加入到 futureBlocks 中。

加入 futureBlocks 的过程结束后,通过 core/state_processor.go 中的 Process 改变世界状态(关于世界状态的管理,可以阅读后续的文章 go-ethereum 源码笔记(core 模块-状态管理))。在返回收据,日志,使用的 Gas 后。通过 bc.Validator().ValidateState 再次验证,通过后,通过 WriteBlockAndState 写入区块以及相关状态到区块链,WriteBlockAndState 我们接下来会详谈。最后,如果我们生成了一个新的区块头,最新的区块头等于 lastCanon 的哈希值,发布一个 ChainHeadEvent 的事件。

现在我们来看看 WriteBlockAndState 是如何写入区块及相关状态到区块链的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
}
// ...
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
return NonStatTy, err
}
batch := bc.db.NewBatch()
if err := WriteBlock(batch, block); err != nil {
return NonStatTy, err
}
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
}
triedb := bc.stateCache.TrieDB()

if bc.cacheConfig.Disabled {
if err := triedb.Commit(root, false); err != nil {
return NonStatTy, err
}
} else {
triedb.Reference(root, common.Hash{})
bc.triegc.Push(root, -float32(block.NumberU64()))

if current := block.NumberU64(); current > triesInMemory {
header := bc.GetHeaderByNumber(current - triesInMemory)
chosen := header.Number.Uint64()

var (
size = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieNodeLimit) * 1024 * 1024
)
if size > limit || bc.gcproc > bc.cacheConfig.TrieTimeLimit {
if chosen < lastWrite+triesInMemory {
switch {
case size >= 2*limit:
log.Warn("State memory usage too high, committing", "size", size, "limit", limit, "optimum", float64(chosen-lastWrite)/triesInMemory)
case bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit:
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory)
}
}
if chosen >= lastWrite+triesInMemory || size >= 2*limit || bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
triedb.Commit(header.Root, true)
lastWrite = chosen
bc.gcproc = 0
}
}
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
triedb.Dereference(root.(common.Hash), common.Hash{})
}
}
}
if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return NonStatTy, err
}
reorg := externTd.Cmp(localTd) > 0
currentBlock = bc.CurrentBlock()
if !reorg && externTd.Cmp(localTd) == 0 {
reorg = block.NumberU64() < currentBlock.NumberU64() || (block.NumberU64() == currentBlock.NumberU64() && mrand.Float64() < 0.5)
}
if reorg {
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, err
}
}
if err := WriteTxLookupEntries(batch, block); err != nil {
return NonStatTy, err
}
if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil {
return NonStatTy, err
}
status = CanonStatTy
} else {
status = SideStatTy
}
if err := batch.Write(); err != nil {
return NonStatTy, err
}

if status == CanonStatTy {
bc.insert(block)
}
bc.futureBlocks.Remove(block.Hash())
return status, nil
}

WriteBlockWithState 将区块以及相关所有的状态写入数据库。首先通过 bc.GetTd(block.ParentHash(), block.NumberU64()-1) 获取待插入区块的总难度,bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64()) 计算当前区块的区块链的总难度,externTd := new(big.Int).Add(block.Difficulty(), ptd) 获得新的区块链的总难度。通过 bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd) 写入区块 hash,高度,对应总难度。然后使用 batch 的方式写入区块的其他数据。插入数据后,判断这个区块的父区块是否为当前区块,如果不是,说明存在分叉,调用 reorg 重新组织区块链。插入成功后,调用 bc.futureBlocks.Remove(block.Hash())futureBlocks 中移除区块。

下面我们来看看 reorg 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if oldBlock.NumberU64() > newBlock.NumberU64() {
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)

collectLogs(oldBlock.Hash())
}
} else {
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
}
}
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
}

for {
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
break
}

oldChain = append(oldChain, oldBlock)
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash())

oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
}
}
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Debug
if len(oldChain) > 63 {
logFn = log.Warn
}
logFn("Chain split detected", "number", commonBlock.Number(), "hash", commonBlock.Hash(),
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
} else {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
}
var addedTxs types.Transactions
for i := len(newChain) - 1; i >= 0; i-- {
if err := WriteTxLookupEntries(bc.db, newChain[i]); err != nil {
return err
}
addedTxs = append(addedTxs, newChain[i].Transactions()...)
}
diff := types.TxDifference(deletedTxs, addedTxs)
for _, tx := range diff {
DeleteTxLookupEntry(bc.db, tx.Hash())
}
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
}()
}

return nil
}

上面提到,reorg 方法用来将新区块链替换本地区块链为规范链。对于老链比新链高的情况,减少老链,让它和新链一样高;否则的话减少新链,待后续插入。潜在的会丢失的交易会被当做事件发布。接着进入一个 for 循环,找到两条链共同的祖先。再将上述减少新链阶段保存的 newChain 一块块插入到链中,更新规范区块链的 key,并且写入交易的查询信息。最后是清理工作,删除交易查询信息,删除日志,并通过 bc.rmLogsFeed.Send 发送消息通知,删除了哪些旧链则通过 bc.chainSideFeed.Send 进行消息通知。

至此,插入区块的操作就完成了。

References