go-ethereum 源码笔记(miner,consensus 模块-挖矿和共识算法)

前面的章节 go-ethereum 源码笔记(core 模块-区块链操作) 描述了区块链世界的核心:区块,区块链。我们已经知道区块链可以用来存储交易的数据,也知道了如何在区块链里发起一笔交易,而问题是,往区块链中增加数据应该是一个较困难的操作,按照比特币论文里的说法,即需要一个 PoW(Proof of Work,工作量证明),否则每个人都能轻易往区块链中增加数据,安全性和一致性无法保证。这一点上,以太坊法和比特币类似,尽管略有不同,但大致都需要矿工的角色贡献计算力,完成一个复杂的计算,即找到一个区块的哈希值,验证正确之后才能加入到区块链中。这个过程就叫做挖矿。矿工们去做这件事当然是有一定利益驱使的,每完成一次挖矿,他们就能获得一些以太币奖励。

挖矿涉及的 Ethash 算法涉及到 miner, consensu 模块。

geth 中的挖矿是 CPU 挖矿,现在挖矿基本都是 GPU 挖矿了,因为这两者的效率已经不在一个数量级。但是研究 geth 的代码能给我们一个很好的参考,可以帮助我们了解挖矿的原理。关于 GPU 的挖矿可以参考:ethereum-mining/ethminer,不过这块我还没有深入研究,有开发经验的朋友可以一起来讨论。

原理和流程

以太坊从一开始就计划采用 PoS(Proof of Stack),但现阶段仍采用 PoW 的方式,尽管是临时的,以太坊的 PoW 从设计到实现都算得上精美。比特币诞生之初的设计理念是『一CPU一票』,随着显卡挖矿,ASIC 矿机的出现,这个价值回归用户的口号似乎成为空词,实际上矿机的出现是技术和生产力的进步,本质上和一CPU一票没有不同,云挖矿的模式也早就出现,不管是 CPU 还是显卡,矿机,其实质都是『一份资产的投入可低一票』。话虽如此,现在 PoW 的貌似畸形的模式还是给普通民众留下口实。

geth 使用的 Ethash 算法,之前叫做 Dagger-Hashimoto,从名字也可以看出来这是两个算法的结合体。Dagger-Hashimoto 算法的目标是(来自 wiki /Dagger-Hashimoto.md):

  • 抵制矿机(ASIC,专门用于挖矿的芯片)
  • 轻客户端验证
  • 全链式存储

Hashimoto 算法

Hashimoto:I/O bound proof of work

作者是 Thaddeus Dryja,这个算法的目标是通过 IO 限制来抵制矿机。

Dagger 算法

Dagger: A Memory-Hard to Compute, Memory-Easy to Verify Scrypt Alternative

作者是 Vitalik,算法利用了 DAG(有向非循环图) 数据结构,目的是将挖矿操作限制为必须使用大内存,以抵抗矿机。

Ethash

Ethash 是以太坊使用的 PoW 算法,其原理可以用一个公式来概括:

$RAND(h, n) <= \frac{M}{d}$

其中 h 是区块头的哈希值(没有 Nonce),n 是 Nonce 值,M 是一个极大的数字,d 指挖矿难度,RAND 是一个根据参数生成随机值的操作,挖矿的过程简单来说就是寻找适合的 nonce,使上述不等式成立。原理和比特币的基本相同,但 Ethash 稍特别一点,因为 geth 的开发者在设计初期就考虑了抵制矿机的问题里,Ethash 的具体步骤为:

  1. 对于每个区块,先算出一个种子。种子的计算只依赖当前区块信息。
  2. 使用种子生成伪随机数据集,称为 cache。轻客户端需要保存 cache
  3. 基于 cache 生成 1GB 大小的数据集,称为 the DAG。这个数据集的每一个元素都依赖于 cache 中的某几个元素,只要有 cache 就可以快速计算出 DAG 中指定位置的元素。完整可挖矿客户端需要保存 DAG。
  4. 挖矿可以概括为从 DAG 中随机选择元素,然后暴力枚举选择一个 nonce 值,对其进行哈希计算,使其符合约定的难度,而这个难度其实就是要求哈希值的前缀包括多少个0。验证的时候,基于 cache 计算指定位置 DAG 元素,然后验证这个元素集合的哈希值结果小于某个值,这个过程只需要普通 CPU 和普通内存。
  5. cache 和 DAG 每过一个周期更新一次,一个周期长度是 30000 个区块。DAG 只取决于区块数量,大小会随着时间推移线性增长,从 1GB 开始,每年大约增加 7GB。由于 DAG 需要很长时间生成,所以 geth 每次会维护2个 DAG 集合。

源码解析

miner 模块

挖矿的入口在 miner/miner.go 中的 Miner:

