go-ethereum 源码笔记(core, eth 模块-链的索引,搜索)

阅读这部分代码之前需要对 Solidity 里的事件,以太坊的日志有所了解。

基础知识

事件和日志

在以太坊中,事件和日志的概念经常被混用可能会让人觉得困惑,所以我们先理清这两者关系。在 Solidity 和 web3.js 里,这个概念叫做 Event,常用中文翻译是事件,它的作用是在智能合约的运行中,将一些比较重要的数据记录到区块链上,在 geth 里称为日志,这些日志与合约地址是相关联的,geth 会不断地对事件,或者说日志进行索引,也会通过暴露 API 的方式让用户可以过滤,查找日志。比较典型的应用场景是,用户在页面上调用了智能合约,在智能合约里发生了一次转账的操作,这时记录一个事件,应用程序通过 web3.js 监听事件,一旦事件发生了,可以触发回调函数,进行下一步处理,比如在 UI 上做出响应。

每一笔交易的收据可能会包含0个到多个日志记录,它们代表着智能合约在运行过程中触发的事件。

布隆过滤器

在 geth 中,日志的索引,过滤通过布隆过滤器来实现。

布隆过滤器可以用来判断一个元素是否在一个集合中,对于这种场景,最简单的方式是采用 HashTable 的方式来存储,好处是快速,精确,但会耗费大量的存储空间。1970年布隆提出一种空间复杂度和空间复杂度都很低的数据结构,利用位数组来表示一个集合,键值经过 k 个独立哈希函数后,将对应的值的位数置1,查找时进行同样的哈希计算,如果对应位全为1说明该值存在,它的缺点是有一定的错误率,它不能告诉你某个元素一定在集合内,但它可以告诉你某个元素一定不在集合内或可能在集合内。误算率的大小与数据的数量,布隆过滤器的长度,以及哈希函数有关。

如果想要深入了解布隆过滤器,建议浏览:

误算率推导

假设布隆过滤器有 m 比特,里面有 n 个元素,每个元素对应 k 个哈希函数处理后的比特位,插入一个元素时,哈希函数会把过滤器中的某个比特位置置为1,对于一个特定的位置,如果这个元素经哈希过的 k 个比特位都没有把它置为 1,其概率是:

$\left(1-\frac{1}{m}\right)^k$

如果插入了 n 个元素还没有把某个特定的比特位置为1,其概率是:

$\left(1-\frac{1}{m}\right)^{kn}$

因此,插入了 n 个元素,某个特定比特位置为1的概率为:

$1-\left(1-\frac{1}{m}\right)^{kn}$

对于一个错误识别的元素,经哈希过的 k 个比特位均为1,概率为:

$\left(1-\left[1-\frac{1}{m}\right]^{kn}\right)^k$

有 $\lim_{x\to0}\frac{e^x-1}{x}=1$ ,则 $m\to\infty$时,上述公式近似于:

$\left( 1-e^{\frac{-kn}{m}} \right)^k$

设 $a=e^{\frac{n}{m}}$, 错误识别的概率为:

$f(k) = (1-a^{-k})^{k}$

我们需要知道,在 k 取什么值时,$f(k)$ 可以取到最值。先两边取对数:

$\frac{f’(k)}{f(k)}=\ln\left(1-a^{-k}\right)+k\cdot\frac{a^{-k}\cdot\ln{a}}{1-a^{-k}}$

令导数 $f’(k)$为0,有:

$\ln\left(1-a^{-k}\right)+k\cdot\frac{a^{-k}\cdot\ln{a}}{1-a^{-k}}=0$

$(1-a^{-k})\cdot\ln\left(1-a^{-k}\right)=a^{-k}\cdot\ln{a}^{-k}$

对于 $f(x)=xlnx$,可证,不存在 $x_1+x_2=1$,使得 $f(x_1) = f(x_2)$

可得 $a^{k}=2$

即 $k=\frac{m}{n}\cdot ln{2}$ 时,误判率 $f(k)$ 可取到最小值,为 $2^{-ln 2\cdot\frac{m}{n}}$

还有一个结论,当 $k=\frac{m}{n}\cdot ln{2}$,$\left(1-\frac{1}{m}\right)^{kn}=\frac{1}{2}$ 也就是说某个 bit 位在插入 n 个元素后未被置1的概率为 $\frac{1}{2}$,即空间使用率为 50%,也就是说,要保持错误率低,布隆过滤器的空间使用率应小于 50%。根据空间使用率,我们可以推导 n,k 确定的情况下,要保持错误率最低,所需的最小内存空间(即 m 的大小)是多少,这个值应该为 n*k 的两倍。

应用

  • Bigtable 使用布隆过滤器查找不存在的行或列,以减少磁盘查找的 IO 次数
  • Google Chrome 浏览器使用布隆过滤器加速安全浏览服务
  • LevelDB, Hbase, Accumulo 等 key-value 数据库使用布隆过滤器加速查询过程,避免很多不必要的磁盘 IO 操作

