Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(StargateQueries): use a sync pool when unmarshalling responses of protobuf objects #7346

Merged
merged 7 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [#7181](https://github.com/osmosis-labs/osmosis/pull/7181) Improve errors for out of gas

### Bug Fixes

* [#7346](https://github.com/osmosis-labs/osmosis/pull/7346) Prevent heavy gRPC load from app hashing nodes

## v22.0.0

### Fee Market Parameter Updates
Expand Down
2 changes: 1 addition & 1 deletion cmd/osmosisd/cmd/stargate-query.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Example:
//nolint:staticcheck
func GetStructAndFill(queryPath, module, structName string, structArguments ...string) (interface{}, error) {
const ParamRequest = "QueryParamsRequest"
_, err := wasmbinding.GetWhitelistedQuery(queryPath)
err := wasmbinding.IsWhitelistedQuery(queryPath)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion wasmbinding/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package wasmbinding

import "github.com/cosmos/cosmos-sdk/codec"

func SetWhitelistedQuery(queryPath string, protoType codec.ProtoMarshaler) {
func SetWhitelistedQuery[T any, PT protoTypeG[T]](queryPath string, protoType PT) {
setWhitelistedQuery(queryPath, protoType)
}

func GetWhitelistedQuery(queryPath string) (codec.ProtoMarshaler, error) {
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
return getWhitelistedQuery(queryPath)
}
5 changes: 4 additions & 1 deletion wasmbinding/query_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import (
// StargateQuerier dispatches whitelisted stargate queries
func StargateQuerier(queryRouter baseapp.GRPCQueryRouter, cdc codec.Codec) func(ctx sdk.Context, request *wasmvmtypes.StargateQuery) ([]byte, error) {
return func(ctx sdk.Context, request *wasmvmtypes.StargateQuery) ([]byte, error) {
protoResponseType, err := GetWhitelistedQuery(request.Path)
protoResponseType, err := getWhitelistedQuery(request.Path)
if err != nil {
return nil, err
}
// no matter what happens after this point, but we must return
// the response type to the pool.
defer returnStargateResponseToPool(request.Path, protoResponseType)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to return the sync.Pool object, so it does not leak


route := queryRouter.Route(request.Path)
if route == nil {
Expand Down
61 changes: 40 additions & 21 deletions wasmbinding/stargate_whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import (
epochtypes "github.com/osmosis-labs/osmosis/x/epochs/types"
)

// stargateWhitelist keeps whitelist and its deterministic
// stargateResponsePools keeps whitelist and its deterministic
// response binding for stargate queries.
// CONTRACT: since results of queries go into blocks, queries being added here should always be
// deterministic or can cause non-determinism in the state machine.
//
// The query can be multi-thread, so we have to use
// thread safe sync.Map.
var stargateWhitelist sync.Map
// The query is multi-threaded so we're using a sync.Pool
// to manage the allocation and de-allocation of newly created
// pb objects.
var stargateResponsePools = make(map[string]*sync.Pool)
Comment on lines -41 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand - is the main reason sync.Pool works and sync.Map doesn't is that the former allocates new objects for concurrent requests?

Copy link
Contributor Author

@testinginprod testinginprod Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically the sync.Map was keeping safe the map, which was not needed since after init the map is readonly.

the value of the map was a pointer to a protobuf object.

So we had a map of Map[K, *V]

and simulate + delivertx (which shared the same string request path) where all editing the same pointer, meaning they were editing the same variable underneath, concurrently (not the map but the value associated with the key in that map).

So what were we using that value for? To unmarshal a stargate query into a protobuf object that then we marshal back as JSON for CosmWasm contracts.

So what this map of sync pool does is that it provides a way to create new objects matching to a specific gRPC query response type (creation=allocation), and when we're done with them we put them it the pool so we can use them again without allocating anymore. sync.Pool takes care of de-allocating them when they're not needed anymore so we do not have to worry.

hope this clarifies the issue

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is yes, the later requires a pointer and shares the same struct to unmarshal into, whereas this creates a new object for each request. Utilizing sync.Pool allows us to reallocate the object once completed though so it doesn't have a large impact on performance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly! We could have used a map of map[string]func() codec.ProtoMarshaler, where string is the request path and func() codec.ProtoMarshaler is a function that returns a newly and freshly created protobuf object to be used as target for response unmarshalling, but this could cause GC over-head in concurrent scenarios as the object is created and needs to immediately be GC'd (eg: during a lot of concurrent sims).

So sync.Pool simply allows us to recycle unused objects instead of immediately forcing the GC to de-allocate them immediately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testinginprod's explanation is much more thorough, thanks.


// Note: When adding a migration here, we should also add it to the Async ICQ params in the upgrade.
// In the future we may want to find a better way to keep these in sync
Expand Down Expand Up @@ -184,34 +185,52 @@ func init() {
setWhitelistedQuery("/osmosis.concentratedliquidity.v1beta1.Query/CFMMPoolIdLinkFromConcentratedPoolId", &concentratedliquidityquery.CFMMPoolIdLinkFromConcentratedPoolIdResponse{})
}

// GetWhitelistedQuery returns the whitelisted query at the provided path.
// IsWhitelistedQuery returns if the query is not whitelisted.
func IsWhitelistedQuery(queryPath string) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exposed this method in place of getWhitelistedQuery to avoid unexported usage of a function that can leak memory if not used properly

_, isWhitelisted := stargateResponsePools[queryPath]
if !isWhitelisted {
return wasmvmtypes.UnsupportedRequest{Kind: fmt.Sprintf("'%s' path is not allowed from the contract", queryPath)}
}
return nil
}

// getWhitelistedQuery returns the whitelisted query at the provided path.
// If the query does not exist, or it was setup wrong by the chain, this returns an error.
func GetWhitelistedQuery(queryPath string) (codec.ProtoMarshaler, error) {
protoResponseAny, isWhitelisted := stargateWhitelist.Load(queryPath)
// CONTRACT: must call returnStargateResponseToPool in order to avoid pointless allocs.
func getWhitelistedQuery(queryPath string) (codec.ProtoMarshaler, error) {
protoResponseAny, isWhitelisted := stargateResponsePools[queryPath]
if !isWhitelisted {
return nil, wasmvmtypes.UnsupportedRequest{Kind: fmt.Sprintf("'%s' path is not allowed from the contract", queryPath)}
}
protoResponseType, ok := protoResponseAny.(codec.ProtoMarshaler)
protoMarshaler, ok := protoResponseAny.Get().(codec.ProtoMarshaler)
Comment on lines -194 to +205
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether this cast is the primary source of the issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unreleated: #7346 (comment)

if !ok {
return nil, wasmvmtypes.Unknown{}
return nil, fmt.Errorf("failed to assert type to codec.ProtoMarshaler")
Copy link
Member

@czarcas7ic czarcas7ic Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use a wasmvmtypes error here?

Looking at the caller this seems fine, but wanted to flag to ensure I wasn't sneaking this in.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spoke with Roman offline, looks fine.

}
return protoResponseType, nil
return protoMarshaler, nil
}

func setWhitelistedQuery(queryPath string, protoType codec.ProtoMarshaler) {
stargateWhitelist.Store(queryPath, protoType)
type protoTypeG[T any] interface {
*T
codec.ProtoMarshaler
}

func setWhitelistedQuery[T any, PT protoTypeG[T]](queryPath string, _ PT) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this creates a sync.Pool for the given protobuf object, we use generics so we can properly instantiate an object that queryPath expects as response.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you comment in the code with this context please?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment here 801eae8

stargateResponsePools[queryPath] = &sync.Pool{
New: func() any {
return PT(new(T))
},
}
}

func returnStargateResponseToPool(queryPath string, pb codec.ProtoMarshaler) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this returns the protobuf object to its appropriate pool (based on the queryPath)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transferring this into a comment would also be helpful IMO 🙏

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment here 2192082

stargateResponsePools[queryPath].Put(pb)
}

func GetStargateWhitelistedPaths() (keys []string) {
// Iterate over the map and collect the keys
stargateWhitelist.Range(func(key, value interface{}) bool {
keyStr, ok := key.(string)
if !ok {
panic("key is not a string")
}
keys = append(keys, keyStr)
return true
})

keys = make([]string, 0, len(stargateResponsePools))
for k := range stargateResponsePools {
keys = append(keys, k)
}
return keys
}