Skip to content

Commit

Permalink
feat: add max_batch_request_num feature (#672)
Browse files Browse the repository at this point in the history
* chore: add a new parameter to the configuration

Signed-off-by: 170210 <[email protected]>

* feat: add max_request_batch_request feature in Ostracon

Signed-off-by: 170210 <[email protected]>

* test: add test in http_json_handler_test

Signed-off-by: 170210 <[email protected]>

* test: add the field to http header of existing tests

Signed-off-by: 170210 <[email protected]>

* test: add test in http_server_test

Signed-off-by: 170210 <[email protected]>

* fixup: add field to merged tests

Signed-off-by: 170210 <[email protected]>

* refactor: change to max_batch_request_num

Signed-off-by: 170210 <[email protected]>

* fixup: fix goroutine leaks in test

Signed-off-by: 170210 <[email protected]>

* fixup: fix warning when defer in a loop

Signed-off-by: 170210 <[email protected]>

* style: fix for comment

Signed-off-by: 170210 <[email protected]>

* refactor: change request header key

Signed-off-by: 170210 <[email protected]>

* fixup: fix for review

Signed-off-by: 170210 <[email protected]>

* fixup: add field in light client

Signed-off-by: 170210 <[email protected]>

---------

Signed-off-by: 170210 <[email protected]>
  • Loading branch information
170210 authored Jul 28, 2023
1 parent 73ca272 commit 59b58b6
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 12 deletions.
1 change: 1 addition & 0 deletions cmd/ostracon/commands/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func runProxy(cmd *cobra.Command, args []string) error {

cfg := rpcserver.DefaultConfig()
cfg.MaxBodyBytes = config.RPC.MaxBodyBytes
cfg.MaxBatchRequestNum = config.RPC.MaxBatchRequestNum
cfg.MaxHeaderBytes = config.RPC.MaxHeaderBytes
cfg.MaxOpenConnections = maxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
Expand Down
11 changes: 9 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ type RPCConfig struct {
// Maximum size of request body, in bytes
MaxBodyBytes int64 `mapstructure:"max_body_bytes"`

// Maximum number of requests in a request body
MaxBatchRequestNum int `mapstructure:"max_batch_request_num"`

// Maximum size of request header, in bytes
MaxHeaderBytes int `mapstructure:"max_header_bytes"`

Expand Down Expand Up @@ -492,8 +495,9 @@ func DefaultRPCConfig() *RPCConfig {
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,

MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
MaxBodyBytes: int64(1000000), // 1MB
MaxBatchRequestNum: 10,
MaxHeaderBytes: 1 << 20, // same as the net/http default

TLSCertFile: "",
TLSKeyFile: "",
Expand Down Expand Up @@ -551,6 +555,9 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxBodyBytes < 0 {
return errors.New("max_body_bytes can't be negative")
}
if cfg.MaxBatchRequestNum < 0 {
return errors.New("max_batch_request_num can't be negative")
}
if cfg.MaxHeaderBytes < 0 {
return errors.New("max_header_bytes can't be negative")
}
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestRPCConfigValidateBasic(t *testing.T) {
"MaxSubscriptionsPerClient",
"TimeoutBroadcastTxCommit",
"MaxBodyBytes",
"MaxBatchRequestNum",
"MaxHeaderBytes",
}

Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
# Maximum size of request body, in bytes
max_body_bytes = {{ .RPC.MaxBodyBytes }}
# Maximum number of requests in a request body
max_batch_request_num = {{ .RPC.MaxBatchRequestNum }}
# Maximum size of request header, in bytes
max_header_bytes = {{ .RPC.MaxHeaderBytes }}
Expand Down
2 changes: 2 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {

config := rpcserver.DefaultConfig()
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxBatchRequestNum = n.config.RPC.MaxBatchRequestNum
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
config.ReadTimeout = n.config.RPC.ReadTimeout
Expand Down Expand Up @@ -1234,6 +1235,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
if grpcListenAddr != "" {
config := rpcserver.DefaultConfig()
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxBatchRequestNum = n.config.RPC.MaxBatchRequestNum
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
// NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections
config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
Expand Down
7 changes: 4 additions & 3 deletions rpc/jsonrpc/server/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ var (
TestWSRPCFunc = NewWSRPCFunc(
func(ctx *types.Context, s string, i int) (string, error) { return TestText, nil }, "s,i")

TestFuncMap = map[string]*RPCFunc{"c": TestRPCFunc}
TestGoodBody = `{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`
TestBadParams = `{"jsonrpc": "2.0", "method": "c", "id": "0", "params": "s=a,i=b"}`
TestFuncMap = map[string]*RPCFunc{"c": TestRPCFunc}
TestGoodBody = `{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`
TestBadParams = `{"jsonrpc": "2.0", "method": "c", "id": "0", "params": "s=a,i=b"}`
TestMaxBatchRequestNum = "10"
)

type FailManager struct {
Expand Down
24 changes: 24 additions & 0 deletions rpc/jsonrpc/server/http_json_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"reflect"
"sort"
"strconv"

tmjson "github.com/Finschia/ostracon/libs/json"
"github.com/Finschia/ostracon/libs/log"
Expand Down Expand Up @@ -54,6 +55,29 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
}
requests = []types.RPCRequest{request}
}
// read the Max-Batch-Request-Num from header
maxBatchRequestNum, err := strconv.Atoi(r.Header.Get("Max-Batch-Request-Num"))
if err != nil {
res := types.RPCInvalidRequestError(nil,
fmt.Errorf("error reading request header key"),
)
if wErr := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
return
}

// if the number of requests in the batch exceeds the max_batch_request_num
// return a invalid request error
if len(requests) > maxBatchRequestNum {
res := types.RPCInvalidRequestError(nil,
fmt.Errorf("too many requests in a request batch, current is %d, where the upper limit is %d", len(requests), maxBatchRequestNum),
)
if wErr := WriteRPCResponseHTTPError(w, http.StatusBadRequest, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
return
}

// Set the default response cache to true unless
// 1. Any RPC request error.
Expand Down
42 changes: 42 additions & 0 deletions rpc/jsonrpc/server/http_json_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestRPCParams(t *testing.T) {

for i, tt := range tests {
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestJSONRPCID(t *testing.T) {

for i, tt := range tests {
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -139,6 +141,7 @@ func TestRPCNotification(t *testing.T) {
mux := testMux()
body := strings.NewReader(`{"jsonrpc": "2.0"}`)
req, _ := http.NewRequest("POST", "http://localhost/", body)
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -176,6 +179,7 @@ func TestRPCNotificationInBatch(t *testing.T) {
}
for i, tt := range tests {
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -218,6 +222,40 @@ func TestRPCNotificationInBatch(t *testing.T) {
}
}

func TestTooManyRPCNotificationInBatch_error(t *testing.T) {
// prepare the mock batch request
var jsonArray []json.RawMessage
for i := 0; i < 11; i++ {
jsonArray = append(jsonArray, json.RawMessage(TestGoodBody))
}
jsonData, err := json.Marshal(jsonArray)
if err != nil {
t.Errorf("expected an array, couldn't marshal it")
}
// execute the batch request
mux := testMux()
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(string(jsonData)))
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
res.Body.Close()
// always expecting back a 400 error
assert.Equal(t, http.StatusBadRequest, res.StatusCode, "should always return 400")
}

func TestNoMaxBatchRequestNumField_error(t *testing.T) {
// execute the batch request
mux := testMux()
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(TestGoodBody))
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
res.Body.Close()
// always expecting back a 500 error
assert.Equal(t, http.StatusInternalServerError, res.StatusCode, "should always return 500")
}

func TestUnknownRPCPath(t *testing.T) {
mux := testMux()
req, _ := http.NewRequest("GET", "http://localhost/unknownrpcpath", nil)
Expand All @@ -234,6 +272,7 @@ func TestRPCResponseCache(t *testing.T) {
mux := testMux()
body := strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": ["1"]}`)
req, _ := http.NewRequest("Get", "http://localhost/", body)
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand All @@ -249,6 +288,7 @@ func TestRPCResponseCache(t *testing.T) {
// send a request with default height.
body = strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": ["0"]}`)
req, _ = http.NewRequest("Get", "http://localhost/", body)
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec = httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res = rec.Result()
Expand All @@ -265,6 +305,7 @@ func TestRPCResponseCache(t *testing.T) {
// send a request with default height, but as empty set of parameters.
body = strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": []}`)
req, _ = http.NewRequest("Get", "http://localhost/", body)
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
rec = httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res = rec.Result()
Expand Down Expand Up @@ -294,6 +335,7 @@ func TestMakeJSONRPCHandler_Unmarshal_WriteRPCResponseHTTPError_error(t *testing
func TestMakeJSONRPCHandler_last_WriteRPCResponseHTTP_error(t *testing.T) {
handlerFunc := makeJSONRPCHandler(TestFuncMap, log.TestingLogger())
req, _ := http.NewRequest("GET", "http://localhost/", strings.NewReader(TestGoodBody))
req.Header.Set("Max-Batch-Request-Num", TestMaxBatchRequestNum)
// WriteRPCResponseHTTP error
rec := NewFailedWriteResponseWriter()
handlerFunc.ServeHTTP(rec, req)
Expand Down
51 changes: 44 additions & 7 deletions rpc/jsonrpc/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"runtime/debug"
"strconv"
"strings"
"time"

Expand All @@ -32,6 +33,8 @@ type Config struct {
// MaxBodyBytes controls the maximum number of bytes the
// server will read parsing the request body.
MaxBodyBytes int64
// MaxBodyBytes controls the maximum number of one request batch
MaxBatchRequestNum int
// mirrors http.Server#MaxHeaderBytes
MaxHeaderBytes int
}
Expand All @@ -44,19 +47,31 @@ func DefaultConfig() *Config {
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
MaxBatchRequestNum: 10,
MaxHeaderBytes: 1 << 20, // same as the net/http default
}
}

// Serve creates a http.Server and calls Serve with the given listener. It
// wraps handler with RecoverAndLogHandler and a handler, which limits the max
// body size to config.MaxBodyBytes.
// wraps handler with RecoverAndLogHandler and handlers. Handlers contain
// a maxBytesHandler, which limits the max body size to config.MaxBodyBytes and
// a maxBatchRequestHandler, which limits the max number of requests in a batch.
//
// NOTE: This function blocks - you may want to call it in a go-routine.
func Serve(listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error {
logger.Info("serve", "msg", log.NewLazySprintf("Starting RPC HTTP server on %s", listener.Addr()))

handlers := maxBatchRequestHandler{
h: maxBytesHandler{
h: handler,
n: config.MaxBodyBytes,
},
NewHeaderName: "Max-Batch-Request-Num",
NewHeaderValue: strconv.Itoa(config.MaxBatchRequestNum),
}

s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
Handler: RecoverAndLogHandler(handlers, logger),
ReadTimeout: config.ReadTimeout,
ReadHeaderTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
Expand All @@ -69,8 +84,9 @@ func Serve(listener net.Listener, handler http.Handler, logger log.Logger, confi
}

// Serve creates a http.Server and calls ServeTLS with the given listener,
// certFile and keyFile. It wraps handler with RecoverAndLogHandler and a
// handler, which limits the max body size to config.MaxBodyBytes.
// certFile and keyFile. It wraps handler with RecoverAndLogHandler and handlers.
// Handlers contain a maxBytesHandler, which limits the max body size to config.MaxBodyBytes and
// a maxBatchRequestHandler, which limits the max number of requests in a batch.
//
// NOTE: This function blocks - you may want to call it in a go-routine.
func ServeTLS(
Expand All @@ -82,8 +98,18 @@ func ServeTLS(
) error {
logger.Info("serve tls", "msg", log.NewLazySprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listener.Addr(), certFile, keyFile))

handlers := maxBatchRequestHandler{
h: maxBytesHandler{
h: handler,
n: config.MaxBodyBytes,
},
NewHeaderName: "Max-Batch-Request-Num",
NewHeaderValue: strconv.Itoa(config.MaxBatchRequestNum),
}

s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
Handler: RecoverAndLogHandler(handlers, logger),
ReadTimeout: config.ReadTimeout,
ReadHeaderTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
Expand Down Expand Up @@ -261,6 +287,17 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.h.ServeHTTP(w, r)
}

type maxBatchRequestHandler struct {
h http.Handler
NewHeaderName string
NewHeaderValue string
}

func (h maxBatchRequestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Set(h.NewHeaderName, h.NewHeaderValue)
h.h.ServeHTTP(w, r)
}

// Listen starts a new net.Listener on the given address.
// It returns an error if the address is invalid or the call to Listen() fails.
func Listen(addr string, config *Config) (listener net.Listener, err error) {
Expand Down
Loading

0 comments on commit 59b58b6

Please sign in to comment.