geth 的实现

geth 的索引,过滤功能包括对交易,区块,区块头,最新日志,特定日志的过滤,代码涉及到 eth/filters, eth/bloombits.go, core/chain_indexer.go, core/bloombits 等模块。内部的实现其实挺复杂,重构一下可能会好很多,深入到这些功能的实现,还是要有章法,采用从上之下的方式是比较好的,否则直接阅读内部实现的话,可能线索会比较错乱,不如先从 JSON-RPC 提供的几个 API 入手,从入口开始剖析。geth 的索引过滤功能底层的实现主要依赖布隆过滤器,读者如果在阅读下文之前就熟悉布隆过滤器自然再好不过,不了解的话可以先从 API 调用入手,在过滤功能的使用这小节结束后再好好学习相关基础知识以加强理解。

过滤功能的使用

这里指的使用是指比较高层次的,通过 web3.js 可以直接进行交互的调用。

使用布隆过滤器的代码主要在 eth/filter 文件夹里。

eth/filter 模块比较直观地提供了日志过滤的功能,这里我们可以先将布隆过滤器的实现视为黑盒,看看所谓的日志,收据的过滤到底是一个什么功能。

先看 eth/filters/api.go 里的内容。

api.go

数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type filter struct {
typ Type
deadline *time.Timer
hashes []common.Hash
crit FilterCriteria
logs []*types.Log
s *Subscription
}
type PublicFilterAPI struct {
backend Backend
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
}

其中 filter 结构体用来存储过滤器的一些元信息,PublicFilterAPI 用来创建和管理这些过滤器,供外部客户端调用,它暴露的 API 在 Backend 里定义,这部分内容在 eth/filters/filter.go 里面,在之后会详细看这部分。

初始化

PublicFilterAPI 初始化的逻辑很简单,就一个 New 外加 go api.timeoutLoop() 一行而已,这里不再给出初始化的代码了。由于需要注册 API,同其他 backend 一样,调用 NewPublicFilterAPI 进行服务注册的部分在 eth/backend.go 中,有兴趣的话读者可以自己看看。

超时检测
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (api *PublicFilterAPI) timeoutLoop() {
ticker := time.NewTicker(5 * time.Minute)
for {
<-ticker.C
api.filtersMu.Lock()
for id, f := range api.filters {
select {
case <-f.deadline.C:
f.s.Unsubscribe()
delete(api.filters, id)
default:
continue
}
}
api.filtersMu.Unlock()
}
}

PublicFilterAPI 初始化时只用了一个 goroutine 来启动 timeoutLoop()timeoutLoop() 做的事情很简单,对于加入到 PublicFilterAPI 的 filters,每过5分钟检查一次,如果过期了就删除 filter。

增加 filter

PublicFilterAPI 的主循环会处理 filter,那么 filter 是怎么添加的,其实是通过 NewPendingTransactionFilter 来添加,这个 API 也可以通过 JSON-RPC 直接调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan []common.Hash)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)
api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filtersMu.Unlock()
go func() {
for {
select {
case ph := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.hashes = append(f.hashes, ph...)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
api.filtersMu.Lock()
delete(api.filters, pendingTxSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return pendingTxSub.ID
}

对于非长连接的情况,也就是说对于使用 HTTP 的方式进行过滤的情况,可以使用这个 API,它可以用来创建一个 filter,当有新交易时,会存储到 filter 的哈希池里。

然后客户端可以通过轮询该 filter 来获取过滤的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
<-f.deadline.C
}
f.deadline.Reset(deadline)
switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
}
}
return []interface{}{}, fmt.Errorf("filter not found")
}

这部分也可以通过 JSON-RPC 进行调用。

