forked from ethereum/go-ethereum
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathservice.go
120 lines (103 loc) · 3.34 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package service
import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
b "github.com/ethereum/go-ethereum/statediff/builder"
e "github.com/ethereum/go-ethereum/statediff/extractor"
p "github.com/ethereum/go-ethereum/statediff/publisher"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
type BlockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
GetBlockByHash(hash common.Hash) *types.Block
AddToStateDiffProcessedCollection(hash common.Hash)
}
type StateDiffService struct {
Builder *b.Builder
Extractor e.Extractor
BlockChain BlockChain
}
func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config statediff.Config) (*StateDiffService, error) {
builder := b.NewBuilder(db, blockChain)
publisher, err := p.NewPublisher(config)
if err != nil {
return nil, err
}
extractor := e.NewExtractor(builder, publisher)
return &StateDiffService{
BlockChain: blockChain,
Extractor: extractor,
}, nil
}
func (StateDiffService) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
func (StateDiffService) APIs() []rpc.API {
return []rpc.API{}
}
func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe()
blocksCh := make(chan *types.Block, 10)
errCh := chainEventSub.Err()
quitCh := make(chan struct{})
go func() {
HandleChainEventChLoop:
for {
select {
//Notify chain event channel of events
case chainEvent := <-chainEventCh:
log.Debug("Event received from chainEventCh", "event", chainEvent)
blocksCh <- chainEvent.Block
//if node stopped
case err := <-errCh:
log.Warn("Error from chain event subscription, breaking loop.", "error", err)
break HandleChainEventChLoop
}
}
close(quitCh)
}()
//loop through chain events until no more
HandleBlockChLoop:
for {
select {
case block := <-blocksCh:
currentBlock := block
parentHash := currentBlock.ParentHash()
parentBlock := sds.BlockChain.GetBlockByHash(parentHash)
if parentBlock == nil {
log.Error("Parent block is nil, skipping this block",
"parent block hash", parentHash.String(),
"current block number", currentBlock.Number())
break HandleBlockChLoop
}
stateDiffLocation, err := sds.Extractor.ExtractStateDiff(*parentBlock, *currentBlock)
if err != nil {
log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err)
} else {
log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation)
sds.BlockChain.AddToStateDiffProcessedCollection(parentBlock.Root())
sds.BlockChain.AddToStateDiffProcessedCollection(currentBlock.Root())
}
case <-quitCh:
log.Debug("Quitting the statediff block channel")
return
}
}
}
func (sds *StateDiffService) Start(server *p2p.Server) error {
log.Info("Starting statediff service")
chainEventCh := make(chan core.ChainEvent, 10)
go sds.Loop(chainEventCh)
return nil
}
func (StateDiffService) Stop() error {
log.Info("Stopping statediff service")
return nil
}