Skip to content

Commit

Permalink
core: handle deleted logs in smaller batches
Browse files Browse the repository at this point in the history
  • Loading branch information
holiman committed Sep 9, 2022
1 parent 2ec393a commit 4169105
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 48 deletions.
50 changes: 18 additions & 32 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,21 +2000,6 @@ func (bc *BlockChain) collectLogs(hash common.Hash, removed bool) []*types.Log {
return logs
}

// mergeLogs returns a merged log slice with specified sort order.
func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log {
var ret []*types.Log
if reverse {
for i := len(logs) - 1; i >= 0; i-- {
ret = append(ret, logs[i]...)
}
} else {
for i := 0; i < len(logs); i++ {
ret = append(ret, logs[i]...)
}
}
return ret
}

// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
Expand All @@ -2028,8 +2013,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {

deletedTxs []common.Hash
addedTxs []common.Hash

deletedLogs [][]*types.Log
)
// Reduce the longer chain to the same number as the shorter one
if oldBlock.NumberU64() > newBlock.NumberU64() {
Expand All @@ -2039,12 +2022,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
if len(logs) > 0 {
deletedLogs = append(deletedLogs, logs)
}
}
} else {
// New chain is longer, stash all blocks away for subsequent insertion
Expand All @@ -2071,12 +2048,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
if len(logs) > 0 {
deletedLogs = append(deletedLogs, logs)
}
newChain = append(newChain, newBlock)

// Step back with both chains
Expand Down Expand Up @@ -2150,11 +2121,26 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
log.Crit("Failed to delete useless indexes", "err", err)
}

// Notify about removed logs first.
// Send out events for deleted logs. The number of
// restored logs can be very high, so the events are sent in batches of
// size < 512.
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
oldBlock := oldChain[i]
// Collect deleted logs for notification
if logs := bc.collectLogs(oldBlock.Hash(), true); len(logs) > 0 {
//deletedLogs = append(deletedLogs, logs)
deletedLogs = append(deletedLogs, logs...)
}
if len(deletedLogs) > 512 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
deletedLogs = nil
}
}
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}

// Send events for 'reborn' logs, i.e. logs that were previously
// removed but now got restored in the new chain branch. The number of
// restored logs can be very high, so the events are sent in batches of
Expand Down
44 changes: 28 additions & 16 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,23 +1160,25 @@ func TestLogRebirth(t *testing.T) {

// This chain contains a single log.
genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{
Nonce: gen.TxNonce(addr1),
GasPrice: gen.header.BaseFee,
Gas: 1000000,
Data: logCode,
})
if err != nil {
t.Fatalf("failed to create tx: %v", err)
if i < 2 {
for ii := 0; ii < 5; ii++ {
tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{
Nonce: gen.TxNonce(addr1),
GasPrice: gen.header.BaseFee,
Gas: uint64(1000001),
Data: logCode,
})
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
}
gen.AddTx(tx)
}
})
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 0)
checkLogEvents(t, newLogCh, rmLogsCh, 10, 0)

// Generate long reorg chain containing more logs. Inserting the
// chain removes one log and adds four.
Expand All @@ -1189,7 +1191,7 @@ func TestLogRebirth(t *testing.T) {
tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{
Nonce: gen.TxNonce(addr1),
GasPrice: gen.header.BaseFee,
Gas: uint64(1000000 + 10*i + ii),
Gas: uint64(1000000),
Data: logCode,
})
if err != nil {
Expand All @@ -1202,7 +1204,7 @@ func TestLogRebirth(t *testing.T) {
if _, err := blockchain.InsertChain(forkChain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 10, 1)
checkLogEvents(t, newLogCh, rmLogsCh, 10, 10)

// This chain segment is rooted in the original chain, but doesn't contain any logs.
// When inserting it, the canonical chain switches away from forkChain and re-emits
Expand All @@ -1211,7 +1213,7 @@ func TestLogRebirth(t *testing.T) {
if _, err := blockchain.InsertChain(newBlocks); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 10)
checkLogEvents(t, newLogCh, rmLogsCh, 10, 10)
}

// This test is a variation of TestLogRebirth. It verifies that log events are emitted
Expand Down Expand Up @@ -1276,18 +1278,28 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan Re
x := <-logsCh
countNew += len(x)
for _, log := range x {
// We expect logs to be in ascending order: 0:0, 0:1, 1:0 ...
// We expect added logs to be in ascending order: 0:0, 0:1, 1:0 ...
have := 100*int(log.BlockNumber) + int(log.TxIndex)
if have < prev {
t.Fatalf("Expected new logs to arrive in ascending order")
t.Fatalf("Expected new logs to arrive in ascending order (%d < %d)", have, prev)
}
prev = have
}
}
prev = 0
for len(rmLogsCh) > 0 {
x := <-rmLogsCh
countRm += len(x.Logs)
for _, log := range x.Logs {
// We expect removed logs to be in ascending order: 0:0, 0:1, 1:0 ...
have := 100*int(log.BlockNumber) + int(log.TxIndex)
if have < prev {
t.Fatalf("Expected removed logs to arrive in ascending order (%d < %d)", have, prev)
}
prev = have
}
}

if countNew != wantNew {
t.Fatalf("wrong number of log events: got %d, want %d", countNew, wantNew)
}
Expand Down

0 comments on commit 4169105

Please sign in to comment.