go-ethereum 源码笔记(core 模块-交易池)

上一篇我们探究了以太坊账户,转账的实现,在文末的时候挖了一个坑,调用了提交转账的 api 后我们就没有再继续往下走了。这一篇我们更进一步,看看交易池是怎么实现的。

代码解析

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
currentState *state.StateDB
pendingState *state.ManagedState
currentMaxGas uint64
locals *accountSet
journal *txJournal
pending map[common.Address]*txList
queue map[common.Address]*txList
beats map[common.Address]time.Time
all *txLookup
priced *txPricedList
wg sync.WaitGroup
homestead bool
}

交易池用来存放当前的网络接收到的交易或本地提交的交易,如果已经加入到区块链中会被移除。

下面是重要字段的描述。

字段 描述
config TxPoolConfig 类型,包含了交易池的配置信息,如 PriceLimit,移除交易的最低 GasPrice 限制;PriceBump,替换相同 Nonce 的交易的价格的百分比;AccountSlots,每个账户 pending 的槽位的最小值;GlobalSlots,全局 pending 队列的最大值;AccountQueue,每个账户的 queueing 的槽位的最小值;GlobalQueue,全局 queueing 的最大值;Lifetime,在队列的最长等待时间
chainconfig 区块链的配置
gasPrice 最低的 GasPrice 限制
txFeed 可以通过 txFeed 来订阅 TxPool 的消息
chainHeadCh 可以通过 chainHeadCh 订阅区块头的消息
signer 封装了事务签名处理

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
config = (&config).sanitize()
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
go pool.loop()
return pool
}

初始化交易池后会调用 reset 方法,这个方法会检索区块链当前状态,确保交易池里的内容与区块链状态是匹配的。

NewTxPool 方法里,如果本地可以发起交易,并且配置的 Journal 目录不为空,那么从指定的目录加载交易日志。NewTxPool 方法的最后会用一个 goroutine 调用 loop()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func (pool *TxPool) loop() {
defer pool.wg.Done()
var prevPending, prevQueued, prevStales int
report := time.NewTicker(statsReportInterval)
defer report.Stop()
evict := time.NewTicker(evictionInterval)
defer evict.Stop()
journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop()
head := pool.chain.CurrentBlock()
for {
select {
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true
}
pool.reset(head.Header(), ev.Block.Header())
head = ev.Block
pool.mu.Unlock()
}
case <-pool.chainHeadSub.Err():
return
case <-report.C:
pool.mu.RLock()
pending, queued := pool.stats()
stales := pool.priced.stales
pool.mu.RUnlock()
if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
prevPending, prevQueued, prevStales = pending, queued, stales
}
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
if pool.locals.contains(addr) {
continue
}
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
for _, tx := range pool.queue[addr].Flatten() {
pool.removeTx(tx.Hash(), true)
}
}
}
pool.mu.Unlock()
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}
}
}

loop() 用来接收区块链的事件,负责处理超时的交易和定时写交易日志。

处理交易

先回顾一下。上一篇谈到,转账时,在 submitTransaction 中会调用 SendTxSendTx 的实现在 eth/api_backend.go 中。

1
2
3
func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}

其中 AddLocal 方法在 core/tx_pool.go 中:

1
2
3
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
}

继续看 addTx:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
pool.mu.Lock()
defer pool.mu.Unlock()
replace, err := pool.add(tx, local)
if err != nil {
return err
}
if !replace {
from, _ := types.Sender(pool.signer, tx)
pool.promoteExecutables([]common.Address{from})
}
return nil
}

addTx 将交易放入交易池中,pool.add(tx, local) 会返回一个 bool 类型,如果为 true,则表明这笔交易合法并且交易之前不存在于交易池,这时候调用 promoteExecutables,可以将可处理的交易变成待处理。所以说,交易池的交易大致分为两种,一种是提交了但还不能执行的,放在 queue 里等待能够被执行(比如 nonce 太高),还有就是等待执行的,放在 pending 里面等待执行。我们会分别探讨 pool.add(tx, local)pool.promoteExecutables 这两个方法如何处理这两种交易。

