Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mempool)!: stop accepting TXs in the mempool if we can't keep up with reCheckTX (backport #3314) #3338 #117

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[mempool]` Add to the `Mempool` interface a new method `PreUpdate()`. This method should be
called before acquiring the mempool lock, to signal that a new update is coming. Also add to
`ErrMempoolIsFull` a new field `RecheckFull`.
([\#3314](https://github.com/cometbft/cometbft/pull/3314))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[mempool]` Before updating the mempool, consider it as full if rechecking is still in progress.
This will stop accepting transactions in the mempool if the node can't keep up with re-CheckTx.
This improvement is implemented only in the v0 mempool.
([\#3314](https://github.com/cometbft/cometbft/pull/3314))
2 changes: 1 addition & 1 deletion blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newReactor(
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var _ mempl.Mempool = emptyMempool{}

func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) ResetUpdate() {}
func (emptyMempool) PreUpdate() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
Expand Down
6 changes: 3 additions & 3 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ type Mempool interface {
// Unlock unlocks the mempool.
Unlock()

// Cancel's any ongoing mempool Update. Safe to call while Mempool is locked.
// As a consequence, the Lock'd thread is expected to terminate soon after this is called.
ResetUpdate()
// PreUpdate signals that a new update is coming, before acquiring the mempool lock.
// If the mempool is still rechecking at this point, it should be considered full.
PreUpdate()

// Update informs the mempool that the given txs were committed and can be
// discarded.
Expand Down
10 changes: 5 additions & 5 deletions mempool/mocks/mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mempool/nop_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (*NopMempool) Lock() {}
// Unlock does nothing.
func (*NopMempool) Unlock() {}

func (*NopMempool) ResetUpdate() {}
func (*NopMempool) PreUpdate() {}

// Update does nothing.
func (*NopMempool) Update(
Expand Down
35 changes: 19 additions & 16 deletions mempool/v0/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type CListMempool struct {
// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated in
// serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
stopRechecking atomic.Bool
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished
recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided

// Map for quick access to txs to record sender in CheckTx.
// txsMap: txKey -> CElement
Expand Down Expand Up @@ -142,8 +143,12 @@ func (mem *CListMempool) Unlock() {
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) ResetUpdate() {
mem.stopRechecking.Store(true)
func (mem *CListMempool) PreUpdate() {
rechecking := mem.isRechecking.Load()
recheckFull := mem.recheckFull.Swap(rechecking)
if rechecking != recheckFull {
mem.logger.Debug("the state of recheckFull has flipped")
}
}

// Safe for concurrent use by multiple goroutines.
Expand Down Expand Up @@ -356,12 +361,11 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error {
}

func (mem *CListMempool) isFull(txSize int) error {
var (
memSize = mem.Size()
txsBytes = mem.SizeBytes()
)
memSize := mem.Size()
txsBytes := mem.SizeBytes()
recheckFull := mem.recheckFull.Load()

if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes || recheckFull {
return mempool.ErrMempoolIsFull{
NumTxs: memSize,
MaxTxs: mem.config.Size,
Expand Down Expand Up @@ -483,6 +487,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// matching the one we received from the ABCI application.
// Return without processing any tx.
mem.recheckCursor = nil
mem.isRechecking.Store(false)
mem.recheckFull.Store(false)
return
}

Expand All @@ -505,6 +511,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
}
if mem.recheckCursor == mem.recheckEnd {
mem.recheckCursor = nil
mem.isRechecking.Store(false)
mem.recheckFull.Store(false)
} else {
mem.recheckCursor = mem.recheckCursor.Next()
}
Expand Down Expand Up @@ -665,29 +673,24 @@ func (mem *CListMempool) Update(
}

func (mem *CListMempool) recheckTxs() {
mem.stopRechecking.Store(false)
if mem.Size() == 0 {
panic("recheckTxs is called, but the mempool is empty")
}

mem.recheckCursor = mem.txs.Front()
mem.recheckEnd = mem.txs.Back()
mem.isRechecking.Store(true)

// Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently.
for e := mem.txs.Front(); e != nil; e = e.Next() {
if mem.stopRechecking.Load() {
break
}

memTx := e.Value.(*mempoolTx)
mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{
Tx: memTx.tx,
Type: abci.CheckTxType_Recheck,
})
}

mem.stopRechecking.Store(false)
mem.proxyAppConn.FlushAsync()
}

Expand Down
2 changes: 2 additions & 0 deletions mempool/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestReactorConcurrency(t *testing.T) {
go func() {
defer wg.Done()

reactors[0].mempool.PreUpdate()
reactors[0].mempool.Lock()
defer reactors[0].mempool.Unlock()

Expand All @@ -113,6 +114,7 @@ func TestReactorConcurrency(t *testing.T) {
go func() {
defer wg.Done()

reactors[1].mempool.PreUpdate()
reactors[1].mempool.Lock()
defer reactors[1].mempool.Unlock()
err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil)
Expand Down
3 changes: 2 additions & 1 deletion mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (txmp *TxMempool) Lock() { txmp.mtx.Lock() }
// Unlock releases a write-lock on the mempool.
func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() }

func (txmp *TxMempool) ResetUpdate() {}
// PreUpdate does nothing on the v1 mempool
func (txmp *TxMempool) PreUpdate() {}

// Size returns the number of valid transactions in the mempool. It is
// thread-safe.
Expand Down
2 changes: 1 addition & 1 deletion state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (blockExec *BlockExecutor) Commit(
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
) ([]byte, int64, error) {
blockExec.mempool.ResetUpdate()
blockExec.mempool.PreUpdate()
blockExec.mempool.Lock()
unlockMempool := func() { blockExec.mempool.Unlock() }

Expand Down
8 changes: 4 additions & 4 deletions state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestApplyBlock(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestEmptyPrepareProposal(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
6 changes: 3 additions & 3 deletions state/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestValidateBlockHeader(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestValidateBlockCommit(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestValidateBlockEvidence(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
Loading