1
2
3
4
5
6
7
8
9
10
type Miner struct {
mux *event.TypeMux
worker *worker
coinbase common.Address
mining int32
eth Backend
engine consensus.Engine
canStart int32
shouldStart int32
}

通过 func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) 方法,可以获得一个 Miner 实例(见 eth/backend.goeth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)),在实例化的过程中,会通过一个 goroutine 调用 miner.update()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (self *Miner) update() {
events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
for ev := range events.Chan() {
switch ev.Data.(type) {
case downloader.StartEvent:
atomic.StoreInt32(&self.canStart, 0)
if self.Mining() {
self.Stop()
atomic.StoreInt32(&self.shouldStart, 1)
log.Info("Mining aborted due to sync")
}
case downloader.DoneEvent, downloader.FailedEvent:
shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

atomic.StoreInt32(&self.canStart, 1)
atomic.StoreInt32(&self.shouldStart, 0)
if shouldStart {
self.Start(self.coinbase)
}
events.Unsubscribe()
break out
}
}
}

启动消息监听,当收到 downloader 的 StartEvent,表示本节点正在从其他节点下载新区块,这是 miner 会停止进行中的挖掘工作;如果收到 downloader 的 DoneEvent 或 FailEvent 方法,表明本节点的下载任务已经结束,这时可以开始挖掘新区块。

1
2
3
4
5
6
7
8
9
10
11
12
func (self *Miner) Start(coinbase common.Address) {
atomic.StoreInt32(&self.shouldStart, 1)
self.SetEtherbase(coinbase)
if atomic.LoadInt32(&self.canStart) == 0 {
log.Info("Network syncing, will start miner afterwards")
return
}
atomic.StoreInt32(&self.mining, 1)
log.Info("Starting mining operation")
self.worker.start()
self.worker.commitNewWork()
}

Miner struct 中有一个 worker 类型成员变量,它指向 worker 中的 Work struct,当我们需要开始挖矿时,我们通过 miner.Start() 开始(见 eth/backend.go 中的 StartMining,L358)。在设置好 coinbase 和等待网络同步完成后,继续调用 self.worker.start()

miner/woker.go 负责管理挖矿,它有一组 agent,每个 agent 单独完成挖矿过程。见 worker.go 中的 L203-L213

1
2
3
4
5
6
7
func (self *worker) start() {
...
// spin up agents
for agent := range self.agents {
agent.Start()
}
}

agent 指的是实现了共识算法的 agent。这个我们接下来再详细描述。需要注意的是,实例化 Worker 的时候,也就是在 miner.go 的 New 方法调用 newWorker(config, engine, common.Address{}, eth, mux) 的过程中,我们会以 goroutine 的方式调用其他 worker 的其他方法:

1
2
3
4
5
6
7
8
9
10
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
...
worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
go worker.update()
go worker.wait()
worker.commitNewWork()
return worker
}

其中 go worker.update(), go worker.wait(), worker.commitNewWork() 这三行很重要,我们依次来看。

worker.update()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (self *worker) update() {

for {
select {
case <-self.chainHeadCh:
self.commitNewWork()
case ev := <-self.chainSideCh:
self.uncleMu.Lock()
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
case ev := <-self.txCh:
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
acc, _ := types.Sender(self.current.signer, ev.Tx)
txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)

self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.currentMu.Unlock()
} else {
if self.config.Clique != nil && self.config.Clique.Period == 0 {
self.commitNewWork()
}
}
}
}
}

worker.update 会监听 ChainHeadEventChainSideEventTxPreEvent 3个事件。通过 chainHeadCh, chainSideCh, txCh 这3个 channel 来实现。ChainHeadEvent 事件指的是区块链中已经加入一个新的区块作为链头,这时候 worker 会开始挖掘下一个区块(在代码库中搜索 ChainHeadEvent,可以在 blockchain.go 中的 L1191 看到该事件是怎么触发的)。ChainSideEvent 指区块链中加入了一个新区块作为当前链头的分支,woker 会把这个区块放在 possibleUncles 数组,作为下一个挖掘区块可能的 Uncle 之一。当一个新的交易 tx 被加入 TxPool 中,会触发 TxPreEvent,如果这时 worker 没有在挖矿,那么开始执行,并把 tx 加入到 Work.txs 数组中,下次挖掘新区块可以使用。

worker.wait()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (self *worker) wait() {
for {
mustCommitNewWork := true
for result := range self.recv {
atomic.AddInt32(&self.atWork, -1)

if result == nil {
continue
}
block := result.Block
work := result.Work

for _, r := range work.receipts {
for _, l := range r.Logs {
l.BlockHash = block.Hash()
}
}
for _, log := range work.state.Logs() {
log.BlockHash = block.Hash()
}
stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
if stat == core.CanonStatTy {
mustCommitNewWork = false
}
log.Error("I got new block")
self.mux.Post(core.NewMinedBlockEvent{Block: block})
var (
events []interface{}
logs = work.state.Logs()
)
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
events = append(events, core.ChainHeadEvent{Block: block})
}
self.chain.PostChainEvents(events, logs)

self.unconfirmed.(block.NumberU64(), block.Hash())

if mustCommitNewWork {
self.commitNewWork()
}
}
}
}

