Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: place a cap on reorglogs #25711

Merged
merged 9 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 31 additions & 47 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,21 +2000,6 @@ func (bc *BlockChain) collectLogs(hash common.Hash, removed bool) []*types.Log {
return logs
}

// mergeLogs returns a merged log slice with specified sort order.
func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log {
var ret []*types.Log
if reverse {
for i := len(logs) - 1; i >= 0; i-- {
ret = append(ret, logs[i]...)
}
} else {
for i := 0; i < len(logs); i++ {
ret = append(ret, logs[i]...)
}
}
return ret
}

// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
Expand All @@ -2028,9 +2013,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {

deletedTxs []common.Hash
addedTxs []common.Hash

deletedLogs [][]*types.Log
rebirthLogs [][]*types.Log
)
// Reduce the longer chain to the same number as the shorter one
if oldBlock.NumberU64() > newBlock.NumberU64() {
Expand All @@ -2040,12 +2022,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
if len(logs) > 0 {
deletedLogs = append(deletedLogs, logs)
}
}
} else {
// New chain is longer, stash all blocks away for subsequent insertion
Expand All @@ -2072,12 +2048,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
if len(logs) > 0 {
deletedLogs = append(deletedLogs, logs)
}
newChain = append(newChain, newBlock)

// Step back with both chains
Expand Down Expand Up @@ -2151,28 +2121,42 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
log.Crit("Failed to delete useless indexes", "err", err)
}

// Collect the logs
for i := len(newChain) - 1; i >= 1; i-- {
// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].Hash(), false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
// Send out events for logs from the old canon chain, and 'reborn'
// logs from the new canon chain. The number of logs can be very
// high, so the events are sent in batches of size around 512.

// Deleted logs + blocks:
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
// Also send event for blocks removed from the canon chain.
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})

// Collect deleted logs for notification
if logs := bc.collectLogs(oldChain[i].Hash(), true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
}
if len(deletedLogs) > 512 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
deletedLogs = nil
}
}
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
// networks where performance is not an issue either way.
if len(deletedLogs) > 0 {
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(mergeLogs(rebirthLogs, false))
}
if len(oldChain) > 0 {
for i := len(oldChain) - 1; i >= 0; i-- {
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})

// New logs:
var rebirthLogs []*types.Log
for i := len(newChain) - 1; i >= 1; i-- {
if logs := bc.collectLogs(newChain[i].Hash(), false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer: I scrapped the append-then-use-mergeLogs, and just expanded the logs in-place immediately. I think it's done correctly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have now fixed up the rebirth-log test, so I have now verified that it does indeed work as before

}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}
return nil
}
Expand Down
94 changes: 67 additions & 27 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,37 +1158,53 @@ func TestLogRebirth(t *testing.T) {
blockchain.SubscribeLogsEvent(newLogCh)
blockchain.SubscribeRemovedLogsEvent(rmLogsCh)

// This chain contains a single log.
genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1)
if err != nil {
t.Fatalf("failed to create tx: %v", err)
// This chain contains 10 logs.
genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) {
if i < 2 {
for ii := 0; ii < 5; ii++ {
tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{
Nonce: gen.TxNonce(addr1),
GasPrice: gen.header.BaseFee,
Gas: uint64(1000001),
Data: logCode,
})
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
}
gen.AddTx(tx)
}
})
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 0)
checkLogEvents(t, newLogCh, rmLogsCh, 10, 0)

// Generate long reorg chain containing another log. Inserting the
// chain removes one log and adds one.
_, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1)
// Generate long reorg chain containing more logs. Inserting the
// chain removes one log and adds four.
_, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) {
if i == 2 {
// The last (head) block is not part of the reorg-chain, we can ignore it
return
}
for ii := 0; ii < 5; ii++ {
tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{
Nonce: gen.TxNonce(addr1),
GasPrice: gen.header.BaseFee,
Gas: uint64(1000000),
Data: logCode,
})
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
gen.OffsetTime(-9) // higher block difficulty
}
gen.OffsetTime(-9) // higher block difficulty
})
if _, err := blockchain.InsertChain(forkChain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 1)
checkLogEvents(t, newLogCh, rmLogsCh, 10, 10)

// This chain segment is rooted in the original chain, but doesn't contain any logs.
// When inserting it, the canonical chain switches away from forkChain and re-emits
Expand All @@ -1197,7 +1213,7 @@ func TestLogRebirth(t *testing.T) {
if _, err := blockchain.InsertChain(newBlocks); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 1)
checkLogEvents(t, newLogCh, rmLogsCh, 10, 10)
}

// This test is a variation of TestLogRebirth. It verifies that log events are emitted
Expand Down Expand Up @@ -1252,19 +1268,43 @@ func TestSideLogRebirth(t *testing.T) {

func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan RemovedLogsEvent, wantNew, wantRemoved int) {
t.Helper()

if len(logsCh) != wantNew {
t.Fatalf("wrong number of log events: got %d, want %d", len(logsCh), wantNew)
}
if len(rmLogsCh) != wantRemoved {
t.Fatalf("wrong number of removed log events: got %d, want %d", len(rmLogsCh), wantRemoved)
}
var (
countNew int
countRm int
prev int
)
// Drain events.
for i := 0; i < len(logsCh); i++ {
<-logsCh
for len(logsCh) > 0 {
x := <-logsCh
countNew += len(x)
for _, log := range x {
// We expect added logs to be in ascending order: 0:0, 0:1, 1:0 ...
have := 100*int(log.BlockNumber) + int(log.TxIndex)
if have < prev {
t.Fatalf("Expected new logs to arrive in ascending order (%d < %d)", have, prev)
}
prev = have
}
}
prev = 0
for len(rmLogsCh) > 0 {
x := <-rmLogsCh
countRm += len(x.Logs)
for _, log := range x.Logs {
// We expect removed logs to be in ascending order: 0:0, 0:1, 1:0 ...
have := 100*int(log.BlockNumber) + int(log.TxIndex)
if have < prev {
t.Fatalf("Expected removed logs to arrive in ascending order (%d < %d)", have, prev)
}
prev = have
}
}

if countNew != wantNew {
t.Fatalf("wrong number of log events: got %d, want %d", countNew, wantNew)
}
for i := 0; i < len(rmLogsCh); i++ {
<-rmLogsCh
if countRm != wantRemoved {
t.Fatalf("wrong number of removed log events: got %d, want %d", countRm, wantRemoved)
}
}

Expand Down