From 63d046dd59f06855416e3570304fbaf9ce6aedfd Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Sun, 24 Sep 2023 15:24:30 -0400 Subject: [PATCH] fix(baseapp): select txs correctly with no-op mempool (#17769) --- CHANGELOG.md | 1 + baseapp/abci_utils.go | 153 ++++++++++++++++++++++++++----------- baseapp/abci_utils_test.go | 95 +++++++++++++++++++++++ baseapp/baseapp.go | 8 ++ 4 files changed, 212 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8b7fb2afd87..e639a38361a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Bug Fixes +* (baseapp) [#17769](https://github.com/cosmos/cosmos-sdk/pull/17769) Ensure we respect block size constraints in the `DefaultProposalHandler`'s `PrepareProposal` handler when a nil or no-op mempool is used. We provide a `TxSelector` type to assist in making transaction selection generalized. We also fix a comparison bug in tx selection when `req.maxTxBytes` is reached. * (types) [#16583](https://github.com/cosmos/cosmos-sdk/pull/16583), [#17372](https://github.com/cosmos/cosmos-sdk/pull/17372), [#17421](https://github.com/cosmos/cosmos-sdk/pull/17421), [#17713](https://github.com/cosmos/cosmos-sdk/pull/17713) Introduce `PreBlock`, which executes in `FinalizeBlock` before `BeginBlock`. It allows the application to modify consensus parameters and have access to VE state. Note, `PreFinalizeBlockHook` is replaced by`PreBlocker`. * (baseapp) [#17518](https://github.com/cosmos/cosmos-sdk/pull/17518) Utilizing voting power from vote extensions (CometBFT) instead of the current bonded tokens (x/staking) to determine if a set of vote extensions are valid. * (config) [#17649](https://github.com/cosmos/cosmos-sdk/pull/17649) Fix `mempool.max-txs` configuration is invalid in `app.config`. diff --git a/baseapp/abci_utils.go b/baseapp/abci_utils.go index 4b2568d57dd7..9a8da22c191c 100644 --- a/baseapp/abci_utils.go +++ b/baseapp/abci_utils.go @@ -139,6 +139,8 @@ type ( ProposalTxVerifier interface { PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) + TxDecode(txBz []byte) (sdk.Tx, error) + TxEncode(tx sdk.Tx) ([]byte, error) } // DefaultProposalHandler defines the default ABCI PrepareProposal and @@ -146,16 +148,23 @@ type ( DefaultProposalHandler struct { mempool mempool.Mempool txVerifier ProposalTxVerifier + txSelector TxSelector } ) -func NewDefaultProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier) DefaultProposalHandler { - return DefaultProposalHandler{ +func NewDefaultProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier) *DefaultProposalHandler { + return &DefaultProposalHandler{ mempool: mp, txVerifier: txVerifier, + txSelector: NewDefaultTxSelector(), } } +// SetTxSelector sets the TxSelector function on the DefaultProposalHandler. +func (h *DefaultProposalHandler) SetTxSelector(ts TxSelector) { + h.txSelector = ts +} + // PrepareProposalHandler returns the default implementation for processing an // ABCI proposal. The application's mempool is enumerated and all valid // transactions are added to the proposal. Transactions are valid if they: @@ -176,28 +185,37 @@ func NewDefaultProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier // - If no mempool is set or if the mempool is a no-op mempool, the transactions // requested from CometBFT will simply be returned, which, by default, are in // FIFO order. -func (h DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { +func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { return func(ctx sdk.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) { + var maxBlockGas uint64 + if b := ctx.ConsensusParams().Block; b != nil { + maxBlockGas = uint64(b.MaxGas) + } + + defer h.txSelector.Clear() + // If the mempool is nil or NoOp we simply return the transactions // requested from CometBFT, which, by default, should be in FIFO order. + // + // Note, we still need to ensure the transactions returned respect req.MaxTxBytes. _, isNoOp := h.mempool.(mempool.NoOpMempool) if h.mempool == nil || isNoOp { - return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil - } + for _, txBz := range req.Txs { + tx, err := h.txVerifier.TxDecode(txBz) + if err != nil { + return nil, err + } - var maxBlockGas int64 - if b := ctx.ConsensusParams().Block; b != nil { - maxBlockGas = b.MaxGas - } + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, tx, txBz) + if stop { + break + } + } - var ( - selectedTxs [][]byte - totalTxBytes int64 - totalTxGas uint64 - ) + return &abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}, nil + } iterator := h.mempool.Select(ctx, req.Txs) - for iterator != nil { memTx := iterator.Tx() @@ -205,40 +223,15 @@ func (h DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand // which calls mempool.Insert, in theory everything in the pool should be // valid. But some mempool implementations may insert invalid txs, so we // check again. - bz, err := h.txVerifier.PrepareProposalVerifyTx(memTx) + txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx) if err != nil { err := h.mempool.Remove(memTx) if err != nil && !errors.Is(err, mempool.ErrTxNotFound) { return nil, err } } else { - var txGasLimit uint64 - txSize := int64(len(bz)) - - gasTx, ok := memTx.(GasTx) - if ok { - txGasLimit = gasTx.GetGas() - } - - // only add the transaction to the proposal if we have enough capacity - if (txSize + totalTxBytes) < req.MaxTxBytes { - // If there is a max block gas limit, add the tx only if the limit has - // not been met. - if maxBlockGas > 0 { - if (txGasLimit + totalTxGas) <= uint64(maxBlockGas) { - totalTxGas += txGasLimit - totalTxBytes += txSize - selectedTxs = append(selectedTxs, bz) - } - } else { - totalTxBytes += txSize - selectedTxs = append(selectedTxs, bz) - } - } - - // Check if we've reached capacity. If so, we cannot select any more - // transactions. - if totalTxBytes >= req.MaxTxBytes || (maxBlockGas > 0 && (totalTxGas >= uint64(maxBlockGas))) { + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz) + if stop { break } } @@ -246,7 +239,7 @@ func (h DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand iterator = iterator.Next() } - return &abci.ResponsePrepareProposal{Txs: selectedTxs}, nil + return &abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}, nil } } @@ -261,7 +254,7 @@ func (h DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand // DefaultPrepareProposal. It is very important that the same validation logic // is used in both steps, and applications must ensure that this is the case in // non-default handlers. -func (h DefaultProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { +func (h *DefaultProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { // If the mempool is nil or NoOp we simply return ACCEPT, // because PrepareProposal may have included txs that could fail verification. _, isNoOp := h.mempool.(mempool.NoOpMempool) @@ -330,3 +323,73 @@ func NoOpVerifyVoteExtensionHandler() sdk.VerifyVoteExtensionHandler { return &abci.ResponseVerifyVoteExtension{Status: abci.ResponseVerifyVoteExtension_ACCEPT}, nil } } + +// TxSelector defines a helper type that assists in selecting transactions during +// mempool transaction selection in PrepareProposal. It keeps track of the total +// number of bytes and total gas of the selected transactions. It also keeps +// track of the selected transactions themselves. +type TxSelector interface { + // SelectedTxs should return a copy of the selected transactions. + SelectedTxs() [][]byte + + // Clear should clear the TxSelector, nulling out all relevant fields. + Clear() + + // SelectTxForProposal should attempt to select a transaction for inclusion in + // a proposal based on inclusion criteria defined by the TxSelector. It must + // return if the caller should halt the transaction selection loop + // (typically over a mempool) or otherwise. + SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool +} + +type defaultTxSelector struct { + totalTxBytes uint64 + totalTxGas uint64 + selectedTxs [][]byte +} + +func NewDefaultTxSelector() TxSelector { + return &defaultTxSelector{} +} + +func (ts *defaultTxSelector) SelectedTxs() [][]byte { + txs := make([][]byte, len(ts.selectedTxs)) + copy(txs, ts.selectedTxs) + return txs +} + +func (ts *defaultTxSelector) Clear() { + ts.totalTxBytes = 0 + ts.totalTxGas = 0 + ts.selectedTxs = nil +} + +func (ts *defaultTxSelector) SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool { + txSize := uint64(len(txBz)) + + var txGasLimit uint64 + if memTx != nil { + if gasTx, ok := memTx.(GasTx); ok { + txGasLimit = gasTx.GetGas() + } + } + + // only add the transaction to the proposal if we have enough capacity + if (txSize + ts.totalTxBytes) <= maxTxBytes { + // If there is a max block gas limit, add the tx only if the limit has + // not been met. + if maxBlockGas > 0 { + if (txGasLimit + ts.totalTxGas) <= maxBlockGas { + ts.totalTxGas += txGasLimit + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } else { + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } + + // check if we've reached capacity; if so, we cannot select any more transactions + return ts.totalTxBytes >= maxTxBytes || (maxBlockGas > 0 && (ts.totalTxGas >= maxBlockGas)) +} diff --git a/baseapp/abci_utils_test.go b/baseapp/abci_utils_test.go index 8c7fda91de95..8919ee81ba8c 100644 --- a/baseapp/abci_utils_test.go +++ b/baseapp/abci_utils_test.go @@ -8,14 +8,22 @@ import ( "github.com/cometbft/cometbft/crypto/secp256k1" cmtprotocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" + dbm "github.com/cosmos/cosmos-db" protoio "github.com/cosmos/gogoproto/io" "github.com/cosmos/gogoproto/proto" "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" + "cosmossdk.io/log" + "github.com/cosmos/cosmos-sdk/baseapp" + baseapptestutil "github.com/cosmos/cosmos-sdk/baseapp/testutil" "github.com/cosmos/cosmos-sdk/baseapp/testutil/mock" + codectestutil "github.com/cosmos/cosmos-sdk/codec/testutil" + "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" ) const ( @@ -274,6 +282,93 @@ func (s *ABCIUtilsTestSuite) TestValidateVoteExtensionsTwoVotesNilAbsent() { s.Require().Error(baseapp.ValidateVoteExtensions(s.ctx, s.valStore, 3, chainID, llc)) } +func (s *ABCIUtilsTestSuite) TestDefaultProposalHandler_NoOpMempoolTxSelection() { + // create a codec for marshaling + cdc := codectestutil.CodecOptions{}.NewCodec() + baseapptestutil.RegisterInterfaces(cdc.InterfaceRegistry()) + + // create a baseapp along with a tx config for tx generation + txConfig := authtx.NewTxConfig(cdc, authtx.DefaultSignModes) + app := baseapp.NewBaseApp(s.T().Name(), log.NewNopLogger(), dbm.NewMemDB(), txConfig.TxDecoder()) + + // create a proposal handler + ph := baseapp.NewDefaultProposalHandler(mempool.NoOpMempool{}, app) + handler := ph.PrepareProposalHandler() + + // build a tx + _, _, addr := testdata.KeyTestPubAddr() + builder := txConfig.NewTxBuilder() + s.Require().NoError(builder.SetMsgs( + &baseapptestutil.MsgCounter{Counter: 0, FailOnHandler: false, Signer: addr.String()}, + )) + builder.SetGasLimit(100) + setTxSignature(s.T(), builder, 0) + + // encode the tx to be used in the proposal request + tx := builder.GetTx() + txBz, err := txConfig.TxEncoder()(tx) + s.Require().NoError(err) + s.Require().Len(txBz, 152) + + testCases := map[string]struct { + ctx sdk.Context + req *abci.RequestPrepareProposal + expectedTxs int + }{ + "small max tx bytes": { + ctx: s.ctx, + req: &abci.RequestPrepareProposal{ + Txs: [][]byte{txBz, txBz, txBz, txBz, txBz}, + MaxTxBytes: 10, + }, + expectedTxs: 0, + }, + "small max gas": { + ctx: s.ctx.WithConsensusParams(cmtproto.ConsensusParams{ + Block: &cmtproto.BlockParams{ + MaxGas: 10, + }, + }), + req: &abci.RequestPrepareProposal{ + Txs: [][]byte{txBz, txBz, txBz, txBz, txBz}, + MaxTxBytes: 456, + }, + expectedTxs: 0, + }, + "large max tx bytes": { + ctx: s.ctx, + req: &abci.RequestPrepareProposal{ + Txs: [][]byte{txBz, txBz, txBz, txBz, txBz}, + MaxTxBytes: 456, + }, + expectedTxs: 3, + }, + "max gas and tx bytes": { + ctx: s.ctx.WithConsensusParams(cmtproto.ConsensusParams{ + Block: &cmtproto.BlockParams{ + MaxGas: 200, + }, + }), + req: &abci.RequestPrepareProposal{ + Txs: [][]byte{txBz, txBz, txBz, txBz, txBz}, + MaxTxBytes: 456, + }, + expectedTxs: 2, + }, + } + + for name, tc := range testCases { + s.Run(name, func() { + // iterate multiple times to ensure the tx selector is cleared each time + for i := 0; i < 5; i++ { + resp, err := handler(tc.ctx, tc.req) + s.Require().NoError(err) + s.Require().Len(resp.Txs, tc.expectedTxs) + } + }) + } +} + func marshalDelimitedFn(msg proto.Message) ([]byte, error) { var buf bytes.Buffer if err := protoio.NewDelimitedWriter(&buf).WriteMsg(msg); err != nil { diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 691c48707f09..d890d8022044 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -1092,6 +1092,14 @@ func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) { return tx, nil } +func (app *BaseApp) TxDecode(txBytes []byte) (sdk.Tx, error) { + return app.txDecoder(txBytes) +} + +func (app *BaseApp) TxEncode(tx sdk.Tx) ([]byte, error) { + return app.txEncoder(tx) +} + // Close is called in start cmd to gracefully cleanup resources. func (app *BaseApp) Close() error { var errs []error