worker.wait 执行挖完一个区块后的操作,通过 Result 这个 chan 实现,agent 完成挖矿后,从 chan 中获取 Block 和 Work 对象,Block 会被写到数据库中,加入本地的区块链,成为新的链头。完成这个操作后,会发送一条 NewMinedBlockEvent 事件,其他节点会决定是否接受这个新区块成为区块链新的链头。

worker.commitNewWork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
func (self *worker) commitNewWork() {
self.mu.Lock()
defer self.mu.Unlock()
self.uncleMu.Lock()
defer self.uncleMu.Unlock()
self.currentMu.Lock()
defer self.currentMu.Unlock()

tstart := time.Now()
parent := self.chain.CurrentBlock()

tstamp := tstart.Unix()
if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
tstamp = parent.Time().Int64() + 1
}
if now := time.Now().Unix(); tstamp > now+1 {
wait := time.Duration(tstamp-now) * time.Second
log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
time.Sleep(wait)
}

num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent),
Extra: self.extra,
Time: big.NewInt(tstamp),
}
if self.isRunning() {
if self.coinbase == (common.Address{}) {
log.Error("Refusing to mine without etherbase")
return
}
header.Coinbase = self.coinbase
}
if err := self.engine.Prepare(self.chain, header); err != nil {
log.Error("Failed to prepare header for mining", "err", err)
return
}
if daoBlock := self.config.DAOForkBlock; daoBlock != nil {
limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
if self.config.DAOForkSupport {
header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
header.Extra = []byte{}
}
}
}
err := self.makeCurrent(parent, header)
if err != nil {
log.Error("Failed to create mining context", "err", err)
return
}
env := self.current
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(env.state)
}

var (
uncles []*types.Header
badUncles []common.Hash
)
for hash, uncle := range self.possibleUncles {
if len(uncles) == 2 {
break
}
if err := self.commitUncle(env, uncle.Header()); err != nil {
log.Trace("Bad uncle found and will be removed", "hash", hash)
log.Trace(fmt.Sprint(uncle))

badUncles = append(badUncles, hash)
} else {
log.Debug("Committing new uncle to block", "hash", hash)
uncles = append(uncles, uncle.Header())
}
}
for _, hash := range badUncles {
delete(self.possibleUncles, hash)
}

var (
emptyBlock *types.Block
fullBlock *types.Block
)

emptyState := env.state.Copy()
if emptyBlock, err = self.engine.Finalize(self.chain, header, emptyState, nil, uncles, nil); err != nil {
log.Error("Failed to finalize block for temporary sealing", "err", err)
} else {
if self.isRunning() {
log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles))
self.push(&Package{nil, emptyState, emptyBlock})
}
}

pending, err := self.eth.TxPool().Pending()
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
env.commitTransactions(self.mux, txs, self.chain, self.coinbase)

if fullBlock, err = self.engine.Finalize(self.chain, header, env.state, env.txs, uncles, env.receipts); err != nil {
log.Error("Failed to finalize block for sealing", "err", err)
return
}

if self.isRunning() {
log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
self.unconfirmed.Shift(fullBlock.NumberU64() - 1)
self.push(&Package{env.receipts, env.state, fullBlock})
}
self.updateSnapshot()
}

commitNewWork 的作用是完成待挖掘区块的组装,最后通过 func (self *worker) push(p *Package) 让 agent 开始工作。具体来说,首先获取以系统当前时间作为新区块的时间,但要确保父区块的时间要早于新区块时间,否则进行 sleep 操作;接着构造区块头,确定父区块哈希值,当前区块编号,Gas 消耗数,附加数据,时间等,区块头的其他属性会在公式算法中确定;然后调用 engine.Prepare,准备好 Header 对象;处理 DAO 硬分叉的情况,增加附加数据;再接下来会从交易池里获取交易,加入到新区块的交易列表中,从 possibleUncles 获取叔区块;最后调用一致性引擎的 Finalize() 方法,给区块头增加 Root, TxHash, ReceiptHash 等属性,将创建的 Package 通过 channel 发送给 agent,进行挖矿操作。

consensus 模块

通过以上的分析,我们了解到 miner/miner.go, miner/worker.go 这几个模块的只是一些对 chan 的创建和订阅,封装挖矿所需数据的操作,真正的挖矿的过程在 agent 里面。miner/worker.go 中定义了 agent 的接口:

1
2
3
4
5
6
7
type Agent interface {
Work() chan<- *Work
SetReturnCh(chan<- *Result)
Stop()
Start()
GetHashRate() int64
}