对于长连接的情况,可以使用 rpc 包的发送订阅模式功能,直接调用 NewPendingTransactions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)
for {
select {
case hashes := <-txHashes:
for _, h := range hashes {
notifier.Notify(rpcSub.ID, h)
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
return
case <-notifier.Closed():
pendingTxSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}

以上是对交易的过滤,此外 PublicFilterAPI 还提供了对区块,区块头,最新日志的过滤,如果有兴趣的话可以看 func (api *PublicFilterAPI) NewBlockFilter(), func (api *PublicFilterAPI) NewHeads, func (api *PublicFilterAPI) Logs 这几个方法的实现。这些只是交易,区块等内容的过滤,对于获取特定条件的日志的功能则在 NewFilter, GetLogs, GetFilterLogs, GetFilterChanges 这几个方法的帮助下完成。比如 GetLogs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
var filter *Filter
if crit.BlockHash != nil {
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
} else {
begin := rpc.LatestBlockNumber.Int64()
if crit.FromBlock != nil {
begin = crit.FromBlock.Int64()
}
end := rpc.LatestBlockNumber.Int64()
if crit.ToBlock != nil {
end = crit.ToBlock.Int64()
}
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
}
logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
return returnLogs(logs), err
}

可以根据开始区块,结束区块,地址,Topics 来获取符合条件的日志。可以看到获取日志主要通过 filter 的 Logs 方法得到,这就引入了 Filter 对象,这部分内容在 eth/filters/filter.go 里。

filter.go

数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type Backend interface {
ChainDb() ethdb.Database
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
type Filter struct {
backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
matcher *bloombits.Matcher
}

Filter 结构体中定义了 Backend 接口,Backend 接口定义了布隆过滤器实现的接口,这些接口的实现是在 eth/api_backend.go 里的 L212-234,可以说是比较低层次的 API。而 Matcher 是布隆过滤器的 Matcher,这部分我们将在 Matcher 进行分析

Filter 的初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
var filters [][][]byte
if len(addresses) > 0 {
filter := make([][]byte, len(addresses))
for i, address := range addresses {
filter[i] = address.Bytes()
}
filters = append(filters, filter)
}
for _, topicList := range topics {
filter := make([][]byte, len(topicList))
for i, topic := range topicList {
filter[i] = topic.Bytes()
}
filters = append(filters, filter)
}
size, _ := backend.BloomStatus()
return &Filter{
backend: backend,
begin: begin,
end: end,
addresses: addresses,
topics: topics,
db: backend.ChainDb(),
matcher: bloombits.NewMatcher(size, filters),
}
}

如果 Filter 的参数中包含 addresses 或 topics,则都将其加入到 filters 容器中,在 matcher 的阶段会用到。

过滤日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}
head := header.Number.Uint64()
if f.begin == -1 {
f.begin = int64(head)
}
end := uint64(f.end)
if f.end == -1 {
end = head
}
var (
logs []*types.Log
err error
)
size, sections := f.backend.BloomStatus()
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
} else {
logs, err = f.indexedLogs(ctx, indexed-1)
}
if err != nil {
return logs, err
}
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
return logs, err
}

Logs 会从区块链中找到匹配的 log 项。首先根据最新的区块号拿到最新区块,这样如果没有 end 参数的,搜索的区间的 end 就是最新区块。接着调用 BloomStatus() 拿到索引的状态,需要注意的是,尽管每个区块头都有 logBloom,日志的过滤不会直接依次检索这些区块头的 logBloom,因为遍历区块头的话效率太低了,需要多次的磁盘 IO,geth 会在 LevelDB 里维护另一套索引,以4096个区块为一个 section,在一个 section 内的 logBloom 会存在一起,所以对于位于已索引的区块这一区间的搜索,会调用 indexedLogs 进行搜索,对于在已索引区间外的区块,会调用 unindexedLogs 进行搜索。我们先看 indexedLogs 的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
matches := make(chan uint64, 64)
session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
if err != nil {
return nil, err
}
defer session.Close()
f.backend.ServiceFilter(ctx, session)
for {
select {
case number, ok := <-matches:
if !ok {
err := session.Error()
if err == nil {
f.begin = int64(end) + 1
}
return logs, err
}
f.begin = int64(number) + 1
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
if header == nil || err != nil {
return logs, err
}
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
}
logs = append(logs, found...)
case <-ctx.Done():
return logs, ctx.Err()
}
}
}

indexedLogs 会调用 Matcher 的 Start 方法启动 session,其结果会返回到 matches 这个 channel 里,找到区块后,通过调用 checkMatches 以验证该区块确实在区块链中(因为布隆过滤器有一定几率误判),验证完成后,结果会增加到 logs 这个变量中,直到接收到 Done 消息,然后返回 logs。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
logsList, err := f.backend.GetLogs(ctx, header.Hash())
if err != nil {
return nil, err
}
var unfiltered []*types.Log
for _, logs := range logsList {
unfiltered = append(unfiltered, logs...)
}
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
if len(logs) > 0 {
if logs[0].TxHash == (common.Hash{}) {
receipts, err := f.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil, err
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
unfiltered = append(unfiltered, receipt.Logs...)
}
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
return logs, nil
}
return nil, nil
}

checkMatches 做的事情简单来说就是根据 header 的哈希值,从 backend 拿到所有的收据,然后调用 filterLogs 对 topics 一一进行匹配,匹配上了则可以确定该 logs 确实在区块链中(布隆过滤器没有误判)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
var ret []*types.Log
Logs:
for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
continue
}
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
if len(topics) > len(log.Topics) {
continue Logs
}
for i, topics := range topics {
fmt.Println(topics)
match := len(topics) == 0
for _, topic := range topics {
if log.Topics[i] == topic {
match = true
break
}
}
if !match {
continue Logs
}
}
ret = append(ret, log)
}
return ret
}

