Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(baseapp): select txs correctly with no-op mempool #17769

Merged
merged 18 commits into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Bug Fixes

* (baseapp) [#17769](https://github.com/cosmos/cosmos-sdk/pull/17769) Ensure we respect block size constraints in the `DefaultProposalHandler`'s `PrepareProposal` handler when a nil or no-op mempool is used. We provide a `TxSelector` type to assist in making transaction selection generalized. We also fix a comparison bug in tx selection when `req.maxTxBytes` is reached.
* (types) [#16583](https://github.com/cosmos/cosmos-sdk/pull/16583), [#17372](https://github.com/cosmos/cosmos-sdk/pull/17372), [#17421](https://github.com/cosmos/cosmos-sdk/pull/17421), [#17713](https://github.com/cosmos/cosmos-sdk/pull/17713) Introduce `PreBlock`, which executes in `FinalizeBlock` before `BeginBlock`. It allows the application to modify consensus parameters and have access to VE state. Note, `PreFinalizeBlockHook` is replaced by`PreBlocker`.
* (baseapp) [#17518](https://github.com/cosmos/cosmos-sdk/pull/17518) Utilizing voting power from vote extensions (CometBFT) instead of the current bonded tokens (x/staking) to determine if a set of vote extensions are valid.
* (config) [#17649](https://github.com/cosmos/cosmos-sdk/pull/17649) Fix `mempool.max-txs` configuration is invalid in `app.config`.
Expand Down
153 changes: 108 additions & 45 deletions baseapp/abci_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,32 @@ type (
ProposalTxVerifier interface {
PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error)
ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error)
TxDecode(txBz []byte) (sdk.Tx, error)
TxEncode(tx sdk.Tx) ([]byte, error)
}

// DefaultProposalHandler defines the default ABCI PrepareProposal and
// ProcessProposal handlers.
DefaultProposalHandler struct {
mempool mempool.Mempool
txVerifier ProposalTxVerifier
txSelector TxSelector
}
)

func NewDefaultProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier) DefaultProposalHandler {
return DefaultProposalHandler{
func NewDefaultProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier) *DefaultProposalHandler {
return &DefaultProposalHandler{
mempool: mp,
txVerifier: txVerifier,
txSelector: NewDefaultTxSelector(),
}
}

// SetTxSelector sets the TxSelector function on the DefaultProposalHandler.
func (h *DefaultProposalHandler) SetTxSelector(ts TxSelector) {
h.txSelector = ts
}

// PrepareProposalHandler returns the default implementation for processing an
// ABCI proposal. The application's mempool is enumerated and all valid
// transactions are added to the proposal. Transactions are valid if they:
Expand All @@ -176,77 +185,61 @@ func NewDefaultProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier
// - If no mempool is set or if the mempool is a no-op mempool, the transactions
// requested from CometBFT will simply be returned, which, by default, are in
// FIFO order.
func (h DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
var maxBlockGas uint64
if b := ctx.ConsensusParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}

defer h.txSelector.Clear()

// If the mempool is nil or NoOp we simply return the transactions
// requested from CometBFT, which, by default, should be in FIFO order.
//
// Note, we still need to ensure the transactions returned respect req.MaxTxBytes.
_, isNoOp := h.mempool.(mempool.NoOpMempool)
if h.mempool == nil || isNoOp {
return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil
}
for _, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}

var maxBlockGas int64
if b := ctx.ConsensusParams().Block; b != nil {
maxBlockGas = b.MaxGas
}
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, tx, txBz)
if stop {
break
}
}

var (
selectedTxs [][]byte
totalTxBytes int64
totalTxGas uint64
)
return &abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}, nil
}

iterator := h.mempool.Select(ctx, req.Txs)

for iterator != nil {
memTx := iterator.Tx()

// NOTE: Since transaction verification was already executed in CheckTx,
// which calls mempool.Insert, in theory everything in the pool should be
// valid. But some mempool implementations may insert invalid txs, so we
// check again.
bz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
if err != nil {
err := h.mempool.Remove(memTx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
return nil, err
}
} else {
var txGasLimit uint64
txSize := int64(len(bz))

gasTx, ok := memTx.(GasTx)
if ok {
txGasLimit = gasTx.GetGas()
}

// only add the transaction to the proposal if we have enough capacity
if (txSize + totalTxBytes) < req.MaxTxBytes {
// If there is a max block gas limit, add the tx only if the limit has
// not been met.
if maxBlockGas > 0 {
if (txGasLimit + totalTxGas) <= uint64(maxBlockGas) {
totalTxGas += txGasLimit
totalTxBytes += txSize
selectedTxs = append(selectedTxs, bz)
}
} else {
totalTxBytes += txSize
selectedTxs = append(selectedTxs, bz)
}
}

// Check if we've reached capacity. If so, we cannot select any more
// transactions.
if totalTxBytes >= req.MaxTxBytes || (maxBlockGas > 0 && (totalTxGas >= uint64(maxBlockGas))) {
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
if stop {
break
}
}

iterator = iterator.Next()
}

return &abci.ResponsePrepareProposal{Txs: selectedTxs}, nil
return &abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}, nil
}
}

Expand All @@ -261,7 +254,7 @@ func (h DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand
// DefaultPrepareProposal. It is very important that the same validation logic
// is used in both steps, and applications must ensure that this is the case in
// non-default handlers.
func (h DefaultProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
func (h *DefaultProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
// If the mempool is nil or NoOp we simply return ACCEPT,
// because PrepareProposal may have included txs that could fail verification.
_, isNoOp := h.mempool.(mempool.NoOpMempool)
Expand Down Expand Up @@ -330,3 +323,73 @@ func NoOpVerifyVoteExtensionHandler() sdk.VerifyVoteExtensionHandler {
return &abci.ResponseVerifyVoteExtension{Status: abci.ResponseVerifyVoteExtension_ACCEPT}, nil
}
}

// TxSelector defines a helper type that assists in selecting transactions during
// mempool transaction selection in PrepareProposal. It keeps track of the total
// number of bytes and total gas of the selected transactions. It also keeps
// track of the selected transactions themselves.
type TxSelector interface {
// SelectedTxs should return a copy of the selected transactions.
SelectedTxs() [][]byte

// Clear should clear the TxSelector, nulling out all relevant fields.
Clear()

// SelectTxForProposal should attempt to select a transaction for inclusion in
// a proposal based on inclusion criteria defined by the TxSelector. It must
// return <true> if the caller should halt the transaction selection loop
// (typically over a mempool) or <false> otherwise.
SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool
}

type defaultTxSelector struct {
totalTxBytes uint64
totalTxGas uint64
selectedTxs [][]byte
}

func NewDefaultTxSelector() TxSelector {
return &defaultTxSelector{}
}

func (ts *defaultTxSelector) SelectedTxs() [][]byte {
txs := make([][]byte, len(ts.selectedTxs))
copy(txs, ts.selectedTxs)
return txs
}

func (ts *defaultTxSelector) Clear() {
ts.totalTxBytes = 0
ts.totalTxGas = 0
ts.selectedTxs = nil
}

func (ts *defaultTxSelector) SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool {
txSize := uint64(len(txBz))

var txGasLimit uint64
if memTx != nil {
if gasTx, ok := memTx.(GasTx); ok {
txGasLimit = gasTx.GetGas()
}
}

// only add the transaction to the proposal if we have enough capacity
if (txSize + ts.totalTxBytes) <= maxTxBytes {
// If there is a max block gas limit, add the tx only if the limit has
// not been met.
if maxBlockGas > 0 {
if (txGasLimit + ts.totalTxGas) <= maxBlockGas {
ts.totalTxGas += txGasLimit
ts.totalTxBytes += txSize
ts.selectedTxs = append(ts.selectedTxs, txBz)
}
} else {
ts.totalTxBytes += txSize
ts.selectedTxs = append(ts.selectedTxs, txBz)
}
}

// check if we've reached capacity; if so, we cannot select any more transactions
return ts.totalTxBytes >= maxTxBytes || (maxBlockGas > 0 && (ts.totalTxGas >= maxBlockGas))
}
95 changes: 95 additions & 0 deletions baseapp/abci_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@ import (
"github.com/cometbft/cometbft/crypto/secp256k1"
cmtprotocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
dbm "github.com/cosmos/cosmos-db"
protoio "github.com/cosmos/gogoproto/io"
"github.com/cosmos/gogoproto/proto"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"

"cosmossdk.io/log"

"github.com/cosmos/cosmos-sdk/baseapp"
baseapptestutil "github.com/cosmos/cosmos-sdk/baseapp/testutil"
"github.com/cosmos/cosmos-sdk/baseapp/testutil/mock"
codectestutil "github.com/cosmos/cosmos-sdk/codec/testutil"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
authtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
)

const (
Expand Down Expand Up @@ -274,6 +282,93 @@ func (s *ABCIUtilsTestSuite) TestValidateVoteExtensionsTwoVotesNilAbsent() {
s.Require().Error(baseapp.ValidateVoteExtensions(s.ctx, s.valStore, 3, chainID, llc))
}

func (s *ABCIUtilsTestSuite) TestDefaultProposalHandler_NoOpMempoolTxSelection() {
// create a codec for marshaling
cdc := codectestutil.CodecOptions{}.NewCodec()
baseapptestutil.RegisterInterfaces(cdc.InterfaceRegistry())

// create a baseapp along with a tx config for tx generation
txConfig := authtx.NewTxConfig(cdc, authtx.DefaultSignModes)
app := baseapp.NewBaseApp(s.T().Name(), log.NewNopLogger(), dbm.NewMemDB(), txConfig.TxDecoder())

// create a proposal handler
ph := baseapp.NewDefaultProposalHandler(mempool.NoOpMempool{}, app)
handler := ph.PrepareProposalHandler()

// build a tx
_, _, addr := testdata.KeyTestPubAddr()
builder := txConfig.NewTxBuilder()
s.Require().NoError(builder.SetMsgs(
&baseapptestutil.MsgCounter{Counter: 0, FailOnHandler: false, Signer: addr.String()},
))
builder.SetGasLimit(100)
setTxSignature(s.T(), builder, 0)

// encode the tx to be used in the proposal request
tx := builder.GetTx()
txBz, err := txConfig.TxEncoder()(tx)
s.Require().NoError(err)
s.Require().Len(txBz, 152)

testCases := map[string]struct {
ctx sdk.Context
req *abci.RequestPrepareProposal
expectedTxs int
}{
"small max tx bytes": {
ctx: s.ctx,
req: &abci.RequestPrepareProposal{
Txs: [][]byte{txBz, txBz, txBz, txBz, txBz},
MaxTxBytes: 10,
},
expectedTxs: 0,
},
"small max gas": {
ctx: s.ctx.WithConsensusParams(cmtproto.ConsensusParams{
Block: &cmtproto.BlockParams{
MaxGas: 10,
},
}),
req: &abci.RequestPrepareProposal{
Txs: [][]byte{txBz, txBz, txBz, txBz, txBz},
MaxTxBytes: 456,
},
expectedTxs: 0,
},
"large max tx bytes": {
ctx: s.ctx,
req: &abci.RequestPrepareProposal{
Txs: [][]byte{txBz, txBz, txBz, txBz, txBz},
MaxTxBytes: 456,
},
expectedTxs: 3,
},
"max gas and tx bytes": {
ctx: s.ctx.WithConsensusParams(cmtproto.ConsensusParams{
Block: &cmtproto.BlockParams{
MaxGas: 200,
},
}),
req: &abci.RequestPrepareProposal{
Txs: [][]byte{txBz, txBz, txBz, txBz, txBz},
MaxTxBytes: 456,
},
expectedTxs: 2,
},
}

for name, tc := range testCases {
s.Run(name, func() {
// iterate multiple times to ensure the tx selector is cleared each time
for i := 0; i < 5; i++ {
resp, err := handler(tc.ctx, tc.req)
s.Require().NoError(err)
s.Require().Len(resp.Txs, tc.expectedTxs)
}
})
}
}

func marshalDelimitedFn(msg proto.Message) ([]byte, error) {
var buf bytes.Buffer
if err := protoio.NewDelimitedWriter(&buf).WriteMsg(msg); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,14 @@ func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) {
return tx, nil
}

func (app *BaseApp) TxDecode(txBytes []byte) (sdk.Tx, error) {
return app.txDecoder(txBytes)
}

func (app *BaseApp) TxEncode(tx sdk.Tx) ([]byte, error) {
return app.txEncoder(tx)
}

// Close is called in start cmd to gracefully cleanup resources.
func (app *BaseApp) Close() error {
var errs []error
Expand Down