其实现在 miner/agent.go 中,而 miner/agent.go 的代码实际上是 consensus 模块的封装,我们先来看看 miner/agent.go 的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (self *CpuAgent) Start() {
if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
return
}
go self.update()
}

func (self *CpuAgent) update() {
out:
for {
select {
case p := <-self.taskCh:
self.mu.Lock()
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
}
self.quitCurrentOp = make(chan struct{})
go self.mine(p, self.quitCurrentOp)
self.mu.Unlock()
case <-self.stop:
self.mu.Lock()
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
self.quitCurrentOp = nil
}
self.mu.Unlock()
break out
}
}
}

前面我们提到 commitNewWork 提供挖矿数据的组装,然后通过 func (self *worker) push(p *Package) 将数据写入 update()taskCh 这个 chan 中,CpuAgent 通过这个 chan 拿到 Package 对象,然后调用 self.mine(work, self, quitCurrentOp),如果收到停止的消息,就退出相关操作。

1
2
3
4
5
6
7
8
9
10
11
func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
self.returnCh <- &Result{work, result}
} else {
if err != nil {
log.Warn("Block sealing failed", "err", err)
}
self.returnCh <- nil
}
}

可以看到 mine 方法是 engine.Seal 的简单封装,共识算法会对传入的区块进行真正的挖矿操作,如果成功,Blockwork 对象会通过 chan 传给 work.wait 进行处理。

共识算法 ethash

共识算法 Engine 接口的定义在 consensus/consensus.go 中:

1
2
3
4
5
6
7
8
9
10
11
12
type Engine interface {
Author(header *types.Header) (common.Address, error)
VerifyHeader(chain ChainReader, header *types.Header, seal bool) error
VerifyHeaders(chain ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error)
VerifySeal(chain ChainReader, header *types.Header) error
Prepare(chain ChainReader, header *types.Header) error
Finalize(chain ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction,
uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error)
Seal(chain ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error)
CalcDifficulty(chain ChainReader, time uint64, parent *types.Header) *big.Int
APIs(chain ChainReader) []rpc.API
}

有两个实现,一个是 ethash, 一个是 clique,clique 只在测试网络中有使用,这里我们主要讨论 ethash。在公式算法的引擎中,最重要的两个方法大概是 SealVerifySeal,一个用来挖矿,找到 nonce,一个用来验证挖矿结果,下面我们将讨论 ethash 中的 SealVerifySeal 的实现。

最开始我们简述了 Dagger-Hashimoto 算法,然后我们从 CPU 挖矿的入口函数开始,层层剖析,最终发现挖矿的核心逻辑在 consensus 目录下,接下来我们就来看看 CPU 挖矿到底是怎么实现的。

前面说到,挖矿需要一个基于一个数据集,那么我们先来看看数据集是怎么生成的。

1
2
3
4
5
6
7
8
9
10
11
func (ethash *Ethash) dataset(block uint64) *dataset {
epoch := block / epochLength
currentI, futureI := ethash.datasets.get(epoch)
current := currentI.(*dataset)
current.generate(ethash.config.DatasetDir, ethash.config.DatasetsOnDisk, ethash.config.PowMode == ModeTest)
if futureI != nil {
future := futureI.(*dataset)
go future.generate(ethash.config.DatasetDir, ethash.config.DatasetsOnDisk, ethash.config.PowMode == ModeTest)
}
return current
}

consensus/ethash/ethash.go中,dataset 方法对数据集进行了封装。首先尝试从内存中取得,如果不存在则在文件目录中取得,如果还是不存在则通过 func (d *dataset) generate(dir string, limit int, test bool) 生成。具体来说,首先,我们计算 epoch,前面说到,每 30000 个区块就会换 DAG,这里的 30000 也就是 epochLength,也就是说 epoch 不变的的话,DAG 也不需要变。

先通过 get 尝试从内存中取得 dataset。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (lru *lru) get(epoch uint64) (item, future interface{}) {
lru.mu.Lock()
defer lru.mu.Unlock()
item, ok := lru.cache.Get(epoch)
if !ok {
if lru.future > 0 && lru.future == epoch {
item = lru.futureItem
} else {
log.Trace("Requiring new ethash "+lru.what, "epoch", epoch)
item = lru.new(epoch)
}
lru.cache.Add(epoch, item)
}
if epoch < maxEpoch-1 && lru.future < epoch+1 {
log.Trace("Requiring new future ethash "+lru.what, "epoch", epoch+1)
future = lru.new(epoch + 1)
lru.future = epoch + 1
lru.futureItem = future
}
return item, future
}

get 方法会返回两个 interface(实际类型为 dataset),第一个是当前 epoch 对应的 dataset,第二个值是未来会用到的 dataset(epoch +1),如果不为空,表明需要重新生成,如果为空,表明之前已经生成过了。回到 dataset 方法,我们看看 DAG 是怎么通过 generate 方法生成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (d *dataset) generate(dir string, limit int, test bool) {
d.once.Do(func() {
csize := cacheSize(d.epoch*epochLength + 1)
dsize := datasetSize(d.epoch*epochLength + 1)
seed := seedHash(d.epoch*epochLength + 1)
if test {
csize = 1024
dsize = 32 * 1024
}
if dir == "" {
cache := make([]uint32, csize/4)
generateCache(cache, d.epoch, seed)

d.dataset = make([]uint32, dsize/4)
generateDataset(d.dataset, d.epoch, cache)
}
var endian string
if !isLittleEndian() {
endian = ".be"
}
path := filepath.Join(dir, fmt.Sprintf("full-R%d-%x%s", algorithmRevision, seed[:8], endian))
logger := log.New("epoch", d.epoch)

runtime.SetFinalizer(d, (*dataset).finalizer)

d.dump, d.mmap, d.dataset, err = memoryMap(path)
if err == nil {
logger.Debug("Loaded old ethash dataset from disk")
return
}
logger.Debug("Failed to load old ethash dataset", "err", err)
cache := make([]uint32, csize/4)
generateCache(cache, d.epoch, seed)

d.dump, d.mmap, d.dataset, err = memoryMapAndGenerate(path, dsize, func(buffer []uint32) { generateDataset(buffer, d.epoch, cache) })
if err != nil {
logger.Error("Failed to generate mapped ethash dataset", "err", err)

d.dataset = make([]uint32, dsize/2)
generateDataset(d.dataset, d.epoch, cache)
}
for ep := int(d.epoch) - limit; ep >= 0; ep-- {
seed := seedHash(uint64(ep)*epochLength + 1)
path := filepath.Join(dir, fmt.Sprintf("full-R%d-%x%s", algorithmRevision, seed[:8], endian))
os.Remove(path)
}
})
}