pool.add(tx, local)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
hash := tx.Hash()
if pool.all[hash] != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxCounter.Inc(1)
return false, err
}
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
if pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
}
drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
pool.removeTx(tx.Hash(), false)
}
}
from, _ := types.Sender(pool.signer, tx)
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
pool.all[tx.Hash()] = tx
pool.priced.Put(tx)
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
go pool.txFeed.Send(TxPreEvent{tx})
return old != nil, nil
}
replace, err := pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
if local {
pool.locals.add(from)
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil
}

首先,根据交易哈希值,确定交易池中是否已经有这笔交易,如果有,则退出。接下来调用 validateTx 验证交易是否合法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if tx.Size() > 32*1024 {
return ErrOversizedData
}
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
if pool.currentMaxGas < tx.Gas() {
return ErrGasLimit
}
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
local = local || pool.locals.contains(from)
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
return nil
}

validateTx 有很多使用 if 语句的条件判断,大致会有如下判断:

  • 拒绝大于 32kb 的交易,防止 DDoS 攻击
  • 拒绝转账金额小于0的交易
  • 拒绝 gas 超过交易池 gas 上限的交易
  • 验证这笔交易的签名是否合法
  • 如果交易不是来自本地的,并且 gas 小于当前交易池中的 gas,拒绝这笔交易
  • 当前用户 nonce 如果大于这笔交易的 nonce,拒绝这笔交易
  • 当前账户余额不足,拒绝这笔交易,queue 和 pending 对应账户的交易会被删除
  • 拒绝当前交易固有花费小于交易池 gas 的交易

判断交易合法后,回到 add 方法,接着判断交易池的容量,如果交易池超过容量了,并且这笔交易的费用低于当前交易池中列表的最小值,拒绝这笔交易;如果这笔交易费用比当前交易池列表最小值高,那么从交易池中移除交易费用最低的交易,为这笔新交易腾出空间,也就是说按照 GasPrice 排出优先级。接着通过调用 Overlaps 通过检查这笔交易的 Nonce 值确认该用户是否已经存在这笔交易,如果已经存在,删除之前的交易,将该交易放入交易池,返回;如果不存在,调用 enqueueTx 将交易放入交易池,如果交易是本地发出的,将发送者保存在交易池的 local 中。注意到 add 方法最后会调用 pool.journalTx(from, tx)

1
2
3
4
5
6
7
8
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
if pool.journal == nil || !pool.locals.contains(from) {
return
}
if err := pool.journal.insert(tx); err != nil {
log.Warn("Failed to journal local transaction", "err", err)
}
}

本地的交易会使用 journal 的功能存在磁盘,节点重启后可以重写导入。

pool.promoteExecutables

pool.add(tx, local) 方法探讨完了,我们看看 promoteExecutables 方法,该方法的作用是将所有可以处理的交易放入 pending 区,并且移除所有非法交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue
}
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
}
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Promoting queued transaction", "hash", hash)
pool.promoteTx(addr, hash, tx)
}
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
if list.Empty() {
delete(pool.queue, addr)
}
}
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
spammers := prque.New()
for addr, list := range pool.pending {
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
}
}
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
if len(offenders) > 1 {
threshold := pool.pending[offender.(common.Address)].Len()
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
}
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
for _, tx := range list.Cap(list.Len() - 1) {
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
if queued > pool.config.GlobalQueue {
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) {
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
sort.Sort(addresses)
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
addresses = addresses[:len(addresses)-1]
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
continue
}
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
}

该方法首先遍历所有当前账户交易,通过 list.Forward 方法迭代当前账户,检查 nonce,如果 nonce 太低,删除该交易。接着通过 list.Filter 方法检查余额不足或 gas 不足的交易,删除不满足的交易。这时得到的所有可执行的交易,通过调用 promoteTx 加入到 pending 里,接着移除超过了限制的交易。对于已经加入到 promoted 的交易,调用 pool.txFeed.Send 将消息发给订阅者,在 eth 协议里,这个交易会被广播出去。

经过上面的处理,pending 的数量可能会超过系统配置的数量,这时需要进行一些处理,移除一些交易。