filterLogs 方法的逻辑很简单,遍历 topics,如果能和 log 中的 Topics 匹配上则添加到需返回的 logs 中。

indexedLogs 方法是对有索引的日志的查询,前面有介绍到在 Logs 方法中,还有一个对未索引日志的查询,即 unindexedLogs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
var logs []*types.Log
for ; f.begin <= int64(end); f.begin++ {
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return logs, err
}
if bloomFilter(header.Bloom, f.addresses, f.topics) {
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
}
logs = append(logs, found...)
}
}
return logs, nil
}

该方法会根据传入的 end,遍历所有区块,首先通过 header 的 Bloom 判断 topics 是否可能存在,如果有可能存在,直接调用 checkMatches 来查看是否匹配。至此通过对已索引,未索引的日志进行过滤,就可以通过条件获取特定的日志。

以上就是对 geth 过滤功能的概览,可以说是过滤功能的高层次调用,接下来我们将深入这些模块的实现。

创建索引的实现

要实现布隆过滤器,首先需要给区块链创建索引,这样才能实现快速响应用户的日志搜索功能。这部分代码主要在 core/chain_indexer 模块里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type ChainIndexerBackend interface {
Reset(section uint64, prevHead common.Hash) error
Process(header *types.Header)
Commit() error
}
type ChainIndexerChain interface {
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}
type ChainIndexer struct {
chainDb ethdb.Database
indexDb ethdb.Database
backend ChainIndexerBackend
children []*ChainIndexer
active uint32
update chan struct{}
quit chan chan error
sectionSize uint64
confirmsReq uint64
storedSections uint64
knownSections uint64
cascadedHead uint64
throttling time.Duration
log log.Logger
lock sync.RWMutex
}

首先需要了解一些定义,这部分代码中经常出现的 section 是指一组区块头,而这个一组的数量默认为 4096。

ChainIndexerBackend 是一个接口,它定义了处理区块链 section 的方法,这个接口目前有 BloomIndexer 这个实现。其中 Reset(section uint64) 用来初始化一个新的区块链 section,可能会终止任何没有完成的操作;Process(header *types.Header) 对区块链 section 中的下一个区块头进行处理,增加新区块头到 index,调用者需要确保区块头的连续顺序;Commit() error 完成区块链 section 的元数据提交,并将其存储到数据库。

以下是 ChainIndexer 结构体中较重要的一些属性:

属性 描述
chainDb 区块链所在的数据库
indexDb 索引所在的数据库
backend 生成索引的后端,它实现了 ChainIndexerBacken 所定义的接口,这里的实现我们只探讨 eth/bloombits 中的 BloomIndexer,在 light 模式中有其他实现
children 子链的索引,这是为了处理临时分叉的情况
active 事件循环是否开始的标志
update 新生成区块头发送到这个 channel
quit 退出事件循环的 channel
sectionSize 索引器会一组一组处理区块头,默认的大小是 4096
confirmReq 处理完成的 section 之前的确认次数
storedSections 已经成功进行索引的 section 的数量
knownSections 已知的 section 数量
cascadedHead 级联到子索引最后一个完成的 section 的区块数
throttling 对磁盘的限制,防止大量区块进行索引

初始化 ChainIndexer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
c := &ChainIndexer{
chainDb: chainDb,
indexDb: indexDb,
backend: backend,
update: make(chan struct{}, 1),
quit: make(chan chan error),
sectionSize: section,
confirmsReq: confirm,
throttling: throttling,
log: log.New("type", kind),
}
c.loadValidSections()
go c.updateLoop()
return c
}

初始化 ChainIndexer 时,先调用 loadValidSections,从数据库中加载之前处理过的信息。

1
2
3
4
5
6
func (c *ChainIndexer) loadValidSections() {
data, _ := c.indexDb.Get([]byte("count"))
if len(data) == 8 {
c.storedSections = binary.BigEndian.Uint64(data[:])
}
}

接下来会以一个 goroutine 的方式进入一个事件循环,它会调用 backend 处理区块链 section。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func (c *ChainIndexer) updateLoop() {
var (
updating bool
updated time.Time
)
for {
select {
case errc := <-c.quit:
errc <- nil
return
case <-c.update:
c.lock.Lock()
if c.knownSections > c.storedSections {
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
}
section := c.storedSections
var oldHead common.Hash
if section > 0 {
oldHead = c.SectionHead(section - 1)
}
c.lock.Unlock()
newHead, err := c.processSection(section, oldHead)
if err != nil {
c.log.Error("Section processing failed", "error", err)
}
c.lock.Lock()
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updating {
updating = false
c.log.Info("Finished upgrading chain index")
}
c.cascadedHead = c.storedSections*c.sectionSize - 1
for _, child := range c.children {
c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
child.newHead(c.cascadedHead, false)
}
} else {
c.log.Debug("Chain index processing failed", "section", section, "err", err)
c.knownSections = c.storedSections
}
}
if c.knownSections > c.storedSections {
time.AfterFunc(c.throttling, func() {
select {
case c.update <- struct{}{}:
default:
}
})
}
c.lock.Unlock()
}
}
}