首先通过 csize := cacheSize(d.epoch*epochLength + 1)dsize := datasetSize(d.epoch*epochLength + 1) 这两个调用得到缓存大小和数据集大小,前面有介绍,这两个大小是根据区块数量线性增长的,深入到这两个方法中,你会发现 cacheSize 调用 calcCacheSize 这样的一层层调用,ethereum 的维基页面里给出的 Python 代码可能更直观一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
def get_cache_size(block_number):
sz = CACHE_BYTES_INIT + CACHE_BYTES_GROWTH * (block_number // EPOCH_LENGTH)
sz -= HASH_BYTES
while not isprime(sz / HASH_BYTES):
sz -= 2 * HASH_BYTES
return sz

def get_full_size(block_number):
sz = DATASET_BYTES_INIT + DATASET_BYTES_GROWTH * (block_number // EPOCH_LENGTH)
sz -= MIX_BYTES
while not isprime(sz / MIX_BYTES):
sz -= 2 * MIX_BYTES
return sz

前 6140000 个区块的缓存大小和数据集大小已经硬编码在 consensus/ethash/algorithm.godatasetSizescacheSizes 中,显然这样能加快得到这两个集合的速度。接下来我们通过 seedHash 来生成 seed。

1
2
3
4
5
6
7
8
9
10
11
func seedHash(block uint64) []byte {
seed := make([]byte, 32)
if block < epochLength {
return seed
}
keccak256 := makeHasher(sha3.NewKeccak256())
for i := 0; i < int(block/epochLength); i++ {
keccak256(seed, seed)
}
return seed
}

继续看 generate 方法,可以看到,主要逻辑在 generateCachegenerateDataset 这两个方法,我们依次来看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func generateCache(dest []uint32, epoch uint64, seed []byte) {
logger := log.New("epoch", epoch)

start := time.Now()
defer func() {
elapsed := time.Since(start)

logFn := logger.Debug
if elapsed > 3*time.Second {
logFn = logger.Info
}
logFn("Generated ethash verification cache", "elapsed", common.PrettyDuration(elapsed))
}()
header := *(*reflect.SliceHeader)(unsafe.Pointer(&dest))
header.Len *= 4
header.Cap *= 4
cache := *(*[]byte)(unsafe.Pointer(&header))

size := uint64(len(cache))
rows := int(size) / hashBytes

var progress uint32

done := make(chan struct{})
defer close(done)

go func() {
for {
select {
case <-done:
return
case <-time.After(3 * time.Second):
logger.Info("Generating ethash verification cache", "percentage", atomic.LoadUint32(&progress)*100/uint32(rows)/4, "elapsed", common.PrettyDuration(time.Since(start)))
}
}
}()
keccak512 := makeHasher(sha3.NewKeccak512())

keccak512(cache, seed)
for offset := uint64(hashBytes); offset < size; offset += hashBytes {
keccak512(cache[offset:], cache[offset-hashBytes:offset])
atomic.AddUint32(&progress, 1)
}
temp := make([]byte, hashBytes)

for i := 0; i < cacheRounds; i++ {
for j := 0; j < rows; j++ {
var (
srcOff = ((j - 1 + rows) % rows) * hashBytes
dstOff = j * hashBytes
xorOff = (binary.LittleEndian.Uint32(cache[dstOff:]) % uint32(rows)) * hashBytes
)
bitutil.XORBytes(temp, cache[srcOff:srcOff+hashBytes], cache[xorOff:xorOff+hashBytes])
keccak512(cache[dstOff:], temp)

atomic.AddUint32(&progress, 1)
}
}
if !isLittleEndian() {
swap(cache)
}
}

generateCache 方法会根据 epochseed 的值生成 cache。整个过程可以用下面的 Python 代码描述:

1
2
3
4
5
6
7
8
9
10
11
12
def mkcache(cache_size, seed):
n = cache_size // HASH_BYTES
o = [sha3_512(seed)]
for i in range(1, n):
o.append(sha3_512(o[-1]))

for _ in range(CACHE_ROUNDS):
for i in range(n):
v = o[i][0] % n
o[i] = sha3_512(map(xor, o[(i-1+n) % n], o[v]))

return o

生成数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func generateDataset(dest []uint32, epoch uint64, cache []uint32) {
logger := log.New("epoch", epoch)

start := time.Now()
defer func() {
elapsed := time.Since(start)

logFn := logger.Debug
if elapsed > 3*time.Second {
logFn = logger.Info
}
logFn("Generated ethash verification cache", "elapsed", common.PrettyDuration(elapsed))
}()

swapped := !isLittleEndian()

header := *(*reflect.SliceHeader)(unsafe.Pointer(&dest))
header.Len *= 4
header.Cap *= 4
dataset := *(*[]byte)(unsafe.Pointer(&header))

threads := runtime.NumCPU()
size := uint64(len(dataset))

var pend sync.WaitGroup
pend.Add(threads)

var progress uint32
for i := 0; i < threads; i++ {
go func(id int) {
defer pend.Done()

keccak512 := makeHasher(sha3.NewKeccak512())
batch := uint32((size + hashBytes*uint64(threads) - 1) / (hashBytes * uint64(threads)))
first := uint32(id) * batch
limit := first + batch
if limit > uint32(size/hashBytes) {
limit = uint32(size / hashBytes)
}
percent := uint32(size / hashBytes / 100)
for index := first; index < limit; index++ {
item := generateDatasetItem(cache, index, keccak512)
if swapped {
swap(item)
}
copy(dataset[index*hashBytes:], item)

if status := atomic.AddUint32(&progress, 1); status%percent == 0 {
logger.Info("Generating DAG in progress", "percentage", uint64(status*100)/(size/hashBytes), "elapsed", common.PrettyDuration(time.Since(start)))
}
}
}(i)
}
pend.Wait()
}

知道了如何生成数据集后,现在我们来看看 ethash 算法是如何根据数据集进行挖矿的。

前面有说到,ethash 的 mine 方法实际上是 engine.Seal 的简单封装。engine.Seal 接口在 consensu/consensus.go 中定义,而实现是在 consensus/ethash/sealer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) {
if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake {
header := block.Header()
header.Nonce, header.MixDigest = types.BlockNonce{}, common.Hash{}
return block.WithSeal(header), nil
}
if ethash.shared != nil {
return ethash.shared.Seal(chain, block, stop)
}
abort := make(chan struct{})
found := make(chan *types.Block)

ethash.lock.Lock()
threads := ethash.threads
if ethash.rand == nil {
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
ethash.lock.Unlock()
return nil, err
}
ethash.rand = rand.New(rand.NewSource(seed.Int64()))
}
ethash.lock.Unlock()
if threads == 0 {
threads = runtime.NumCPU()
}
if threads < 0 {
threads = 0
}
var pend sync.WaitGroup
for i := 0; i < threads; i++ {
pend.Add(1)
go func(id int, nonce uint64) {
defer pend.Done()
ethash.mine(block, id, nonce, abort, found)
}(i, uint64(ethash.rand.Int63()))
}
var result *types.Block
select {
case <-stop:
close(abort)
case result = <-found:
close(abort)
case <-ethash.update:
close(abort)
pend.Wait()
return ethash.Seal(chain, block, stop)
}
pend.Wait()
return result, nil
}

如果是 fake 模式,立即返回 0 nonce,这部分是为了方便单元测试。
如果是共享 pow,转到它的共享对象执行 seal 操作。
接下来通过多个 goroutine 调用 ethash.mine,因此需要上锁,保证缓存的安全。
Seal 的核心还是在 ethash.mine(block, id, nonce, abort, found) 这一行,不过我们先往后面看。
seal 最后会监听 stop, found, ethash.update 这几个 channel,如果外部意外终止了,停止所有挖矿线程,如果其中有一个线程挖到正确区块,终止其他线程,如果 ethash 对象发生了变化,停止当前所有操作,重新调用 ethash.Seal

