Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout as parameters in op node #82

Merged
merged 4 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,8 @@ func configureL2(rollupNodeCfg *rollupNode.Config, l2Node WSOrHTTPEndpoint, jwtS
rollupNodeCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: jwtSecret,
L2RpcTimeout: 10 * time.Second,
L2RpcBatchTimeout: 20 * time.Second,
}
}

Expand Down
42 changes: 42 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ var (
Usage: "Address of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)",
EnvVars: prefixEnvVars("L2_ENGINE_RPC"),
}
L2RpcTimeout = &cli.DurationFlag{
Name: "l2.rpc-timeout",
Usage: "Timeout for L2 RPC requests",
EnvVars: prefixEnvVars("L2_RPC_TIMEOUT"),
Value: time.Second * 10,
}
L2RpcBatchTimeout = &cli.DurationFlag{
Name: "l2.rpc-batch-timeout",
Usage: "Timeout for L2 RPC batch requests",
EnvVars: prefixEnvVars("L2_RPC_BATCH_TIMEOUT"),
Value: time.Second * 20,
}
RollupConfig = &cli.StringFlag{
Name: "rollup.config",
Usage: "Rollup chain parameters",
Expand All @@ -57,6 +69,30 @@ var (
EnvVars: prefixEnvVars("RPC_PORT"),
Value: 9545, // Note: op-service/rpc/cli.go uses 8545 as the default.
}
RPCListenReadTimeout = &cli.DurationFlag{
Name: "rpc.read-timeout",
Usage: "RPC read timeout",
EnvVars: prefixEnvVars("RPC_READ_TIMEOUT"),
Value: time.Second * 30,
}
RPCListenReadHeaderTimeout = &cli.DurationFlag{
Name: "rpc.read-header-timeout",
Usage: "RPC read header timeout",
EnvVars: prefixEnvVars("RPC_READ_HEADER_TIMEOUT"),
Value: time.Second * 30,
}
RPCListenWriteTimeout = &cli.DurationFlag{
Name: "rpc.write-timeout",
Usage: "RPC write timeout",
EnvVars: prefixEnvVars("RPC_WRITE_TIMEOUT"),
Value: time.Second * 30,
}
RPCListenIdleTimeout = &cli.DurationFlag{
Name: "rpc.idle-timeout",
Usage: "RPC idle timeout",
EnvVars: prefixEnvVars("RPC_IDLE_TIMEOUT"),
Value: time.Second * 120,
}
RPCEnableAdmin = &cli.BoolFlag{
Name: "rpc.enable-admin",
Usage: "Enable the admin API (experimental)",
Expand Down Expand Up @@ -268,6 +304,10 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
RPCListenAddr,
RPCListenPort,
RPCListenReadTimeout,
RPCListenReadHeaderTimeout,
RPCListenWriteTimeout,
RPCListenIdleTimeout,
RollupConfig,
Network,
L1TrustRPC,
Expand All @@ -276,6 +316,8 @@ var optionalFlags = []cli.Flag{
L1RPCMaxBatchSize,
L1HTTPPollInterval,
L2EngineJWTSecret,
L2RpcTimeout,
L2RpcBatchTimeout,
VerifierL1Confs,
SequencerEnabledFlag,
SequencerStoppedFlag,
Expand Down
14 changes: 13 additions & 1 deletion op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type L2EndpointConfig struct {
// JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication.
// Any value for an IPC connection.
L2EngineJWTSecret [32]byte

// L2RpcTimeout specifies the timeout for L2 RPC requests.
L2RpcTimeout time.Duration

// L2RpcBatchTimeout specifies the timeout for L2 RPC batch requests.
L2RpcBatchTimeout time.Duration
}

var _ L2EndpointSetup = (*L2EndpointConfig)(nil)
Expand All @@ -50,7 +56,12 @@ func (cfg *L2EndpointConfig) Check() error {
if cfg.L2EngineAddr == "" {
return errors.New("empty L2 Engine Address")
}

if cfg.L2RpcTimeout == 0 {
return fmt.Errorf("L2 RPC timeout cannot be 0")
}
if cfg.L2RpcBatchTimeout == 0 {
return fmt.Errorf("L2 RPC batch timeout cannot be 0")
}
return nil
}

Expand All @@ -62,6 +73,7 @@ func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
opts := []client.RPCOption{
client.WithGethRPCOptions(auth),
client.WithDialBackoff(10),
client.WithTimeout(cfg.L2RpcTimeout, cfg.L2RpcBatchTimeout),
}
l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, opts...)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/httputil"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum/go-ethereum/log"
)
Expand Down Expand Up @@ -63,9 +64,10 @@ type Config struct {
}

type RPCConfig struct {
ListenAddr string
ListenPort int
EnableAdmin bool
ListenAddr string
ListenPort int
EnableAdmin bool
ListenTimeout *httputil.HTTPTimeouts
}

func (cfg *RPCConfig) HttpEndpoint() string {
Expand Down
13 changes: 10 additions & 3 deletions op-node/node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"strconv"

"github.com/ethereum-optimism/optimism/op-service/httputil"
ophttp "github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
Expand All @@ -25,9 +26,14 @@ type rpcServer struct {
appVersion string
log log.Logger
sources.L2Client
rpcServerTimeout httputil.HTTPTimeouts
}

func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Config, l2Client l2EthClient, dr driverClient, log log.Logger, appVersion string, m metrics.Metricer) (*rpcServer, error) {
rpcServerTimeout := httputil.DefaultTimeouts
if rpcCfg.ListenTimeout != nil {
rpcServerTimeout = *rpcCfg.ListenTimeout
}
api := NewNodeAPI(rollupCfg, l2Client, dr, log.New("rpc", "node"), m)
// TODO: extend RPC config with options for WS, IPC and HTTP RPC connections
endpoint := net.JoinHostPort(rpcCfg.ListenAddr, strconv.Itoa(rpcCfg.ListenPort))
Expand All @@ -38,8 +44,9 @@ func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Conf
Service: api,
Authenticated: false,
}},
appVersion: appVersion,
log: log,
appVersion: appVersion,
log: log,
rpcServerTimeout: rpcServerTimeout,
}
return r, nil
}
Expand Down Expand Up @@ -78,7 +85,7 @@ func (s *rpcServer) Start() error {
mux.Handle("/", nodeHandler)
mux.HandleFunc("/healthz", healthzHandler(s.appVersion))

hs, err := ophttp.StartHTTPServer(s.endpoint, mux)
hs, err := ophttp.StartHTTPServer(s.endpoint, mux, ophttp.WithTimeouts(s.rpcServerTimeout))
if err != nil {
return fmt.Errorf("failed to start HTTP RPC server: %w", err)
}
Expand Down
40 changes: 40 additions & 0 deletions op-node/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"math/rand"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/version"
rpcclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
Expand Down Expand Up @@ -197,6 +199,44 @@ func TestSyncStatus(t *testing.T) {
assert.Equal(t, status, out)
}

func TestTimeout(t *testing.T) {
log := testlog.Logger(t, log.LvlError)
l2Client := &testutils.MockL2Client{}
drClient := &mockDriverClient{}
rng := rand.New(rand.NewSource(1234))
status := randomSyncStatus(rng)
drClient.On("SyncStatus").Run(func(args mock.Arguments) {
time.Sleep(2 * time.Second)
}).Return(status)

rpcCfg := &RPCConfig{
ListenAddr: "localhost",
ListenPort: 0,
ListenTimeout: &httputil.HTTPTimeouts{
ReadTimeout: 1 * time.Second,
ReadHeaderTimeout: 1 * time.Second,
WriteTimeout: 1 * time.Second,
IdleTimeout: 1 * time.Second,
},
}
rollupCfg := &rollup.Config{
// ignore other rollup config info in this test
}
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NoopMetrics)
assert.NoError(t, err)
assert.NoError(t, server.Start())
defer func() {
require.NoError(t, server.Stop(context.Background()))
}()

client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
assert.NoError(t, err)

var out *eth.SyncStatus
err = client.CallContext(context.Background(), &out, "optimism_syncStatus")
assert.ErrorContains(t, err, "request timed out")
}

