Skip to content

Commit

Permalink
fix(mempool)!: stop accepting TXs in the mempool if we can't keep up …
Browse files Browse the repository at this point in the history
…with reCheckTX (backport cometbft#3314) (cometbft#3338)

This PR is a combination of ideas from @ValarDragon, @hvanz and
@sergio-mena to alleviate nodes that, while not having their mempool
full "officially", they have too many TXs lingering in the mempool which
causes them to fall behind.
The mechanism works as follows:

* We mark when we start and end reChecking
* If, by the time a new block is decided we are still running the
previous `reCheckTx`, we declare the mempool as rechecktx-full
* Otherwise, we declare the mempool as not rechecktx-full

We have tested this and it fixes the failing nightlies that are blocking
us from cutting `v1.0.0-rc1`.

Some UTs need to be fixed, hence posting as draft for the moment.

---

#### PR checklist

- [ ] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
<hr>This is an automatic backport of pull request cometbft#3314 done by
[Mergify](https://mergify.com).

Co-authored-by: Sergio Mena <[email protected]>
Co-authored-by: Andy Nogueira <[email protected]>
Co-authored-by: Dev Ojha <[email protected]>
Co-authored-by: hvanz <[email protected]>
Co-authored-by: Hernán Vanzetto <[email protected]>
  • Loading branch information
6 people committed Jun 27, 2024
1 parent 6ecf14c commit df1483c
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[mempool]` Add to the `Mempool` interface a new method `PreUpdate()`. This method should be
called before acquiring the mempool lock, to signal that a new update is coming. Also add to
`ErrMempoolIsFull` a new field `RecheckFull`.
([\#3314](https://github.com/cometbft/cometbft/pull/3314))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[mempool]` Before updating the mempool, consider it as full if rechecking is still in progress.
This will stop accepting transactions in the mempool if the node can't keep up with re-CheckTx.
This improvement is implemented only in the v0 mempool.
([\#3314](https://github.com/cometbft/cometbft/pull/3314))
1 change: 1 addition & 0 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func newReactor(
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
1 change: 1 addition & 0 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var _ mempl.Mempool = emptyMempool{}

func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) PreUpdate() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
Expand Down
4 changes: 4 additions & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Mempool interface {
// Unlock unlocks the mempool.
Unlock()

// PreUpdate signals that a new update is coming, before acquiring the mempool lock.
// If the mempool is still rechecking at this point, it should be considered full.
PreUpdate()

// Update informs the mempool that the given txs were committed and can be
// discarded.
//
Expand Down
5 changes: 5 additions & 0 deletions mempool/mocks/mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mempool/nop_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (*NopMempool) Lock() {}
// Unlock does nothing.
func (*NopMempool) Unlock() {}

func (*NopMempool) PreUpdate() {}

// Update does nothing.
func (*NopMempool) Update(
int64,
Expand Down
25 changes: 20 additions & 5 deletions mempool/v0/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type CListMempool struct {
// serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished
recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided

// Map for quick access to txs to record sender in CheckTx.
// txsMap: txKey -> CElement
Expand Down Expand Up @@ -140,6 +142,15 @@ func (mem *CListMempool) Unlock() {
mem.updateMtx.Unlock()
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) PreUpdate() {
rechecking := mem.isRechecking.Load()
recheckFull := mem.recheckFull.Swap(rechecking)
if rechecking != recheckFull {
mem.logger.Debug("the state of recheckFull has flipped")
}
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Size() int {
return mem.txs.Len()
Expand Down Expand Up @@ -350,12 +361,11 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error {
}

func (mem *CListMempool) isFull(txSize int) error {
var (
memSize = mem.Size()
txsBytes = mem.SizeBytes()
)
memSize := mem.Size()
txsBytes := mem.SizeBytes()
recheckFull := mem.recheckFull.Load()

if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes || recheckFull {
return mempool.ErrMempoolIsFull{
NumTxs: memSize,
MaxTxs: mem.config.Size,
Expand Down Expand Up @@ -477,6 +487,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// matching the one we received from the ABCI application.
// Return without processing any tx.
mem.recheckCursor = nil
mem.isRechecking.Store(false)
mem.recheckFull.Store(false)
return
}

Expand All @@ -499,6 +511,8 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
}
if mem.recheckCursor == mem.recheckEnd {
mem.recheckCursor = nil
mem.isRechecking.Store(false)
mem.recheckFull.Store(false)
} else {
mem.recheckCursor = mem.recheckCursor.Next()
}
Expand Down Expand Up @@ -665,6 +679,7 @@ func (mem *CListMempool) recheckTxs() {

mem.recheckCursor = mem.txs.Front()
mem.recheckEnd = mem.txs.Back()
mem.isRechecking.Store(true)

// Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently.
Expand Down
2 changes: 2 additions & 0 deletions mempool/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestReactorConcurrency(t *testing.T) {
go func() {
defer wg.Done()

reactors[0].mempool.PreUpdate()
reactors[0].mempool.Lock()
defer reactors[0].mempool.Unlock()

Expand All @@ -113,6 +114,7 @@ func TestReactorConcurrency(t *testing.T) {
go func() {
defer wg.Done()

reactors[1].mempool.PreUpdate()
reactors[1].mempool.Lock()
defer reactors[1].mempool.Unlock()
err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil)
Expand Down
3 changes: 3 additions & 0 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (txmp *TxMempool) Lock() { txmp.mtx.Lock() }
// Unlock releases a write-lock on the mempool.
func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() }

// PreUpdate does nothing on the v1 mempool
func (txmp *TxMempool) PreUpdate() {}

// Size returns the number of valid transactions in the mempool. It is
// thread-safe.
func (txmp *TxMempool) Size() int { return txmp.txs.Len() }
Expand Down
1 change: 1 addition & 0 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (blockExec *BlockExecutor) Commit(
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
) ([]byte, int64, error) {
blockExec.mempool.PreUpdate()
blockExec.mempool.Lock()
unlockMempool := func() { blockExec.mempool.Unlock() }

Expand Down
4 changes: 4 additions & 0 deletions state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestApplyBlock(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -478,6 +480,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -604,6 +607,7 @@ func TestEmptyPrepareProposal(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
3 changes: 3 additions & 0 deletions state/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestValidateBlockHeader(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -120,6 +121,7 @@ func TestValidateBlockCommit(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -259,6 +261,7 @@ func TestValidateBlockEvidence(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("PreUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down

0 comments on commit df1483c

Please sign in to comment.