Skip to content

Commit

Permalink
Add ResetUpdate, so non-block proposers no longer fail to sync (#113)
Browse files Browse the repository at this point in the history
* Add ResetUpdate (needs test)

* Fix one more build
  • Loading branch information
ValarDragon authored Jun 20, 2024
1 parent 4b9bace commit 158bcbe
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 2 deletions.
1 change: 1 addition & 0 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func newReactor(
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
1 change: 1 addition & 0 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var _ mempl.Mempool = emptyMempool{}

func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) ResetUpdate() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
Expand Down
4 changes: 4 additions & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Mempool interface {
// Unlock unlocks the mempool.
Unlock()

// Cancel's any ongoing mempool Update. Safe to call while Mempool is locked.
// As a consequence, the Lock'd thread is expected to terminate soon after this is called.
ResetUpdate()

// Update informs the mempool that the given txs were committed and can be
// discarded.
//
Expand Down
5 changes: 5 additions & 0 deletions mempool/mocks/mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mempool/nop_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (*NopMempool) Lock() {}
// Unlock does nothing.
func (*NopMempool) Unlock() {}

func (*NopMempool) ResetUpdate() {}

// Update does nothing.
func (*NopMempool) Update(
int64,
Expand Down
16 changes: 14 additions & 2 deletions mempool/v0/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type CListMempool struct {
// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated in
// serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
stopRechecking atomic.Bool

// Map for quick access to txs to record sender in CheckTx.
// txsMap: txKey -> CElement
Expand Down Expand Up @@ -140,6 +141,11 @@ func (mem *CListMempool) Unlock() {
mem.updateMtx.Unlock()
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) ResetUpdate() {
mem.stopRechecking.Store(true)
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Size() int {
return mem.txs.Len()
Expand Down Expand Up @@ -659,6 +665,7 @@ func (mem *CListMempool) Update(
}

func (mem *CListMempool) recheckTxs() {
mem.stopRechecking.Store(false)
if mem.Size() == 0 {
panic("recheckTxs is called, but the mempool is empty")
}
Expand All @@ -669,13 +676,18 @@ func (mem *CListMempool) recheckTxs() {
// Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently.
for e := mem.txs.Front(); e != nil; e = e.Next() {
if mem.stopRechecking.Load() {
break
}

memTx := e.Value.(*mempoolTx)
mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{
Tx: memTx.tx,
Type: abci.CheckTxType_Recheck,
})
}

mem.stopRechecking.Store(false)
mem.proxyAppConn.FlushAsync()
}

Expand Down
2 changes: 2 additions & 0 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func (txmp *TxMempool) Lock() { txmp.mtx.Lock() }
// Unlock releases a write-lock on the mempool.
func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() }

func (txmp *TxMempool) ResetUpdate() {}

// Size returns the number of valid transactions in the mempool. It is
// thread-safe.
func (txmp *TxMempool) Size() int { return txmp.txs.Len() }
Expand Down
1 change: 1 addition & 0 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (blockExec *BlockExecutor) Commit(
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
) ([]byte, int64, error) {
blockExec.mempool.ResetUpdate()
blockExec.mempool.Lock()
unlockMempool := func() { blockExec.mempool.Unlock() }

Expand Down
4 changes: 4 additions & 0 deletions state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestApplyBlock(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -478,6 +480,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -604,6 +607,7 @@ func TestEmptyPrepareProposal(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down
3 changes: 3 additions & 0 deletions state/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestValidateBlockHeader(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -120,6 +121,7 @@ func TestValidateBlockCommit(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down Expand Up @@ -259,6 +261,7 @@ func TestValidateBlockEvidence(t *testing.T) {
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("ResetUpdate").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
Expand Down

0 comments on commit 158bcbe

Please sign in to comment.