type mockDriverClient struct {
mock.Mock
}
Expand Down
13 changes: 11 additions & 2 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-service/httputil"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -78,8 +79,14 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
Rollup: *rollupConfig,
Driver: *driverConfig,
RPC: node.RPCConfig{
ListenAddr: ctx.String(flags.RPCListenAddr.Name),
ListenPort: ctx.Int(flags.RPCListenPort.Name),
ListenAddr: ctx.String(flags.RPCListenAddr.Name),
ListenPort: ctx.Int(flags.RPCListenPort.Name),
ListenTimeout: &httputil.HTTPTimeouts{
ReadTimeout: ctx.Duration(flags.RPCListenReadTimeout.Name),
ReadHeaderTimeout: ctx.Duration(flags.RPCListenReadHeaderTimeout.Name),
WriteTimeout: ctx.Duration(flags.RPCListenWriteTimeout.Name),
IdleTimeout: ctx.Duration(flags.RPCListenIdleTimeout.Name),
},
EnableAdmin: ctx.Bool(flags.RPCEnableAdmin.Name),
},
Metrics: node.MetricsConfig{
Expand Down Expand Up @@ -154,6 +161,8 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
return &node.L2EndpointConfig{
L2EngineAddr: l2Addr,
L2EngineJWTSecret: secret,
L2RpcTimeout: ctx.Duration(flags.L2RpcTimeout.Name),
L2RpcBatchTimeout: ctx.Duration(flags.L2RpcBatchTimeout.Name),
}, nil
}

