Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove mempool.postCheck #158

Merged
merged 10 commits into from
Jan 6, 2021
2 changes: 1 addition & 1 deletion mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestCacheAfterUpdate(t *testing.T) {
tx := types.Tx{byte(v)}
updateTxs = append(updateTxs, tx)
}
mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil)

for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
Expand Down
84 changes: 51 additions & 33 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type CListMempool struct {
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes

reserved int // the number of checking tx and it should be considered when checking mempool full
reservedBytes int64 // size of checking tx and it should be considered when checking mempool full
reservedMtx sync.Mutex

// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
Expand All @@ -43,7 +47,6 @@ type CListMempool struct {
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
updateMtx sync.RWMutex
preCheck PreCheckFunc
postCheck PostCheckFunc

wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
Expand Down Expand Up @@ -118,12 +121,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.preCheck = f }
}

// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran after CheckTx.
func WithPostCheck(f PostCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.postCheck = f }
}

// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics }
Expand Down Expand Up @@ -278,9 +275,19 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx

// NOTE: proxyAppConn may error if tx buffer is full
if err := mem.proxyAppConn.Error(); err != nil {
// remove from cache
mem.cache.Remove(tx)
return err
}

// reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()`
if err := mem.reserve(int64(txSize)); err != nil {
// remove from cache
mem.cache.Remove(tx)
return err
}

// CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a CONTRACT comment.

reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))

Expand Down Expand Up @@ -380,6 +387,35 @@ func (mem *CListMempool) isFull(txSize int) error {
return nil
}

func (mem *CListMempool) reserve(txSize int64) error {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)

if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize + mem.reserved, mem.config.Size,
txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes,
}
}

mem.reserved++
mem.reservedBytes += txSize
return nil
}

func (mem *CListMempool) releaseReserve(txSize int64) {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

mem.reserved--
mem.reservedBytes -= txSize
}

// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
Expand All @@ -392,20 +428,7 @@ func (mem *CListMempool) resCbFirstTime(
) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
// Check mempool isn't full again to reduce the chance of exceeding the
// limits.
if err := mem.isFull(len(tx)); err != nil {
// remove from cache (mempool might have a space later)
mem.cache.Remove(tx)
mem.logger.Error(err.Error())
return
}

if r.CheckTx.Code == abci.CodeTypeOK {
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
Expand All @@ -423,11 +446,14 @@ func (mem *CListMempool) resCbFirstTime(
} else {
// ignore bad transaction
mem.logger.Info("Rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr)
"tx", txID(tx), "peerID", peerP2PID, "res", r)
mem.metrics.FailedTxs.Add(1)
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}

// release `reserve` regardless it's OK or not (it might be good later)
mem.releaseReserve(int64(len(tx)))
default:
// ignore other messages
}
Expand All @@ -448,15 +474,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
memTx.tx,
tx))
}
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
if r.CheckTx.Code == abci.CodeTypeOK {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, true)
}
Expand Down Expand Up @@ -550,13 +572,12 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
return txs
}

// Lock() must be help by the caller during execution.
// Lock() must be held by the caller during execution.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix typo

