Skip to content

Commit

Permalink
rpc: Add experimental config params to allow for subscription buffer …
Browse files Browse the repository at this point in the history
…size control (tm v0.34.x) (#7230)

A workaround for #6729. Add parameters to control buffer sizes for
event subscription RPC clients. On some networks, buffering causes
clients to be dropped and/or events to be lost.

For additional context, see the discussion on #7188.

- Add experimental_subscription_buffer_size config parameter
- Add experimental_websocket_write_buffer_size config parameter
- Add experimental_close_on_slow_client config parameter

Co-authored-by: M. J. Fromberger <[email protected]>
Co-authored-by: Thane Thomson <[email protected]>
  • Loading branch information
3 people committed Feb 25, 2022
1 parent cd4d897 commit b80e5ef
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 20 deletions.
40 changes: 40 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ var (

defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName)

minSubscriptionBufferSize = 100
defaultSubscriptionBufferSize = 200
)

// Config defines the top level configuration for an Ostracon node
Expand Down Expand Up @@ -387,6 +390,29 @@ type RPCConfig struct {
// to the estimated maximum number of broadcast_tx_commit calls per block.
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`

// The number of events that can be buffered per subscription before
// returning `ErrOutOfCapacity`.
SubscriptionBufferSize int `mapstructure:"experimental_subscription_buffer_size"`

// The maximum number of responses that can be buffered per WebSocket
// client. If clients cannot read from the WebSocket endpoint fast enough,
// they will be disconnected, so increasing this parameter may reduce the
// chances of them being disconnected (but will cause the node to use more
// memory).
//
// Must be at least the same as `SubscriptionBufferSize`, otherwise
// connections may be dropped unnecessarily.
WebSocketWriteBufferSize int `mapstructure:"experimental_websocket_write_buffer_size"`

// If a WebSocket client cannot read fast enough, at present we may
// silently drop events instead of generating an error or disconnecting the
// client.
//
// Enabling this parameter will cause the WebSocket connection to be closed
// instead if it cannot read fast enough, allowing for greater
// predictability in subscription behaviour.
CloseOnSlowClient bool `mapstructure:"experimental_close_on_slow_client"`

// How long to wait for a tx to be committed during /broadcast_tx_commit
// WARNING: Using a value larger than 'WriteTimeout' will result in increasing the
// global HTTP write timeout, which applies to all connections and endpoints.
Expand Down Expand Up @@ -439,7 +465,9 @@ func DefaultRPCConfig() *RPCConfig {

MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
SubscriptionBufferSize: defaultSubscriptionBufferSize,
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,

MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
Expand Down Expand Up @@ -482,6 +510,18 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxSubscriptionsPerClient < 0 {
return errors.New("max_subscriptions_per_client can't be negative")
}
if cfg.SubscriptionBufferSize < minSubscriptionBufferSize {
return fmt.Errorf(
"experimental_subscription_buffer_size must be >= %d",
minSubscriptionBufferSize,
)
}
if cfg.WebSocketWriteBufferSize < cfg.SubscriptionBufferSize {
return fmt.Errorf(
"experimental_websocket_write_buffer_size must be >= experimental_subscription_buffer_size (%d)",
cfg.SubscriptionBufferSize,
)
}
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative")
}
Expand Down
27 changes: 27 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,33 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}
# the estimated # maximum number of broadcast_tx_commit calls per block.
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# Experimental parameter to specify the maximum number of events a node will
# buffer, per subscription, before returning an error and closing the
# subscription. Must be set to at least 100, but higher values will accommodate
# higher event throughput rates (and will use more memory).
experimental_subscription_buffer_size = {{ .RPC.SubscriptionBufferSize }}
# Experimental parameter to specify the maximum number of RPC responses that
# can be buffered per WebSocket client. If clients cannot read from the
# WebSocket endpoint fast enough, they will be disconnected, so increasing this
# parameter may reduce the chances of them being disconnected (but will cause
# the node to use more memory).
#
# Must be at least the same as "experimental_subscription_buffer_size",
# otherwise connections could be dropped unnecessarily. This value should
# ideally be somewhat higher than "experimental_subscription_buffer_size" to
# accommodate non-subscription-related RPC responses.
experimental_websocket_write_buffer_size = {{ .RPC.WebSocketWriteBufferSize }}
# If a WebSocket client cannot read fast enough, at present we may
# silently drop events instead of generating an error or disconnecting the
# client.
#
# Enabling this experimental parameter will cause the WebSocket connection to
# be closed instead if it cannot read fast enough, allowing for greater
# predictability in subscription behaviour.
experimental_close_on_slow_client = {{ .RPC.CloseOnSlowClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 'WriteTimeout' will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.
Expand Down
2 changes: 1 addition & 1 deletion libs/pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var (

// ErrOutOfCapacity is returned by Err when a client is not pulling messages
// fast enough. Note the client's subscription will be terminated.
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
ErrOutOfCapacity = errors.New("internal subscription event buffer is out of capacity")
)

// A Subscription represents a client subscription for a particular query and
Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}
}),
rpcserver.ReadLimit(config.MaxBodyBytes),
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
)
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
Expand Down
3 changes: 1 addition & 2 deletions proxy/mocks/app_conn_consensus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions proxy/mocks/app_conn_mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 17 additions & 7 deletions rpc/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -11,11 +12,6 @@ import (
rpctypes "github.com/line/ostracon/rpc/jsonrpc/types"
)

const (
// Buffer on the Ostracon (server) side to allow some slowness in clients.
subBufferSize = 100
)

// Subscribe for events via WebSocket.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
Expand All @@ -37,11 +33,13 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel()

sub, err := env.EventBus.Subscribe(subCtx, addr, q, subBufferSize)
sub, err := env.EventBus.Subscribe(subCtx, addr, q, env.Config.SubscriptionBufferSize)
if err != nil {
return nil, err
}

closeIfSlow := env.Config.CloseOnSlowClient

// Capture the current ID, since it can change in the future.
subscriptionID := ctx.JSONReq.ID
go func() {
Expand All @@ -57,6 +55,18 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)

if closeIfSlow {
var (
err = errors.New("subscription was cancelled (reason: slow client)")
resp = rpctypes.RPCServerError(subscriptionID, err)
)
if !ctx.WSConn.TryWriteRPCResponse(resp) {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
}
return
}
}
case <-sub.Cancelled():
if sub.Err() != tmpubsub.ErrUnsubscribed {
Expand All @@ -70,7 +80,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
err = fmt.Errorf("subscription was cancelled (reason: %s)", reason)
resp = rpctypes.RPCServerError(subscriptionID, err)
)
if ok := ctx.WSConn.TryWriteRPCResponse(resp); !ok {
if !ctx.WSConn.TryWriteRPCResponse(resp) {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
}
Expand Down
3 changes: 1 addition & 2 deletions state/mocks/evidence_pool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions state/mocks/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions statesync/mocks/state_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b80e5ef

Please sign in to comment.