Expand Down
36 changes: 30 additions & 6 deletions op-service/client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

var httpRegex = regexp.MustCompile("^http(s)?://")
var (
httpRegex = regexp.MustCompile("^http(s)?://")
defaultRpcTimeout = 10 * time.Second
defaultRpcBatchTimeout = 20 * time.Second
)

type RPC interface {
Close()
Expand All @@ -33,6 +37,8 @@ type rpcConfig struct {
backoffAttempts int
limit float64
burst int
rpcTimeout *time.Duration
rpcBatchTimeout *time.Duration
}

type RPCOption func(cfg *rpcConfig) error
Expand Down Expand Up @@ -72,6 +78,15 @@ func WithRateLimit(rateLimit float64, burst int) RPCOption {
}
}

// WithTimeout configures the RPC to timeout after the given duration.
func WithTimeout(timeout, batchTimeout time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
cfg.rpcTimeout = &timeout
cfg.rpcBatchTimeout = &batchTimeout
return nil
}
}

// NewRPC returns the correct client.RPC instance for a given RPC url.
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) {
var cfg rpcConfig
Expand All @@ -90,7 +105,14 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
return nil, err
}

var wrapped RPC = &BaseRPCClient{c: underlying}
baseRPCClient := &BaseRPCClient{c: underlying, rpcTimeout: defaultRpcTimeout, rpcBatchTimeout: defaultRpcBatchTimeout}
if cfg.rpcTimeout != nil {
baseRPCClient.rpcTimeout = *cfg.rpcTimeout
}
if cfg.rpcBatchTimeout != nil {
baseRPCClient.rpcBatchTimeout = *cfg.rpcBatchTimeout
}
var wrapped RPC = baseRPCClient

if cfg.limit != 0 {
wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst)
Expand Down Expand Up @@ -152,25 +174,27 @@ func IsURLAvailable(address string) bool {
// with the client.RPC interface.
// It sets a timeout of 10s on CallContext & 20s on BatchCallContext made through it.
type BaseRPCClient struct {
c *rpc.Client
c *rpc.Client
rpcTimeout time.Duration
rpcBatchTimeout time.Duration
}

func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient {
return &BaseRPCClient{c: c}
return &BaseRPCClient{c: c, rpcTimeout: defaultRpcTimeout, rpcBatchTimeout: defaultRpcBatchTimeout}
}

func (b *BaseRPCClient) Close() {
b.c.Close()
}

func (b *BaseRPCClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
cCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
cCtx, cancel := context.WithTimeout(ctx, b.rpcTimeout)
defer cancel()
return b.c.CallContext(cCtx, result, method, args...)
}

func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
cCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
cCtx, cancel := context.WithTimeout(ctx, b.rpcBatchTimeout)
defer cancel()
return b.c.BatchCallContext(cCtx, batch)
}
Expand Down