From 6ecf14cbc8fd924803812f30a4fc07d1fb1fa050 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Fri, 28 Jun 2024 00:27:28 +0300 Subject: [PATCH 1/2] Revert "Add ResetUpdate, so non-block proposers no longer fail to sync (#113)" This reverts commit 158bcbebca63f469f61f7148afd77b0f0d19c240. --- blocksync/reactor_test.go | 1 - consensus/replay_stubs.go | 1 - mempool/mempool.go | 4 ---- mempool/mocks/mempool.go | 5 ----- mempool/nop_mempool.go | 2 -- mempool/v0/clist_mempool.go | 16 ++-------------- mempool/v1/mempool.go | 2 -- state/execution.go | 1 - state/execution_test.go | 4 ---- state/validation_test.go | 3 --- 10 files changed, 2 insertions(+), 37 deletions(-) diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index 72a5f7d8f7b..cdf5e7097cb 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -86,7 +86,6 @@ func newReactor( mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 26646bfa374..5ffb20ad822 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -17,7 +17,6 @@ var _ mempl.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} -func (emptyMempool) ResetUpdate() {} func (emptyMempool) Size() int { return 0 } func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { diff --git a/mempool/mempool.go b/mempool/mempool.go index 5956b83e0a9..244b6d9aa6a 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -58,10 +58,6 @@ type Mempool interface { // Unlock unlocks the mempool. Unlock() - // Cancel's any ongoing mempool Update. Safe to call while Mempool is locked. - // As a consequence, the Lock'd thread is expected to terminate soon after this is called. - ResetUpdate() - // Update informs the mempool that the given txs were committed and can be // discarded. // diff --git a/mempool/mocks/mempool.go b/mempool/mocks/mempool.go index 9cd570cd3a9..68a536caa43 100644 --- a/mempool/mocks/mempool.go +++ b/mempool/mocks/mempool.go @@ -91,11 +91,6 @@ func (_m *Mempool) ReapMaxTxs(max int) types.Txs { return r0 } -// ResetUpdate provides a mock function with given fields: -func (_m *Mempool) ResetUpdate() { - _m.Called() -} - // RemoveTxByKey provides a mock function with given fields: txKey func (_m *Mempool) RemoveTxByKey(txKey types.TxKey) error { ret := _m.Called(txKey) diff --git a/mempool/nop_mempool.go b/mempool/nop_mempool.go index c16e4e7edad..0e5b451e8dc 100644 --- a/mempool/nop_mempool.go +++ b/mempool/nop_mempool.go @@ -40,8 +40,6 @@ func (*NopMempool) Lock() {} // Unlock does nothing. func (*NopMempool) Unlock() {} -func (*NopMempool) ResetUpdate() {} - // Update does nothing. func (*NopMempool) Update( int64, diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index 0683069aaab..3e68553e176 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -46,9 +46,8 @@ type CListMempool struct { // Track whether we're rechecking txs. // These are not protected by a mutex and are expected to be mutated in // serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - stopRechecking atomic.Bool + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here // Map for quick access to txs to record sender in CheckTx. // txsMap: txKey -> CElement @@ -141,11 +140,6 @@ func (mem *CListMempool) Unlock() { mem.updateMtx.Unlock() } -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) ResetUpdate() { - mem.stopRechecking.Store(true) -} - // Safe for concurrent use by multiple goroutines. func (mem *CListMempool) Size() int { return mem.txs.Len() @@ -665,7 +659,6 @@ func (mem *CListMempool) Update( } func (mem *CListMempool) recheckTxs() { - mem.stopRechecking.Store(false) if mem.Size() == 0 { panic("recheckTxs is called, but the mempool is empty") } @@ -676,10 +669,6 @@ func (mem *CListMempool) recheckTxs() { // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. for e := mem.txs.Front(); e != nil; e = e.Next() { - if mem.stopRechecking.Load() { - break - } - memTx := e.Value.(*mempoolTx) mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ Tx: memTx.tx, @@ -687,7 +676,6 @@ func (mem *CListMempool) recheckTxs() { }) } - mem.stopRechecking.Store(false) mem.proxyAppConn.FlushAsync() } diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index b14ac87b338..301109c36fc 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -117,8 +117,6 @@ func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } // Unlock releases a write-lock on the mempool. func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } -func (txmp *TxMempool) ResetUpdate() {} - // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { return txmp.txs.Len() } diff --git a/state/execution.go b/state/execution.go index 23e76fb9cd6..45b945b26ed 100644 --- a/state/execution.go +++ b/state/execution.go @@ -296,7 +296,6 @@ func (blockExec *BlockExecutor) Commit( block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, ) ([]byte, int64, error) { - blockExec.mempool.ResetUpdate() blockExec.mempool.Lock() unlockMempool := func() { blockExec.mempool.Unlock() } diff --git a/state/execution_test.go b/state/execution_test.go index 175fbb595eb..5ae3c3c8723 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -50,7 +50,6 @@ func TestApplyBlock(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -223,7 +222,6 @@ func TestBeginBlockByzantineValidators(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -480,7 +478,6 @@ func TestEndBlockValidatorUpdates(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -607,7 +604,6 @@ func TestEmptyPrepareProposal(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/state/validation_test.go b/state/validation_test.go index 247c79c8dbe..96be61cd08b 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -35,7 +35,6 @@ func TestValidateBlockHeader(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -121,7 +120,6 @@ func TestValidateBlockCommit(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -261,7 +259,6 @@ func TestValidateBlockEvidence(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() - mp.On("ResetUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, From df1483ca0e86e6d97fee11c1d281d776f26d870c Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 25 Jun 2024 00:02:49 +0200 Subject: [PATCH 2/2] fix(mempool)!: stop accepting TXs in the mempool if we can't keep up with reCheckTX (backport #3314) (#3338) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR is a combination of ideas from @ValarDragon, @hvanz and @sergio-mena to alleviate nodes that, while not having their mempool full "officially", they have too many TXs lingering in the mempool which causes them to fall behind. The mechanism works as follows: * We mark when we start and end reChecking * If, by the time a new block is decided we are still running the previous `reCheckTx`, we declare the mempool as rechecktx-full * Otherwise, we declare the mempool as not rechecktx-full We have tested this and it fixes the failing nightlies that are blocking us from cutting `v1.0.0-rc1`. Some UTs need to be fixed, hence posting as draft for the moment. --- #### PR checklist - [ ] Tests written/updated - [x] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments - [x] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
This is an automatic backport of pull request #3314 done by [Mergify](https://mergify.com). Co-authored-by: Sergio Mena Co-authored-by: Andy Nogueira Co-authored-by: Dev Ojha Co-authored-by: hvanz Co-authored-by: HernĂ¡n Vanzetto <15466498+hvanz@users.noreply.github.com> --- .../3314-mempool-preupdate.md | 4 +++ ...ol-update-consider-full-when-rechecking.md | 4 +++ blocksync/reactor_test.go | 1 + consensus/replay_stubs.go | 1 + mempool/mempool.go | 4 +++ mempool/mocks/mempool.go | 5 ++++ mempool/nop_mempool.go | 2 ++ mempool/v0/clist_mempool.go | 25 +++++++++++++++---- mempool/v0/reactor_test.go | 2 ++ mempool/v1/mempool.go | 3 +++ state/execution.go | 1 + state/execution_test.go | 4 +++ state/validation_test.go | 3 +++ 13 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 .changelog/unreleased/breaking-changes/3314-mempool-preupdate.md create mode 100644 .changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md diff --git a/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md new file mode 100644 index 00000000000..4c2528939f7 --- /dev/null +++ b/.changelog/unreleased/breaking-changes/3314-mempool-preupdate.md @@ -0,0 +1,4 @@ +- `[mempool]` Add to the `Mempool` interface a new method `PreUpdate()`. This method should be + called before acquiring the mempool lock, to signal that a new update is coming. Also add to + `ErrMempoolIsFull` a new field `RecheckFull`. + ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md new file mode 100644 index 00000000000..57c9d6311dc --- /dev/null +++ b/.changelog/unreleased/improvements/3314-mempool-update-consider-full-when-rechecking.md @@ -0,0 +1,4 @@ +- `[mempool]` Before updating the mempool, consider it as full if rechecking is still in progress. + This will stop accepting transactions in the mempool if the node can't keep up with re-CheckTx. + This improvement is implemented only in the v0 mempool. + ([\#3314](https://github.com/cometbft/cometbft/pull/3314)) diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index cdf5e7097cb..ef13e1c0f1a 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -86,6 +86,7 @@ func newReactor( mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 5ffb20ad822..2ee246e3a46 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -17,6 +17,7 @@ var _ mempl.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} +func (emptyMempool) PreUpdate() {} func (emptyMempool) Size() int { return 0 } func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { diff --git a/mempool/mempool.go b/mempool/mempool.go index 244b6d9aa6a..cb91b60dcc7 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -58,6 +58,10 @@ type Mempool interface { // Unlock unlocks the mempool. Unlock() + // PreUpdate signals that a new update is coming, before acquiring the mempool lock. + // If the mempool is still rechecking at this point, it should be considered full. + PreUpdate() + // Update informs the mempool that the given txs were committed and can be // discarded. // diff --git a/mempool/mocks/mempool.go b/mempool/mocks/mempool.go index 68a536caa43..e0e66ddd98a 100644 --- a/mempool/mocks/mempool.go +++ b/mempool/mocks/mempool.go @@ -59,6 +59,11 @@ func (_m *Mempool) Lock() { _m.Called() } +// PreUpdate provides a mock function with given fields: +func (_m *Mempool) PreUpdate() { + _m.Called() +} + // ReapMaxBytesMaxGas provides a mock function with given fields: maxBytes, maxGas func (_m *Mempool) ReapMaxBytesMaxGas(maxBytes int64, maxGas int64) types.Txs { ret := _m.Called(maxBytes, maxGas) diff --git a/mempool/nop_mempool.go b/mempool/nop_mempool.go index 0e5b451e8dc..79cd3cbab76 100644 --- a/mempool/nop_mempool.go +++ b/mempool/nop_mempool.go @@ -40,6 +40,8 @@ func (*NopMempool) Lock() {} // Unlock does nothing. func (*NopMempool) Unlock() {} +func (*NopMempool) PreUpdate() {} + // Update does nothing. func (*NopMempool) Update( int64, diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index 3e68553e176..cf555c4c540 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -48,6 +48,8 @@ type CListMempool struct { // serial (ie. by abci responses which are called in serial). recheckCursor *clist.CElement // next expected response recheckEnd *clist.CElement // re-checking stops here + isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished + recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided // Map for quick access to txs to record sender in CheckTx. // txsMap: txKey -> CElement @@ -140,6 +142,15 @@ func (mem *CListMempool) Unlock() { mem.updateMtx.Unlock() } +// Safe for concurrent use by multiple goroutines. +func (mem *CListMempool) PreUpdate() { + rechecking := mem.isRechecking.Load() + recheckFull := mem.recheckFull.Swap(rechecking) + if rechecking != recheckFull { + mem.logger.Debug("the state of recheckFull has flipped") + } +} + // Safe for concurrent use by multiple goroutines. func (mem *CListMempool) Size() int { return mem.txs.Len() @@ -350,12 +361,11 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { } func (mem *CListMempool) isFull(txSize int) error { - var ( - memSize = mem.Size() - txsBytes = mem.SizeBytes() - ) + memSize := mem.Size() + txsBytes := mem.SizeBytes() + recheckFull := mem.recheckFull.Load() - if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { + if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes || recheckFull { return mempool.ErrMempoolIsFull{ NumTxs: memSize, MaxTxs: mem.config.Size, @@ -477,6 +487,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { // matching the one we received from the ABCI application. // Return without processing any tx. mem.recheckCursor = nil + mem.isRechecking.Store(false) + mem.recheckFull.Store(false) return } @@ -499,6 +511,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { } if mem.recheckCursor == mem.recheckEnd { mem.recheckCursor = nil + mem.isRechecking.Store(false) + mem.recheckFull.Store(false) } else { mem.recheckCursor = mem.recheckCursor.Next() } @@ -665,6 +679,7 @@ func (mem *CListMempool) recheckTxs() { mem.recheckCursor = mem.txs.Front() mem.recheckEnd = mem.txs.Back() + mem.isRechecking.Store(true) // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go index f4e5f46bd9e..b54c2759a0d 100644 --- a/mempool/v0/reactor_test.go +++ b/mempool/v0/reactor_test.go @@ -96,6 +96,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[0].mempool.PreUpdate() reactors[0].mempool.Lock() defer reactors[0].mempool.Unlock() @@ -113,6 +114,7 @@ func TestReactorConcurrency(t *testing.T) { go func() { defer wg.Done() + reactors[1].mempool.PreUpdate() reactors[1].mempool.Lock() defer reactors[1].mempool.Unlock() err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 301109c36fc..3650816bac1 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -117,6 +117,9 @@ func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } // Unlock releases a write-lock on the mempool. func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } +// PreUpdate does nothing on the v1 mempool +func (txmp *TxMempool) PreUpdate() {} + // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { return txmp.txs.Len() } diff --git a/state/execution.go b/state/execution.go index 45b945b26ed..a0678ca2aad 100644 --- a/state/execution.go +++ b/state/execution.go @@ -296,6 +296,7 @@ func (blockExec *BlockExecutor) Commit( block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, ) ([]byte, int64, error) { + blockExec.mempool.PreUpdate() blockExec.mempool.Lock() unlockMempool := func() { blockExec.mempool.Unlock() } diff --git a/state/execution_test.go b/state/execution_test.go index 5ae3c3c8723..d87feed15c9 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -50,6 +50,7 @@ func TestApplyBlock(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -222,6 +223,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -478,6 +480,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -604,6 +607,7 @@ func TestEmptyPrepareProposal(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, diff --git a/state/validation_test.go b/state/validation_test.go index 96be61cd08b..3b722446527 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -35,6 +35,7 @@ func TestValidateBlockHeader(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -120,6 +121,7 @@ func TestValidateBlockCommit(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything, @@ -259,6 +261,7 @@ func TestValidateBlockEvidence(t *testing.T) { mp := &mpmocks.Mempool{} mp.On("Lock").Return() mp.On("Unlock").Return() + mp.On("PreUpdate").Return() mp.On("FlushAppConn", mock.Anything).Return(nil) mp.On("Update", mock.Anything,