当索引需要更新时,其他 goroutine 会往 update 这个 channel 发送消息,主要的逻辑代码都在这个 case 分支。当已知的 section 数大于存储的 section 数,这时需要开始索引,先通过调用 SectionHead 拿到上一个 section 的最后一个区块的哈希值,接着调用 processSection 开始新 section 的索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
if err := c.backend.Reset(section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
if hash == (common.Hash{}) {
return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
}
header := GetHeader(c.chainDb, hash, number)
if header == nil {
return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
} else if header.ParentHash != lastHead {
return common.Hash{}, fmt.Errorf("chain reorged during section processing")
}
c.backend.Process(header)
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
c.log.Error("Section commit failed", "error", err)
return common.Hash{}, err
}
return lastHead, nil
}

这里我们只探讨 eth/bloombits 的实现,首先看 Reset 方法。

1
2
3
4
5
func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error {
gen, err := bloombits.NewGenerator(uint(b.size))
b.gen, b.section, b.head = gen, section, common.Hash{}
return err
}

注意这里的 lastSectionHead 参数其实没有用到,在 light 模式的实现中是有用到的,这里我们先不深究。实际上 reset 方法只是简单的初始化 core/bloombits/generator,这部分我们在布隆过滤器的实现这一小结会讨论,先跳过这部分。回到 processSection 方法,如果 Reset 方法返回错误了,我们会重置已经存储的 section。接下来我们遍历当前 section,通过调用 GetCanonicalHash 获得当前区块哈希,然后拿到当前区块链中的 header,通过调用 Process(header) 添加当前的区块索引到布隆过滤器的 bit 数组中。

这部分代码在 eth/bloombits.go 中

1
2
3
4
func (b *BloomIndexer) Process(header *types.Header) {
b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
b.head = header.Hash()
}

AddBloomcore/bloombits/generator.go 中。generator 用来生成基于 section 的布隆过滤器索引数据的对象,其内部的结构是:

1
2
3
4
5
type Generator struct {
blooms [types.BloomBitLength][]byte
sections uint
nextSec uint
}

bloom[2048][4096] 这个类型用来存储索引的数据,比如第20个 header 的 logBloom 存储在 bloom[2048][20] 里。sections 指明操作的 section 的数量,nextSec 表示增加一个 bloom 的时候,需要设置的下一个 bit。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (b *Generator) AddBloom(index uint, bloom types.Bloom) error {
if b.nextBit >= b.sections {
return errSectionOutOfBounds
}
if b.nextBit != index {
return errors.New("bloom filter with unexpected index")
}
byteIndex := b.nextBit / 8
bitMask := byte(1) << byte(7-b.nextBit%8)
for i := 0; i < types.BloomBitLength; i++ {
bloomByteIndex := types.BloomByteLength - 1 - i/8
bloomBitMask := byte(1) << byte(i%8)
if (bloom[bloomByteIndex] & bloomBitMask) != 0 {
b.blooms[i][byteIndex] |= bitMask
}
}
b.nextBit++
return nil
}

AddBloom 增加一个区块的 bit 数组。当 nextBit 超过 sections 时,返回超出 section 最大数量的错误,确保 nextBit 始终等于 index,即 bloom 在 section 里的下标,通过 nextBit/8 得到需要设置的 byte 位置,接着就是根据 BloomBitLength 变量,生成一个 bloom 数据。

过滤功能的实现

前面创建索引的时候有涉及到布隆过滤器的一部分功能,接下来我们来深入探讨一下布隆过滤器功能的实现。这部分代码主要在 core/bloombits 里。core/bloombits 目录下,逻辑代码都在 generator.gomatcher.goscheduler.go 中,其中 generator 生成基于 section 的布隆过滤器索引数据的对象,matcher 用来匹配查询操作,scheduler 基于 section 的布隆过滤器的单个 bit 值检索进行调度。上文已经介绍了 generator.go 的功能,我们继续来看 scheduler.gomatcher.go

scheduler

scheduler 用来调度检索操作,作为调度器,它还承担删除重复数据,缓存结果的功能,以保证在复杂的过滤条件下降低 IO 的开销。

数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type request struct {
section uint64
bit uint
}
type response struct {
cached []byte
done chan struct{}
}
type scheduler struct {
bit uint
responses map[uint64]*response
lock sync.Mutex
}

request 表示一个布隆检索任务。其中section 表示区块段号,每段 4096 个区块,bit 代表检索的是布隆过滤器的哪一位(一共2048位)。在同一个 section 的 logBloom 会存储在一起,对于每个 section,用一个二维数组 A[2048][4096] 来存储,上文已简略提到,第一维2048代表布隆过滤器的长度为2048个字节,第二位4096表示一个 section 里的所有区块,每个位置按照顺序代表其中一个区块。response 表示当前调度的请求的状态,该属性会在 scheduler 被引用。每产生一个请求,都会生成一个 response 对象来表示这个请求的状态,cached 会用来缓存这个 section 的结果。

scheduler 的 bit 用来表示请求的是布隆过滤器的哪一个 bit 位(0-2047),而 response 记录当前正在进行的请求或已经缓存的结果。

schedulerrun 方法是调度逻辑代码的入口:

1
2
3
4
5
6
7
func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
pend := make(chan uint64, cap(dist))
wg.Add(2)
go s.scheduleRequests(sections, dist, pend, quit, wg)
go s.scheduleDeliveries(pend, done, quit, wg)
}

其中参数 sections 是需要检索的区块段的 channel,dist 是输出通道,表示从本地检索还是从网络中检索,往这个 channel 发送请求,结果可以从 done 通道中拿到。pend := make(chan uint64, cap(dist)) 在请求和响应之间创建一个与分发通道大小相同的转发器通道。

先看 scheduleRequests 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
defer close(pend)
for {
select {
case <-quit:
return
case section, ok := <-reqs:
if !ok {
return
}
unique := false
s.lock.Lock()
if s.responses[section] == nil {
s.responses[section] = &response{
done: make(chan struct{}),
}
unique = true
}
s.lock.Unlock()
if unique {
select {
case <-quit:
return
case dist <- &request{bit: s.bit, section: section}:
}
}
select {
case <-quit:
return
case pend <- section:
}
}
}
}

schedulerRequests 会从 reqs 这个 channel 接收到 section 消息,然后将接收到的 section 封装成 request 发送到 dist 这个 channel,构建对象 response[section],接着 section 还会发送给 pend 队列,run 方法调用的 scheduleDeliveries 会进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
defer close(done)
for {
select {
case <-quit:
return
case idx, ok := <-pend:
if !ok {
return
}
s.lock.Lock()
res := s.responses[idx]
s.lock.Unlock()
select {
case <-quit:
return
case <-res.done:
}
select {
case <-quit:
return
case done <- res.cached:
}
}
}
}

scheduleDelivers 接收到 pend 消息后,会阻塞在 response[section].done 上。接着等待外部调用 deliver 方法。

1
2
3
4
5
6
7
8
9
10
11
func (s *scheduler) deliver(sections []uint64, data [][]byte) {
s.lock.Lock()
defer s.lock.Unlock()
for i, section := range sections {
if res := s.responses[section]; res != nil && res.cached == nil {
res.cached = data[i]
close(res.done)
}
}
}

调用 deliver 方法,section 的 request 请求结果会写入 response[section].cached,然后关闭 response[section].done 这个 channel 。

scheduleDelivers 接收到 response[section].done 信息后,response[section].cached 会发送到 done 这个channel,至此,一次检索的调度就结束了。

在下面的 Matcher 中,distributor 就有调用 schedule 的 deliver 方法的例子。

Matcher

Matcher 完成真正的匹配工作,对 filter 进行二进制的与/或操作。

数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type partialMatches struct {
section uint64
bitset []byte
}
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Context context.Context
Error error
}
type Matcher struct {
sectionSize uint64
filters [][]bloomIndexes
schedulers map[uint]*scheduler
retrievers chan chan uint
counters chan chan uint
retrievals chan chan *Retrieval
deliveries chan *Retrieval
running uint32
}

partialMatches 表示部分匹配的结果,Retrieval 表示一次区块布隆过滤器的检索工作,在使用过程中,该对象会被发送给 eth/bloombits.go 中的 startBloomHandlers 来处理,该方法从数据库中加载布隆过滤器索引,然后放在 Bitsets 里返回(待确认)。Matcher 是一个操作调度器(scheduler)和匹配器(matcher)的流水线系统,它会对比特流进行二进制的与/或操作,对数据内容进行检索,创建可能的区块。

构造 Matcher:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
m := &Matcher{
sectionSize: sectionSize,
schedulers: make(map[uint]*scheduler),
retrievers: make(chan chan uint),
counters: make(chan chan uint),
retrievals: make(chan chan *Retrieval),
deliveries: make(chan *Retrieval),
}
for _, filter := range filters {
if len(filter) == 0 {
continue
}
bloomBits := make([]bloomIndexes, len(filter))
for i, clause := range filter {
if clause == nil {
bloomBits = nil
break
}
bloomBits[i] = calcBloomIndexes(clause)
}
if bloomBits != nil {
m.filters = append(m.filters, bloomBits)
}
}
for _, bloomIndexLists := range m.filters {
for _, bloomIndexList := range bloomIndexLists {
for _, bloomIndex := range bloomIndexList {
m.addScheduler(bloomIndex)
}
}
}
return m
}

NewMatcher 根据传入的 filters 分别进行初始化操作,接着可以在可以在 Start 中创建对应数量的 schedule,subMatch。

启动 Matcher:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
if atomic.SwapUint32(&m.running, 1) == 1 {
return nil, errors.New("matcher already running")
}
defer atomic.StoreUint32(&m.running, 0)
session := &MatcherSession{
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
}
sink := m.run(begin, end, cap(results), session)
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(results)
for {
select {
case <-session.quit:
return
case res, ok := <-sink:
if !ok {
return
}
sectionStart := res.section * m.sectionSize
first := sectionStart
if begin > first {
first = begin
}
last := sectionStart + m.sectionSize - 1
if end < last {
last = end
}
for i := first; i <= last; i++ {
next := res.bitset[(i-sectionStart)/8]
if next == 0 {
if i%8 == 0 {
i += 7
}
continue
}
if bit := 7 - i%8; next&(1<<bit) != 0 {
select {
case <-session.quit:
return
case results <- i:
}
}
}
}
}
}()
return session, nil
}

Start 方法首先启动一个 session,这个 session 会被返回,它可以用来管理日志过滤的生命周期,调用者会将它作为 ServiceFilter 的参数,根据 bloomFilterThreads 这个常数值(默认为3),启动 bloomFilterThreads 个 session 的 Multiplex,该方法会不断地从 distributor 领取任务,将任务投递给 bloomRequest 队列,从队列中获取结果,然后投递给 distributor,这个 Multiplex 非常重要。

接下来会调用 run 方法,该方法会返回一个 channel,该 channel 会一直返回搜索的结果,直到返回一个退出的信号,Start 方法才会结束,对于过滤的结果,其中包括 section 和 bitmap,bitmap 表明了 section 中哪些区块可能存在值,这时需要遍历这个 bitmap,找到被置位的区块,然后把区块号返回到 results 通道。接下来我们看 run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
source := make(chan *partialMatches, buffer)
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(source)
for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
select {
case <-session.quit:
return
case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
}
}
}()
next := source
dist := make(chan *request, buffer)
for _, bloom := range m.filters {
next = m.subMatch(next, dist, bloom, session)
}
session.pend.Add(1)
go m.distributor(dist, session)
return next
}

run 方法会创建一个子匹配器的流水线,一个用于地址集合,一个用于 topic 集合。之所以称为流水线是因为它会一个一个地调用子匹配器,之前的子匹配器找到了匹配的区块后才会调用下一个子匹配器,接收到的结果会与自身结果匹配后结合,发到下一个子匹配器。最终返回一个接收结果的接收器通道。该方法首先起一个 go routine, 构造 subMatch 的第一个输入源,这个源的 bitset 字段是 0xff,表示完全匹配,这个结果会作为第一个子匹配器的输入。在结尾还会用新线程的方式调用 distributor,这个方法我们之后再谈,接下来我们看 subMatch 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
sectionSources := make([][3]chan uint64, len(bloom))
sectionSinks := make([][3]chan []byte, len(bloom))
for i, bits := range bloom {
for j, bit := range bits {
sectionSources[i][j] = make(chan uint64, cap(source))
sectionSinks[i][j] = make(chan []byte, cap(source))
m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
}
}
process := make(chan *partialMatches, cap(source))
results := make(chan *partialMatches, cap(source))
session.pend.Add(2)
go func() {
defer session.pend.Done()
defer close(process)
defer func() {
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
close(bitSource)
}
}
}()
for {
select {
case <-session.quit:
return
case subres, ok := <-source:
if !ok {
return
}
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
select {
case <-session.quit:
return
case bitSource <- subres.section:
}
}
}
select {
case <-session.quit:
return
case process <- subres:
}
}
}
}()
go func() {
defer session.pend.Done()
defer close(results)
for {
select {
case <-session.quit:
return
case subres, ok := <-process:
if !ok {
return
}
var orVector []byte
for _, bloomSinks := range sectionSinks {
var andVector []byte
for _, bitSink := range bloomSinks {
var data []byte
select {
case <-session.quit:
return
case data = <-bitSink:
}
if andVector == nil {
andVector = make([]byte, int(m.sectionSize/8))
copy(andVector, data)
} else {
bitutil.ANDBytes(andVector, andVector, data)
}
}
if orVector == nil {
orVector = andVector
} else {
bitutil.ORBytes(orVector, orVector, andVector)
}
}
if orVector == nil {
orVector = make([]byte, int(m.sectionSize/8))
}
if subres.bitset != nil {
bitutil.ANDBytes(orVector, orVector, subres.bitset)
}
if bitutil.TestBytes(orVector) {
select {
case <-session.quit:
return
case results <- &partialMatches{subres.section, orVector}:
}
}
}
}
}()
return results
}