pending 处理完后,继续处理 future queue,队列里的数量也可能会超过 GlobalQueue 里的数量,根据心跳时间排列所有交易,移除最旧的交易。

tx_list.go

前文中有很多地方用到 txList 这个结构体里的方法,这些方法都在 core/tx_list.go 里,算是交易池的工具箱了,代码也有一些参考价值,所以分出一个独立的小节,选几个典型的方法讲解。

nonceHeap

我们在上文中知道维护交易池在很多情况下都需要一个根据 nonce 值排序的优先级队列,如果用堆来实现优先级队列,插入,删除的性能是 $O(log n)$,在 Golang 中,container/heap 包定义了堆的接口:

1
2
3
4
5
type Interface interface {
sort.Interface
Push(x interface{})
Pop() interface{}
}

其中 sort.Interface 为:

1
2
3
4
5
type Interface interface {
Len() int
Less(i, j int) bool
Swap(i, j int)
}

只需要实现 LenLessSwapPushPop 这几个接口就可以实现堆。nonceHeap 实现了一个以 nonce 为基准的堆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type nonceHeap []uint64
func (h nonceHeap) Len() int { return len(h) }
func (h nonceHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h nonceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *nonceHeap) Push(x interface{}) {
*h = append(*h, x.(uint64))
}
func (h *nonceHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

txSortedMap

txSortedMap 用来存储同一个账户下的所有交易。

1
2
3
4
5
6
7
8
9
10
11
12
type txSortedMap struct {
items map[uint64]*types.Transaction
index *nonceHeap
cache types.Transactions
}
func newTxSortedMap() *txSortedMap {
return &txSortedMap{
items: make(map[uint64]*types.Transaction),
index: new(nonceHeap),
}
}

其中 items 是交易数据的 hash map,以 nonce 为索引,index 是以 nonce 为基准的优先级队列,cache 用来缓存已经排好序的交易。

Get
1
2
3
func (m *txSortedMap) Get(nonce uint64) *types.Transaction {
return m.items[nonce]
}

Get 方法获取指定 nonce 的交易。

Put
1
2
3
4
5
6
7
func (m *txSortedMap) Put(tx *types.Transaction) {
nonce := tx.Nonce()
if m.items[nonce] == nil {
heap.Push(m.index, nonce)
}
m.items[nonce], m.cache = tx, nil
}

Put 方法将交易插入到 map 中,同时更新 items。

Forward
1
2
3
4
5
6
7
8
9
10
11
12
func (m *txSortedMap) Forward(threshold uint64) types.Transactions {
var removed types.Transactions
for m.index.Len() > 0 && (*m.index)[0] < threshold {
nonce := heap.Pop(m.index).(uint64)
removed = append(removed, m.items[nonce])
delete(m.items, nonce)
}
if m.cache != nil {
m.cache = m.cache[len(removed):]
}
return removed
}

Forward 可以用来删除所有 nonce 小于 threshold 的交易,返回的是所有被移除的交易。

Filter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
var removed types.Transactions
for nonce, tx := range m.items {
if filter(tx) {
removed = append(removed, tx)
delete(m.items, nonce)
}
}
if len(removed) > 0 {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)
m.cache = nil
}
return removed
}

Filter 接收 filter 函数,删除所有使得 filter 函数调用返回 true 的交易,返回这些被移除的交易。

Cap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (m *txSortedMap) Cap(threshold int) types.Transactions {
if len(m.items) <= threshold {
return nil
}
var drops types.Transactions
sort.Sort(*m.index)
for size := len(m.items); size > threshold; size-- {
drops = append(drops, m.items[(*m.index)[size-1]])
delete(m.items, (*m.index)[size-1])
}
*m.index = (*m.index)[:threshold]
heap.Init(m.index)
if m.cache != nil {
m.cache = m.cache[:len(m.cache)-len(drops)]
}
return drops
}

Cap 根据 threshold 参数对 items 参数进行限制,删除超出的交易,重建堆,返回这些被移除的交易。

Remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (m *txSortedMap) Remove(nonce uint64) bool {
_, ok := m.items[nonce]
if !ok {
return false
}
for i := 0; i < m.index.Len(); i++ {
if (*m.index)[i] == nonce {
heap.Remove(m.index, i)
break
}
}
delete(m.items, nonce)
m.cache = nil
return true
}

根据 nonce 从堆里移除交易,如果没有这个交易返回 false

Ready
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (m *txSortedMap) Ready(start uint64) types.Transactions {
if m.index.Len() == 0 || (*m.index)[0] > start {
return nil
}
var ready types.Transactions
for next := (*m.index)[0]; m.index.Len() > 0 && (*m.index)[0] == next; next++ {
ready = append(ready, m.items[next])
delete(m.items, next)
heap.Pop(m.index)
}
m.cache = nil
return ready
}

Ready 返回从指定 nonce 开始,连续的交易。

Flatten
1
2
3
4
5
6
7
8
9
10
11
12
func (m *txSortedMap) Flatten() types.Transactions {
if m.cache == nil {
m.cache = make(types.Transactions, 0, len(m.items))
for _, tx := range m.items {
m.cache = append(m.cache, tx)
}
sort.Sort(types.TxByNonce(m.cache))
}
txs := make(types.Transactions, len(m.cache))
copy(txs, m.cache)
return txs
}

返回一个基于 nonce 排序的交易列表,缓存到 cache 字段里。

txList

txList 用来存储连续的可执行的交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type txList struct {
strict bool
txs *txSortedMap
costcap *big.Int
gascap uint64
}
func newTxList(strict bool) *txList {
return &txList{
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
}
}
Add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) {
old := l.txs.Get(tx.Nonce())
if old != nil {
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 {
return false, nil
}
}
l.txs.Put(tx)
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
l.costcap = cost
}
if gas := tx.Gas(); l.gascap < gas {
l.gascap = gas
}
return true, old
}

Add 方法会尝试插入一个交易,如果新的交易比老的交易的 GasPrice 值高出一定的数量,则会替换老的交易。

Filter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) {
if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
return nil, nil
}
l.costcap = new(big.Int).Set(costLimit)
l.gascap = gasLimit
removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })
var invalids types.Transactions
if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
if nonce := tx.Nonce(); lowest > nonce {
lowest = nonce
}
}
invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
}
return removed, invalids
}

Filter 方法根据参数 costgasLimit 的值移除所有比该值更高的交易,被移除的交易会返回以便进一步处理。

Remove
1
2
3
4
5
6
7
8
9
10
func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
nonce := tx.Nonce()
if removed := l.txs.Remove(nonce); !removed {
return false, nil
}
if l.strict {
return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
}
return true, nil
}

删除给定 Nonce 的交易,如果是严格模式,删除所有 nonce 大于给定 Nonce 的交易。

Overlaps, Forward, Cap, Ready, Empty, Empty, Flatten
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (l *txList) Overlaps(tx *types.Transaction) bool {
return l.txs.Get(tx.Nonce()) != nil
}
func (l *txList) Forward(threshold uint64) types.Transactions {
return l.txs.Forward(threshold)
}
func (l *txList) Cap(threshold int) types.Transactions {
return l.txs.Cap(threshold)
}
func (l *txList) Ready(start uint64) types.Transactions {
return l.txs.Ready(start)
}
func (l *txList) Len() int {
return l.txs.Len()
}
func (l *txList) Empty() bool {
return l.Len() == 0
}
func (l *txList) Flatten() types.Transactions {
return l.txs.Flatten()
}

Overlaps, Forward, Cap, Ready, Empty, Empty, Flatten 直接调用 txSortedMap 的对应方法。

priceHeap

priceHeap 类似于上面提到的 nonceHeap,不过比较优先级时,优先比较 GasPrice,如果相同则比较 Nonce。这里不再赘述

txPricedList

txPricedList 类似于上面提到的 txList,其中有 Put, Removed, Cap, Underpriced, Discard 方法。

总结

以上就是交易池的主要逻辑,在提交交易之后,数据会通过 geth 的 p2p 系统广播出去,接下来就等矿工进行挖矿了。

References