接下来我们来看看 ethash 的 mine 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
var (
header = block.Header()
hash = header.HashNoNonce().Bytes()
target = new(big.Int).Div(maxUint256, header.Difficulty)
number = header.Number.Uint64()
dataset = ethash.dataset(number)
)
var (
attempts = int64(0)
nonce = seed
)
logger := log.New("miner", id)
logger.Trace("Started ethash search for new nonces", "seed", seed)
search:
for {
select {
case <-abort:
logger.Trace("Ethash nonce search aborted", "attempts", nonce-seed)
ethash.hashrate.Mark(attempts)
break search
default:
attempts++
if (attempts % (1 << 15)) == 0 {
ethash.hashrate.Mark(attempts)
attempts = 0
}
digest, result := hashimotoFull(dataset.dataset, hash, nonce)
if new(big.Int).SetBytes(result).Cmp(target) <= 0 {
header = types.CopyHeader(header)
header.Nonce = types.EncodeNonce(nonce)
header.MixDigest = common.BytesToHash(digest)

select {
case found <- block.WithSeal(header):
logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
case <-abort:
logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
}
break search
}
nonce++
}
}
runtime.KeepAlive(dataset)
}

首先是变量的初始化,从区块头部提取一些数据,得到哈希值,目标值等等,注意 target = new(big.Int).Div(maxUint256, header.Difficulty) 这一行, 难度越高,target 也就越小,也就越难得到正确的结果。接下来 nonse 会初始化为 seed 值,然后进入一个死循环,不断增加 nonce 的值,通过调用 hashimotoFull 算法不断尝试,直到找到正确 nonse,写入到 found 这个 chan 里。

接下来进入到 hashimotoFull 算法中,看看 pow 的核心算法,这部分在 ethash/algorithm.go 文件中。

1
2
3
4
5
6
7
func hashimotoFull(dataset []uint32, hash []byte, nonce uint64) ([]byte, []byte) {
lookup := func(index uint32) []uint32 {
offset := index * hashWords
return dataset[offset : offset+hashWords]
}
return hashimoto(hash, nonce, uint64(len(dataset))*4, lookup)
}

注意 hashimotoFullhashimotoLight的区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func hashimotoLight(size uint64, cache []uint32, hash []byte, nonce uint64) ([]byte, []byte) {
keccak512 := makeHasher(sha3.NewKeccak512())

lookup := func(index uint32) []uint32 {
rawData := generateDatasetItem(cache, index, keccak512)

data := make([]uint32, len(rawData)/4)
for i := 0; i < len(data); i++ {
data[i] = binary.LittleEndian.Uint32(rawData[i*4:])
}
return data
}
return hashimoto(hash, nonce, size, lookup)
}

hashimotoLightlookup 函数不需要一个完整的 dataset,只需要一个占存储空间很小的 cache,然后临时生成一个 dataset,而 hashimotoFull 是直接从 dataset 拿到所需数据。因此 hashimotoLight 可以用于轻量级客户端的验证。hashimotoLighthashimotoFull 最终会调用 hashimoto,我们继续分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func hashimoto(hash []byte, nonce uint64, size uint64, lookup func(index uint32) []uint32) ([]byte, []byte) {
rows := uint32(size / mixBytes)
seed := make([]byte, 40)
copy(seed, hash)
binary.LittleEndian.PutUint64(seed[32:], nonce)
seed = crypto.Keccak512(seed)
seedHead := binary.LittleEndian.Uint32(seed)
mix := make([]uint32, mixBytes/4)
for i := 0; i < len(mix); i++ {
mix[i] = binary.LittleEndian.Uint32(seed[i%16*4:])
}
temp := make([]uint32, len(mix))
for i := 0; i < loopAccesses; i++ {
parent := fnv(uint32(i)^seedHead, mix[i%len(mix)]) % rows
for j := uint32(0); j < mixBytes/hashBytes; j++ {
copy(temp[j*hashWords:], lookup(2*parent+j))
}
fnvHash(mix, temp)
}
for i := 0; i < len(mix); i += 4 {
mix[i/4] = fnv(fnv(fnv(mix[i], mix[i+1]), mix[i+2]), mix[i+3])
}
mix = mix[:len(mix)/4]
digest := make([]byte, common.HashLength)
for i, val := range mix {
binary.LittleEndian.PutUint32(digest[i*4:], val)
}
return digest, crypto.Keccak256(append(seed, digest...))
}