subMatch 会创建一个子匹配器,用于过滤一组 address 或 topic,在组内会进行 bit 位的或操作,然后将上一个结果与当前过滤结果进行位的与操作,如果结果不全为空,结果传递到下一个子匹配器。每个 address/topic 的匹配通过获取属于该 address/topics 的三个布隆过滤器位索引的给定部分以及这些向量二进制做与的运算。

注意这里的 bloom 参数的类型是 []bloomIndexes,首先根据这个值创建相应个数的 schedulers,调用其对应的 run 方法。前面我们有介绍 schedulers 的 run 方法,它的作用是根据后端的实现(从硬盘或网络中)执行过滤操作,结果可以通过调用 scheduler 的 deliver 获得。

我们继续看上面提到的 distributor 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
defer session.pend.Done()
var (
requests = make(map[uint][]uint64)
unallocs = make(map[uint]struct{})
retrievers chan chan uint
)
var (
allocs int
shutdown = session.quit
)
assign := func(bit uint) {
select {
case fetcher := <-m.retrievers:
allocs++
fetcher <- bit
default:
retrievers = m.retrievers
unallocs[bit] = struct{}{}
}
}
for {
select {
case <-shutdown:
if allocs == 0 {
return
}
shutdown = nil
case <-session.kill:
return
case req := <-dist:
queue := requests[req.bit]
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
if len(queue) == 0 {
assign(req.bit)
}
case fetcher := <-retrievers:
bit, best := uint(0), uint64(math.MaxUint64)
for idx := range unallocs {
if requests[idx][0] < best {
bit, best = idx, requests[idx][0]
}
}
delete(unallocs, bit)
if len(unallocs) == 0 {
retrievers = nil
}
allocs++
fetcher <- bit
case fetcher := <-m.counters:
fetcher <- uint(len(requests[<-fetcher]))
case fetcher := <-m.retrievals:
task := <-fetcher
if want := len(task.Sections); want >= len(requests[task.Bit]) {
task.Sections = requests[task.Bit]
delete(requests, task.Bit)
} else {
task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
}
fetcher <- task
if len(requests[task.Bit]) > 0 {
assign(task.Bit)
}
case result := <-m.deliveries:
var (
sections = make([]uint64, 0, len(result.Sections))
bitsets = make([][]byte, 0, len(result.Bitsets))
missing = make([]uint64, 0, len(result.Sections))
)
for i, bitset := range result.Bitsets {
if len(bitset) == 0 {
missing = append(missing, result.Sections[i])
continue
}
sections = append(sections, result.Sections[i])
bitsets = append(bitsets, bitset)
}
m.schedulers[result.Bit].deliver(sections, bitsets)
allocs--
if len(missing) > 0 {
queue := requests[result.Bit]
for _, section := range missing {
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
}
requests[result.Bit] = queue
if len(queue) == len(missing) {
assign(result.Bit)
}
}
if allocs == 0 && shutdown == nil {
return
}
}
}
}

distributor 接收来自 scheduler 的请求,首先根据 dist 参数,进入下面的分支:

1
2
3
4
5
6
7
8
9
10
case req := <-dist:
// New retrieval request arrived to be distributed to some fetcher process
queue := requests[req.bit]
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
// If it's a new bit and we have waiting fetchers, allocate to them
if len(queue) == 0 {
assign(req.bit)
}

如果是新的 bit,通过 assign 指派给等待的 fetcher。这时通过 Multiplex, AllocateSections 的作用,最终会进入到 case fetcher := <-m.retrievals: 分支。回顾一下 Multiplex 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
for {
bit, ok := s.AllocateRetrieval()
if !ok {
return
}
if s.PendingSections(bit) < batch {
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.AllocateSections(bit, 0)
s.DeliverSections(bit, []uint64{}, [][]byte{})
return
case <-time.After(wait):
// Throttling up, fetch whatever's available
}
}
sections := s.AllocateSections(bit, batch)
request := make(chan *Retrieval)
select {
case <-s.quit:
s.DeliverSections(bit, sections, make([][]byte, len(sections)))
return
case mux <- request:
request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
result := <-request
if result.Error != nil {
s.err.Store(result.Error)
s.Close()
}
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
}

Multiplex 会不断领取任务,投递给 bloomRequest 队列,从队列中获取结果后,又通过 DeliverSections 发送给 deliveries 这个 channel,从而调用 schedulers 的 deliver 方法,在前面 schedule 的讲解中,我们知道这个结果会发到 done 这个 channel 中,这个结果最终会被 subMatch 接收(查看 scheduler 的 run 方法),一层层发送给最后一个 subMath,最终返回 results。

References