Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: place a cap on reorglogs #25711

Merged
merged 9 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2030,7 +2030,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
addedTxs []common.Hash

deletedLogs [][]*types.Log
rebirthLogs [][]*types.Log
)
// Reduce the longer chain to the same number as the shorter one
if oldBlock.NumberU64() > newBlock.NumberU64() {
Expand Down Expand Up @@ -2151,24 +2150,31 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
log.Crit("Failed to delete useless indexes", "err", err)
}

// Collect the logs
// Notify about removed logs first.
if len(deletedLogs) > 0 {
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
}

// Send events for 'reborn' logs, i.e. logs that were previously
// removed but now got restored in the new chain branch. The number of
// restored logs can be very high, so the events are sent in batches of
// size < 512.
var rebirthLogs []*types.Log
for i := len(newChain) - 1; i >= 1; i-- {
// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].Hash(), false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
if logs := bc.collectLogs(newChain[i].Hash(), false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer: I scrapped the append-then-use-mergeLogs, and just expanded the logs in-place immediately. I think it's done correctly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have now fixed up the rebirth-log test, so I have now verified that it does indeed work as before

}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
}
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
// networks where performance is not an issue either way.
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(mergeLogs(rebirthLogs, false))
bc.logsFeed.Send(rebirthLogs)
}

// Send notifications for blocks removed from the canon chain.
if len(oldChain) > 0 {
for i := len(oldChain) - 1; i >= 0; i-- {
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
Expand Down
68 changes: 48 additions & 20 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,9 +1159,14 @@ func TestLogRebirth(t *testing.T) {
blockchain.SubscribeRemovedLogsEvent(rmLogsCh)

// This chain contains a single log.
genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) {
genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1)
tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{
Nonce: gen.TxNonce(addr1),
GasPrice: gen.header.BaseFee,
Gas: 1000000,
Data: logCode,
})
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
Expand All @@ -1173,22 +1178,31 @@ func TestLogRebirth(t *testing.T) {
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 0)

// Generate long reorg chain containing another log. Inserting the
// chain removes one log and adds one.
_, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1)
// Generate long reorg chain containing more logs. Inserting the
// chain removes one log and adds four.
_, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) {
if i == 2 {
// The last (head) block is not part of the reorg-chain, we can ignore it
return
}
for ii := 0; ii < 5; ii++ {
tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{
Nonce: gen.TxNonce(addr1),
GasPrice: gen.header.BaseFee,
Gas: uint64(1000000 + 10*i + ii),
Data: logCode,
})
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
gen.OffsetTime(-9) // higher block difficulty
}
gen.OffsetTime(-9) // higher block difficulty
})
if _, err := blockchain.InsertChain(forkChain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 1)
checkLogEvents(t, newLogCh, rmLogsCh, 10, 1)

// This chain segment is rooted in the original chain, but doesn't contain any logs.
// When inserting it, the canonical chain switches away from forkChain and re-emits
Expand All @@ -1197,7 +1211,7 @@ func TestLogRebirth(t *testing.T) {
if _, err := blockchain.InsertChain(newBlocks); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 1)
checkLogEvents(t, newLogCh, rmLogsCh, 1, 10)
}

// This test is a variation of TestLogRebirth. It verifies that log events are emitted
Expand Down Expand Up @@ -1252,19 +1266,33 @@ func TestSideLogRebirth(t *testing.T) {

func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan RemovedLogsEvent, wantNew, wantRemoved int) {
t.Helper()

if len(logsCh) != wantNew {
t.Fatalf("wrong number of log events: got %d, want %d", len(logsCh), wantNew)
var (
countNew int
countRm int
prev int
)
// Drain events.
for len(logsCh) > 0 {
x := <-logsCh
countNew += len(x)
for _, log := range x {
// We expect logs to be in ascending order: 0:0, 0:1, 1:0 ...
have := 100*int(log.BlockNumber) + int(log.TxIndex)
if have < prev {
t.Fatalf("Expected new logs to arrive in ascending order")
}
prev = have
}
}
if len(rmLogsCh) != wantRemoved {
t.Fatalf("wrong number of removed log events: got %d, want %d", len(rmLogsCh), wantRemoved)
for len(rmLogsCh) > 0 {
x := <-rmLogsCh
countRm += len(x.Logs)
}
// Drain events.
for i := 0; i < len(logsCh); i++ {
<-logsCh
if countNew != wantNew {
t.Fatalf("wrong number of log events: got %d, want %d", countNew, wantNew)
}
for i := 0; i < len(rmLogsCh); i++ {
<-rmLogsCh
if countRm != wantRemoved {
t.Fatalf("wrong number of removed log events: got %d, want %d", countRm, wantRemoved)
}
}

Expand Down