func (mem *CListMempool) Update(
height int64,
txs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
// Set height
mem.height = height
Expand All @@ -565,9 +586,6 @@ func (mem *CListMempool) Update(
if preCheck != nil {
mem.preCheck = preCheck
}
if postCheck != nil {
mem.postCheck = postCheck
}

for i, tx := range txs {
if deliverTxResponses[i].Code == abci.CodeTypeOK {
Expand Down
37 changes: 14 additions & 23 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,31 +147,22 @@ func TestMempoolFilters(t *testing.T) {
emptyTxArr := []types.Tx{[]byte{}}

nopPreFilter := func(tx types.Tx) error { return nil }
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }

// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
tests := []struct {
numTxsToCreate int
preFilter PreCheckFunc
postFilter PostCheckFunc
expectedNumTxs int
}{
{10, nopPreFilter, nopPostFilter, 10},
{10, PreCheckAminoMaxBytes(10), nopPostFilter, 0},
{10, PreCheckAminoMaxBytes(20), nopPostFilter, 0},
{10, PreCheckAminoMaxBytes(22), nopPostFilter, 10},
{10, nopPreFilter, PostCheckMaxGas(-1), 10},
{10, nopPreFilter, PostCheckMaxGas(0), 0},
{10, nopPreFilter, PostCheckMaxGas(1), 10},
{10, nopPreFilter, PostCheckMaxGas(3000), 10},
{10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0},
{10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10},
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10},
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0},
{10, nopPreFilter, 10},
{10, PreCheckAminoMaxBytes(10), 0},
{10, PreCheckAminoMaxBytes(20), 0},
{10, PreCheckAminoMaxBytes(22), 10},
{10, PreCheckAminoMaxBytes(30), 10},
}
for tcIndex, tt := range tests {
mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
mempool.Flush()
Expand All @@ -186,7 +177,7 @@ func TestMempoolUpdate(t *testing.T) {

// 1. Adds valid txs to the cache
{
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil)
err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
Expand All @@ -197,15 +188,15 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{})
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil)
assert.Zero(t, mempool.Size())
}

// 3. Removes invalid transactions from the cache and the mempool (if present)
{
err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil)
assert.Zero(t, mempool.Size())

err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
Expand Down Expand Up @@ -234,7 +225,7 @@ func TestTxsAvailable(t *testing.T) {
// it should fire once now for the new height
// since there are still txs left
committedTxs, txs := txs[:50], txs[50:]
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
Expand All @@ -246,7 +237,7 @@ func TestTxsAvailable(t *testing.T) {

// now call update with all the txs. it should not fire as there are no txs left
committedTxs = append(txs, moreTxs...) //nolint: gocritic
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
Expand Down Expand Up @@ -305,7 +296,7 @@ func TestSerialReap(t *testing.T) {
binary.BigEndian.PutUint64(txBytes, uint64(i))
txs = append(txs, txBytes)
}
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
}
Expand Down Expand Up @@ -489,7 +480,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 1, mempool.TxsBytes())

// 3. zero again after tx is removed by Update
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil)
assert.EqualValues(t, 0, mempool.TxsBytes())

// 4. zero after Flush
Expand Down Expand Up @@ -534,7 +525,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NotEmpty(t, res2.Data)

// Pretend like we committed nothing so txBytes gets rechecked and removed.
mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil)
mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil)
assert.EqualValues(t, 0, mempool.TxsBytes())
}

Expand Down
25 changes: 0 additions & 25 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type Mempool interface {
blockTxs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
newPreFn PreCheckFunc,
newPostFn PostCheckFunc,
) error

// Flush removes all transactions from the mempool and cache
Expand Down Expand Up @@ -80,11 +79,6 @@ type Mempool interface {
// transaction doesn't exceeded the block size.
type PreCheckFunc func(types.Tx) error

// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
Expand Down Expand Up @@ -115,22 +109,3 @@ func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc {
return nil
}
}

// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
// maxGas. Returns nil if maxGas is -1.
func PostCheckMaxGas(maxGas int64) PostCheckFunc {
return func(tx types.Tx, res *abci.ResponseCheckTx) error {
if maxGas == -1 {
return nil
}
if res.GasWanted < 0 {
return fmt.Errorf("gas wanted %d is negative",
res.GasWanted)
}
if res.GasWanted > maxGas {
return fmt.Errorf("gas wanted %d is greater than max gas %d",
res.GasWanted, maxGas)
}
return nil
}
}
1 change: 0 additions & 1 deletion mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func (Mempool) Update(
_ types.Txs,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
Expand Down
1 change: 0 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
Expand Down
1 change: 0 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ func TestCreateProposalBlock(t *testing.T) {
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)

Expand Down
8 changes: 1 addition & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,7 @@ func (blockExec *BlockExecutor) Commit(

// Update mempool.
updateMempoolStartTime := time.Now().UnixNano()
err = blockExec.mempool.Update(
block.Height,
block.Txs,
deliverTxResponses,
TxPreCheck(state),
TxPostCheck(state),
)
err = blockExec.mempool.Update(block.Height, block.Txs, deliverTxResponses, TxPreCheck(state))
updateMempoolEndTime := time.Now().UnixNano()

updateMempoolTimeMs := float64(updateMempoolEndTime-updateMempoolStartTime) / 1000000
Expand Down
6 changes: 0 additions & 6 deletions state/tx_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,3 @@ func TxPreCheck(state State) mempl.PreCheckFunc {
)
return mempl.PreCheckAminoMaxBytes(maxDataBytes)
}

// TxPostCheck returns a function to filter transactions after processing.
// The function limits the gas wanted by a transaction to the block's maximum total gas.
func TxPostCheck(state State) mempl.PostCheckFunc {
return mempl.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)
}