diff --git a/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md new file mode 100644 index 00000000000..4c2528939f7 --- /dev/null +++ b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md @@ -0,0 +1,4 @@ +- `[mempool]` Add to the `Mempool` interface a new method `PreUpdate()`. This method should be + called before acquiring the mempool lock, to signal that a new update is coming. Also add to + `ErrMempoolIsFull` a new field `RecheckFull`. + ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md new file mode 100644 index 00000000000..57c9d6311dc --- /dev/null +++ b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md @@ -0,0 +1,4 @@ +- `[mempool]` Before updating the mempool, consider it as full if rechecking is still in progress. + This will stop accepting transactions in the mempool if the node can't keep up with re-CheckTx. + This improvement is implemented only in the v0 mempool. + ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index 72a5f7d8f7b..ef13e1c0f1a 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -86,7 +86,7 @@ func newReactor( mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 26646bfa374..2ee246e3a46 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -17,7 +17,7 @@ var _ mempl.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} -func (emptyMempool) ResetUpdate() {} +func (emptyMempool) PreUpdate() {} func (emptyMempool) Size() int { return 0 } func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { diff --git a/mempool/mempool.go b/mempool/mempool.go index 5956b83e0a9..cb91b60dcc7 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -58,9 +58,9 @@ type Mempool interface { // Unlock unlocks the mempool. Unlock() - // Cancel's any ongoing mempool Update. Safe to call while Mempool is locked. - // As a consequence, the Lock'd thread is expected to terminate soon after this is called. - ResetUpdate() + // PreUpdate signals that a new update is coming, before acquiring the mempool lock. + // If the mempool is still rechecking at this point, it should be considered full. + PreUpdate() // Update informs the mempool that the given txs were committed and can be // discarded. diff --git a/mempool/mocks/mempool.go b/mempool/mocks/mempool.go index 9cd570cd3a9..e0e66ddd98a 100644 --- a/mempool/mocks/mempool.go +++ b/mempool/mocks/mempool.go @@ -59,6 +59,11 @@ func (_m *Mempool) Lock() { _m.Called() } +// PreUpdate provides a mock function with given fields: +func (_m *Mempool) PreUpdate() { + _m.Called() +} + // ReapMaxBytesMaxGas provides a mock function with given fields: maxBytes, maxGas func (_m *Mempool) ReapMaxBytesMaxGas(maxBytes int64, maxGas int64) types.Txs { ret := _m.Called(maxBytes, maxGas) @@ -91,11 +96,6 @@ func (_m *Mempool) ReapMaxTxs(max int) types.Txs { return r0 } -// ResetUpdate provides a mock function with given fields: -func (_m *Mempool) ResetUpdate() { - _m.Called() -} - // RemoveTxByKey provides a mock function with given fields: txKey func (_m *Mempool) RemoveTxByKey(txKey types.TxKey) error { ret := _m.Called(txKey) diff --git a/mempool/nop_mempool.go b/mempool/nop_mempool.go index c16e4e7edad..79cd3cbab76 100644 --- a/mempool/nop_mempool.go +++ b/mempool/nop_mempool.go @@ -40,7 +40,7 @@ func (*NopMempool) Lock() {} // Unlock does nothing. func (*NopMempool) Unlock() {} -func (*NopMempool) ResetUpdate() {} +func (*NopMempool) PreUpdate() {} // Update does nothing. func (*NopMempool) Update( diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index 0683069aaab..cf555c4c540 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -46,9 +46,10 @@ type CListMempool struct { // Track whether we're rechecking txs. // These are not protected by a mutex and are expected to be mutated in // serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - stopRechecking atomic.Bool + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished + recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided // Map for quick access to txs to record sender in CheckTx. // txsMap: txKey -> CElement @@ -142,8 +143,12 @@ func (mem *CListMempool) Unlock() { } // Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) ResetUpdate() { - mem.stopRechecking.Store(true) +func (mem *CListMempool) PreUpdate() { + rechecking := mem.isRechecking.Load() + recheckFull := mem.recheckFull.Swap(rechecking) + if rechecking != recheckFull { + mem.logger.Debug("the state of recheckFull has flipped") + } } // Safe for concurrent use by multiple goroutines. @@ -356,12 +361,11 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { } func (mem *CListMempool) isFull(txSize int) error { - var ( - memSize = mem.Size() - txsBytes = mem.SizeBytes() - ) + memSize := mem.Size() + txsBytes := mem.SizeBytes() + recheckFull := mem.recheckFull.Load() - if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { + if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes || recheckFull { return mempool.ErrMempoolIsFull{ NumTxs: memSize, MaxTxs: mem.config.Size, @@ -483,6 +487,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { // matching the one we received from the ABCI application. // Return without processing any tx. mem.recheckCursor = nil + mem.isRechecking.Store(false) + mem.recheckFull.Store(false) return } @@ -505,6 +511,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { } if mem.recheckCursor == mem.recheckEnd { mem.recheckCursor = nil + mem.isRechecking.Store(false) + mem.recheckFull.Store(false) } else { mem.recheckCursor = mem.recheckCursor.Next() } @@ -665,21 +673,17 @@ func (mem *CListMempool) Update( } func (mem *CListMempool) recheckTxs() { - mem.stopRechecking.Store(false) if mem.Size() == 0 { panic("recheckTxs is called, but the mempool is empty") } mem.recheckCursor = mem.txs.Front() mem.recheckEnd = mem.txs.Back() + mem.isRechecking.Store(true) // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. for e := mem.txs.Front(); e != nil; e = e.Next() { - if mem.stopRechecking.Load() { - break - } - memTx := e.Value.(*mempoolTx) mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ Tx: memTx.tx, @@ -687,7 +691,6 @@ func (mem *CListMempool) recheckTxs() { }) } - mem.stopRechecking.Store(false) mem.proxyAppConn.FlushAsync() } diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go index f4e5f46bd9e..b54c2759a0d 100644 --- a/mempool/v0/reactor_test.go +++ b/mempool/v0/reactor_test.go @@ -96,6 +96,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[0].mempool.PreUpdate() reactors[0].mempool.Lock() defer reactors[0].mempool.Unlock() @@ -113,6 +114,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[1].mempool.PreUpdate() reactors[1].mempool.Lock() defer reactors[1].mempool.Unlock() err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index b14ac87b338..3650816bac1 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -117,7 +117,8 @@ func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } // Unlock releases a write-lock on the mempool. func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } -func (txmp *TxMempool) ResetUpdate() {} +// PreUpdate does nothing on the v1 mempool +func (txmp *TxMempool) PreUpdate() {} // Size returns the number of valid transactions in the mempool. It is // thread-safe. diff --git a/state/execution.go b/state/execution.go index 23e76fb9cd6..a0678ca2aad 100644 --- a/state/execution.go +++ b/state/execution.go @@ -296,7 +296,7 @@ func (blockExec *BlockExecutor) Commit( block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, ) ([]byte, int64, error) { - blockExec.mempool.ResetUpdate() + blockExec.mempool.PreUpdate() blockExec.mempool.Lock() unlockMempool := func() { blockExec.mempool.Unlock() } diff --git a/state/execution_test.go b/state/execution_test.go index 175fbb595eb..d87feed15c9 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -50,7 +50,7 @@ func TestApplyBlock(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -223,7 +223,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -480,7 +480,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -607,7 +607,7 @@ func TestEmptyPrepareProposal(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/state/validation_test.go b/state/validation_test.go index 247c79c8dbe..3b722446527 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -35,7 +35,7 @@ func TestValidateBlockHeader(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -121,7 +121,7 @@ func TestValidateBlockCommit(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -261,7 +261,7 @@ func TestValidateBlockEvidence(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything,