Skip to content

Commit

Permalink
fix: concurrency issues between simulation, grpc queries and ABCI com…
Browse files Browse the repository at this point in the history
…mit flow (cosmos#213) (cosmos#216)
  • Loading branch information
mergify[bot] authored and roysc committed Jul 10, 2022
1 parent 9e0f6e8 commit a32bdf1
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 78 deletions.
2 changes: 1 addition & 1 deletion baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (app *BaseApp) createQueryContext(height int64, prove bool) (sdk.Context, e
return sdk.Context{},
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", height, err, lastBlockHeight,
"failed to load state at height %d; %w", height, err,
)
}

Expand Down
5 changes: 5 additions & 0 deletions client/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package client

var (
SelectHeight = selectHeight
)
58 changes: 38 additions & 20 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"

"github.com/cosmos/cosmos-sdk/codec"
proto "github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -57,9 +58,23 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}

if ctx.GRPCClient != nil {
// Certain queries must not be be concurrent with ABCI to function correctly.
// As a result, we direct them to the ABCI flow where they get syncronized.
_, isSimulationRequest := req.(*tx.SimulateRequest)
isTendermintQuery := strings.Contains(method, "tendermint")

isGRPCAllowed := !isTendermintQuery && !isSimulationRequest

requestedHeight, err := selectHeight(ctx, grpcCtx)
if err != nil {
return err
}

if ctx.GRPCClient != nil && isGRPCAllowed {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(requestedHeight, 10))
context := metadata.NewOutgoingContext(grpcCtx, md)
// Case 2-1. Invoke grpc.
return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...)
return ctx.GRPCClient.Invoke(context, method, req, reply, opts...)
}

// Case 2-2. Querying state via abci query.
Expand All @@ -68,26 +83,10 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}

// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return err
}
if height < 0 {
return sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

abciReq := abci.RequestQuery{
Path: method,
Data: reqBz,
Height: ctx.Height,
Height: requestedHeight,
}

res, err := ctx.QueryABCI(abciReq)
Expand All @@ -105,7 +104,7 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
Expand Down Expand Up @@ -175,3 +174,22 @@ func (f failingInterfaceRegistry) ListAllInterfaces() []string {
func (f failingInterfaceRegistry) ListImplementations(ifaceTypeURL string) []string {
panic("cannot be called")
}

// selectHeight returns the height chosen from client context and grpc context.
// If exists, height extracted from grpcCtx takes precedence.
func selectHeight(clientContext Context, grpcCtx gocontext.Context) (int64, error) {
var height int64
if clientContext.Height > 0 {
height = clientContext.Height
}

md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
var err error
height, err = strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return 0, err
}
}
return height, nil
}
174 changes: 142 additions & 32 deletions client/grpc_query_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
//go:build norace
// +build norace

package client_test

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/testutil/network"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -25,14 +24,31 @@ type IntegrationTestSuite struct {
network *network.Network
}

type testcase struct {
clientContextHeight int64
grpcHeight int64
expectedHeight int64
}

const (
// if clientContextHeight or grpcHeight is set to this flag,
// the test assumes that the respective height is not provided.
heightNotSetFlag = int64(-1)
// given the current block time, this should never be reached by the time
// a test is run.
invalidBeyondLatestHeight = 1_000_000_000
// if this flag is set to expectedHeight, an error is assumed.
errorHeightFlag = int64(-2)
)

func (s *IntegrationTestSuite) SetupSuite() {
s.T().Log("setting up integration test suite")

var err error
s.network, err = network.New(s.T(), s.T().TempDir(), network.DefaultConfig())
s.Require().NoError(err)

_, err = s.network.WaitForHeight(2)
_, err = s.network.WaitForHeight(3)
s.Require().NoError(err)
}

Expand All @@ -41,44 +57,138 @@ func (s *IntegrationTestSuite) TearDownSuite() {
s.network.Cleanup()
}

func (s *IntegrationTestSuite) TestGRPCQuery() {
func (s *IntegrationTestSuite) TestGRPCQuery_TestService() {
val0 := s.network.Validators[0]

// gRPC query to test service should work
testClient := testdata.NewQueryClient(val0.ClientCtx)
testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"})
s.Require().NoError(err)
s.Require().Equal("hello", testRes.Message)
}

// gRPC query to bank service should work
denom := fmt.Sprintf("%stoken", val0.Moniker)
bankClient := banktypes.NewQueryClient(val0.ClientCtx)
var header metadata.MD
bankRes, err := bankClient.Balance(
context.Background(),
&banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom},
grpc.Header(&header), // Also fetch grpc header
)
s.Require().NoError(err)
s.Require().Equal(
sdk.NewCoin(denom, s.network.Config.AccountTokens),
*bankRes.GetBalance(),
)
blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().NotEmpty(blockHeight[0]) // Should contain the block height

// Request metadata should work
val0.ClientCtx = val0.ClientCtx.WithHeight(1) // We set clientCtx to height 1
bankClient = banktypes.NewQueryClient(val0.ClientCtx)
bankRes, err = bankClient.Balance(
context.Background(),
&banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom},
grpc.Header(&header),
)
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().Equal([]string{"1"}, blockHeight)
func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() {
val0 := s.network.Validators[0]

const method = "/cosmos.bank.v1beta1.Query/Balance"

testcases := map[string]testcase{
"clientContextHeight 1; grpcHeight not set - clientContextHeight selected": {
clientContextHeight: 1, // chosen
grpcHeight: heightNotSetFlag,
expectedHeight: 1,
},
"clientContextHeight not set; grpcHeight is 2 - grpcHeight is chosen": {
clientContextHeight: heightNotSetFlag,
grpcHeight: 2, // chosen
expectedHeight: 2,
},
"both not set - 0 returned": {
clientContextHeight: heightNotSetFlag,
grpcHeight: heightNotSetFlag,
expectedHeight: 3, // latest height
},
"clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": {
clientContextHeight: 1,
grpcHeight: 0, // chosen
expectedHeight: 3, // latest height
},
"clientContextHeight 3; grpcHeight is 3 - 3 is returned": {
clientContextHeight: 3,
grpcHeight: 3,
expectedHeight: 3,
},
"clientContextHeight is 1_000_000_000; grpcHeight is 1_000_000_000 - requested beyond latest height - error": {
clientContextHeight: invalidBeyondLatestHeight,
grpcHeight: invalidBeyondLatestHeight,
expectedHeight: errorHeightFlag,
},
}

for name, tc := range testcases {
s.T().Run(name, func(t *testing.T) {
// Setup
clientCtx := val0.ClientCtx
clientCtx.Height = 0

if tc.clientContextHeight != heightNotSetFlag {
clientCtx = clientCtx.WithHeight(tc.clientContextHeight)
}

grpcContext := context.Background()
if tc.grpcHeight != heightNotSetFlag {
header := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, fmt.Sprintf("%d", tc.grpcHeight))
grpcContext = metadata.NewOutgoingContext(grpcContext, header)
}

// Test
var header metadata.MD
denom := fmt.Sprintf("%stoken", val0.Moniker)
request := &banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom}
response := &banktypes.QueryBalanceResponse{}
err := clientCtx.Invoke(grpcContext, method, request, response, grpc.Header(&header))

// Assert results
if tc.expectedHeight == errorHeightFlag {
s.Require().Error(err)
return
}

s.Require().NoError(err)
s.Require().Equal(
sdk.NewCoin(denom, s.network.Config.AccountTokens),
*response.GetBalance(),
)
blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().Equal([]string{fmt.Sprintf("%d", tc.expectedHeight)}, blockHeight)
})
}
}

func TestIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(IntegrationTestSuite))
}

func TestSelectHeight(t *testing.T) {
testcases := map[string]testcase{
"clientContextHeight 1; grpcHeight not set - clientContextHeight selected": {
clientContextHeight: 1,
grpcHeight: heightNotSetFlag,
expectedHeight: 1,
},
"clientContextHeight not set; grpcHeight is 2 - grpcHeight is chosen": {
clientContextHeight: heightNotSetFlag,
grpcHeight: 2,
expectedHeight: 2,
},
"both not set - 0 returned": {
clientContextHeight: heightNotSetFlag,
grpcHeight: heightNotSetFlag,
expectedHeight: 0,
},
"clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": {
clientContextHeight: 3,
grpcHeight: 0,
expectedHeight: 0,
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
clientCtx := client.Context{}
if tc.clientContextHeight != heightNotSetFlag {
clientCtx = clientCtx.WithHeight(tc.clientContextHeight)
}

grpcContxt := context.Background()
if tc.grpcHeight != heightNotSetFlag {
header := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, fmt.Sprintf("%d", tc.grpcHeight))
grpcContxt = metadata.NewOutgoingContext(grpcContxt, header)
}

height, err := client.SelectHeight(clientCtx, grpcContxt)
require.NoError(t, err)
require.Equal(t, tc.expectedHeight, height)
})
}
}
19 changes: 19 additions & 0 deletions testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/client/local"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"

"github.com/cosmos/cosmos-sdk/server/api"
servergrpc "github.com/cosmos/cosmos-sdk/server/grpc"
Expand Down Expand Up @@ -106,6 +109,22 @@ func startInProcess(cfg Config, val *Validator) error {

val.grpc = grpcSrv

// If grpc is enabled, configure grpc client.
grpcClient, err := grpc.Dial(
val.AppConfig.GRPC.Address,
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(encoding.GetCodec(proto.Name)),
grpc.MaxCallRecvMsgSize(val.AppConfig.GRPC.MaxRecvMsgSize),
grpc.MaxCallSendMsgSize(val.AppConfig.GRPC.MaxSendMsgSize),
),
)
if err != nil {
return err
}

val.ClientCtx = val.ClientCtx.WithGRPCClient(grpcClient)

if val.AppConfig.GRPCWeb.Enable {
val.grpcWeb, err = servergrpc.StartGRPCWeb(grpcSrv, *val.AppConfig)
if err != nil {
Expand Down
Loading

0 comments on commit a32bdf1

Please sign in to comment.