算法的流程可以概括为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def hashimoto(header, nonce, full_size, dataset_lookup):
n = full_size / HASH_BYTES
w = MIX_BYTES // WORD_BYTES
mixhashes = MIX_BYTES / HASH_BYTES
s = sha3_512(header + nonce[::-1])
mix = []
for _ in range(MIX_BYTES / HASH_BYTES):
mix.extend(s)
for i in range(ACCESSES):
p = fnv(i ^ s[0], mix[i % w]) % (n // mixhashes) * mixhashes
newdata = []
for j in range(MIX_BYTES / HASH_BYTES):
newdata.extend(dataset_lookup(p + j))
mix = map(fnv, mix, newdata)
cmix = []
for i in range(0, len(mix), 4):
cmix.append(fnv(fnv(fnv(mix[i], mix[i+1]), mix[i+2]), mix[i+3]))
return {
"mix digest": serialize_hash(cmix),
"result": serialize_hash(sha3_256(s+cmix))
}

其数据流可以用一张图片表示:

hashimoto

上图摘自 [以太坊源代码分析]III. 挖矿和共识算法的奥秘

其流程是(参考以太坊源代码分析]III. 挖矿和共识算法的奥秘的描述):

  • 首先,将 hash 和 nonce 合并成一个40 bytes长的数组,取它的 SHA-512 哈希值取名 seed,长度为64 bytes。
  • 然后,将 seed[] 转化成以 uint32 为元素的数组 mix[],注意一个 uint32 数等于4 bytes,所以 seed[] 只能转化成16个 uint32 数,而 mix[] 数组长度32,所以此时 mix[] 数组前后各半是等值的。
  • 接着,使用 lookup() 函数。用一个循环,不断调用 lookup() 从外部数据集中取出 uint32 元素类型数组,向 mix[] 数组中混入未知的数据。循环的次数可用参数调节,目前设为64次。每次循环中,变化生成参数 index,从而使得每次调用 lookup() 函数取出的数组都各不相同。这里混入数据的方式是一种类似向量『异或』的操作,来自于 FNV 算法
  • 待混淆数据完成后,得到一个基本上面目全非的 mix[],长度为32的 uint32 数组。这时,将其折叠(压缩)成一个长度缩小成原长1/4的uint32数组,折叠的操作方法来自于 FNV 算法。
  • 最后,将折叠后的 mix[] 由长度为8的 uint32 型数组直接转化成一个长度32的 byte 数组,这就是返回值 digest;同时将之前的 seed[] 数组与 digest 合并再取一次 SHA-256 哈希值,得到的长度32的 byte 数组,即返回值 result。

经过多次多种哈希运算,hashimoto 返回两个长度均为32的 byte 数组 digestresult,前文已提到,在 Ethashmine 方法里,挖矿时需要经过一个死循环,直到找到一个 nonce,使得 hashimoto 返回的 resulttarget 是相等的,这时就表示符合要求,digest 被取 SHA3-256 哈希后也会存到区块头的 MixDigest 字段里,待 Ethash.VerifySeal() 进行验证。

VerifySeal 验证挖矿结果

谈完挖矿,我们看看如何验证挖矿结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (ethash *Ethash) VerifySeal(chain consensus.ChainReader, header *types.Header) error {
if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake {
time.Sleep(ethash.fakeDelay)
if ethash.fakeFail == header.Number.Uint64() {
return errInvalidPoW
}
return nil
}
if ethash.shared != nil {
return ethash.shared.VerifySeal(chain, header)
}
if header.Difficulty.Sign() <= 0 {
return errInvalidDifficulty
}
number := header.Number.Uint64()

cache := ethash.cache(number)
size := datasetSize(number)
if ethash.config.PowMode == ModeTest {
size = 32 * 1024
}
digest, result := hashimotoLight(size, cache.cache, header.HashNoNonce().Bytes(), header.Nonce.Uint64())
runtime.KeepAlive(cache)

if !bytes.Equal(header.MixDigest[:], digest) {
return errInvalidMixDigest
}
target := new(big.Int).Div(maxUint256, header.Difficulty)
if new(big.Int).SetBytes(result).Cmp(target) > 0 {
return errInvalidPoW
}
return nil
}

VerifySeal 的核心是 digest, result := hashimotoLight(size, cache.cache, header.HashNoNonce().Bytes(), header.Nonce.Uint64()) 这一行,这里不需要一个完整的 dataset,只需要 cache 生成就行了,与挖矿时的区别其实就两点,一,这个 Nonce 是区块头里面的。二,dataset 由 cache 临时生成。计算出 digest 后,与区块头的 MixDigest 进行比较即可。

难度动态调整

难度可以用来度量挖出一个区块平均需要的运算次数。

前面的博文里有提过以太坊的挖矿难度调整算法。其公式是:

1
2
3
本区块难度 = 父区块难度 + 难度调整 + 难度炸弹
难度调整 = 父区块难度 // 2048 * MAX(1 - (block_timestamp - parent_timestamp) // 10, -99)
难度炸弹 = INT(2**((block_number // 100000) - 2))

Clique 算法

Clique 算法只在测试网中使用,有机会再研究。

参考资料