diff --git a/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md new file mode 100644 index 00000000000..4c2528939f7 --- /dev/null +++ b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md @@ -0,0 +1,4 @@ +- `[mempool]` Add to the `Mempool` interface a new method `PreUpdate()`. This method should be + called before acquiring the mempool lock, to signal that a new update is coming. Also add to + `ErrMempoolIsFull` a new field `RecheckFull`. + ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md new file mode 100644 index 00000000000..57c9d6311dc --- /dev/null +++ b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md @@ -0,0 +1,4 @@ +- `[mempool]` Before updating the mempool, consider it as full if rechecking is still in progress. + This will stop accepting transactions in the mempool if the node can't keep up with re-CheckTx. + This improvement is implemented only in the v0 mempool. + ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index cdf5e7097cb..ef13e1c0f1a 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -86,6 +86,7 @@ func newReactor( mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 5ffb20ad822..2ee246e3a46 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -17,6 +17,7 @@ var _ mempl.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} +func (emptyMempool) PreUpdate() {} func (emptyMempool) Size() int { return 0 } func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { diff --git a/mempool/mempool.go b/mempool/mempool.go index 244b6d9aa6a..cb91b60dcc7 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -58,6 +58,10 @@ type Mempool interface { // Unlock unlocks the mempool. Unlock() + // PreUpdate signals that a new update is coming, before acquiring the mempool lock. + // If the mempool is still rechecking at this point, it should be considered full. + PreUpdate() + // Update informs the mempool that the given txs were committed and can be // discarded. // diff --git a/mempool/mocks/mempool.go b/mempool/mocks/mempool.go index 68a536caa43..e0e66ddd98a 100644 --- a/mempool/mocks/mempool.go +++ b/mempool/mocks/mempool.go @@ -59,6 +59,11 @@ func (_m *Mempool) Lock() { _m.Called() } +// PreUpdate provides a mock function with given fields: +func (_m *Mempool) PreUpdate() { + _m.Called() +} + // ReapMaxBytesMaxGas provides a mock function with given fields: maxBytes, maxGas func (_m *Mempool) ReapMaxBytesMaxGas(maxBytes int64, maxGas int64) types.Txs { ret := _m.Called(maxBytes, maxGas) diff --git a/mempool/nop_mempool.go b/mempool/nop_mempool.go index 0e5b451e8dc..79cd3cbab76 100644 --- a/mempool/nop_mempool.go +++ b/mempool/nop_mempool.go @@ -40,6 +40,8 @@ func (*NopMempool) Lock() {} // Unlock does nothing. func (*NopMempool) Unlock() {} +func (*NopMempool) PreUpdate() {} + // Update does nothing. func (*NopMempool) Update( int64, diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index 3e68553e176..cf555c4c540 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -48,6 +48,8 @@ type CListMempool struct { // serial (ie. by abci responses which are called in serial). recheckCursor *clist.CElement // next expected response recheckEnd *clist.CElement // re-checking stops here + isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished + recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided // Map for quick access to txs to record sender in CheckTx. // txsMap: txKey -> CElement @@ -140,6 +142,15 @@ func (mem *CListMempool) Unlock() { mem.updateMtx.Unlock() } +// Safe for concurrent use by multiple goroutines. +func (mem *CListMempool) PreUpdate() { + rechecking := mem.isRechecking.Load() + recheckFull := mem.recheckFull.Swap(rechecking) + if rechecking != recheckFull { + mem.logger.Debug("the state of recheckFull has flipped") + } +} + // Safe for concurrent use by multiple goroutines. func (mem *CListMempool) Size() int { return mem.txs.Len() @@ -350,12 +361,11 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { } func (mem *CListMempool) isFull(txSize int) error { - var ( - memSize = mem.Size() - txsBytes = mem.SizeBytes() - ) + memSize := mem.Size() + txsBytes := mem.SizeBytes() + recheckFull := mem.recheckFull.Load() - if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { + if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes || recheckFull { return mempool.ErrMempoolIsFull{ NumTxs: memSize, MaxTxs: mem.config.Size, @@ -477,6 +487,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { // matching the one we received from the ABCI application. // Return without processing any tx. mem.recheckCursor = nil + mem.isRechecking.Store(false) + mem.recheckFull.Store(false) return } @@ -499,6 +511,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { } if mem.recheckCursor == mem.recheckEnd { mem.recheckCursor = nil + mem.isRechecking.Store(false) + mem.recheckFull.Store(false) } else { mem.recheckCursor = mem.recheckCursor.Next() } @@ -665,6 +679,7 @@ func (mem *CListMempool) recheckTxs() { mem.recheckCursor = mem.txs.Front() mem.recheckEnd = mem.txs.Back() + mem.isRechecking.Store(true) // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go index f4e5f46bd9e..b54c2759a0d 100644 --- a/mempool/v0/reactor_test.go +++ b/mempool/v0/reactor_test.go @@ -96,6 +96,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[0].mempool.PreUpdate() reactors[0].mempool.Lock() defer reactors[0].mempool.Unlock() @@ -113,6 +114,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[1].mempool.PreUpdate() reactors[1].mempool.Lock() defer reactors[1].mempool.Unlock() err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 301109c36fc..3650816bac1 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -117,6 +117,9 @@ func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } // Unlock releases a write-lock on the mempool. func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } +// PreUpdate does nothing on the v1 mempool +func (txmp *TxMempool) PreUpdate() {} + // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { return txmp.txs.Len() } diff --git a/state/execution.go b/state/execution.go index 45b945b26ed..a0678ca2aad 100644 --- a/state/execution.go +++ b/state/execution.go @@ -296,6 +296,7 @@ func (blockExec *BlockExecutor) Commit( block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, ) ([]byte, int64, error) { + blockExec.mempool.PreUpdate() blockExec.mempool.Lock() unlockMempool := func() { blockExec.mempool.Unlock() } diff --git a/state/execution_test.go b/state/execution_test.go index 5ae3c3c8723..d87feed15c9 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -50,6 +50,7 @@ func TestApplyBlock(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -222,6 +223,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -478,6 +480,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -604,6 +607,7 @@ func TestEmptyPrepareProposal(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/state/validation_test.go b/state/validation_test.go index 96be61cd08b..3b722446527 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -35,6 +35,7 @@ func TestValidateBlockHeader(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -120,6 +121,7 @@ func TestValidateBlockCommit(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -259,6 +261,7 @@ func TestValidateBlockEvidence(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything,