Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Split up server functions a bit #16152

Merged
merged 12 commits into from
May 17, 2023
167 changes: 98 additions & 69 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"io"
"net"
"os"
"runtime/pprof"
Expand Down Expand Up @@ -210,6 +211,8 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {
return err
}

// TODO: Should we be using startTraceServer, and defer closing the traceWriter?
// right now its left unclosed
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
if err != nil {
Expand All @@ -218,6 +221,7 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {

app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

// TODO: should config be getting validated here?
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
config, err := serverconfig.GetConfig(svrCtx.Viper)
if err != nil {
return err
Expand Down Expand Up @@ -267,33 +271,16 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
return err
}

traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
traceWriter, traceWriterCleanup, err := setupTraceWriter(svrCtx)
if err != nil {
return err
}

// clean up the traceWriter when the server is shutting down
var traceWriterCleanup func()

// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
traceWriterCleanup = func() {
if err = traceWriter.Close(); err != nil {
svrCtx.Logger.Error("failed to close trace writer", "err", err)
}
}
}

config, err := serverconfig.GetConfig(svrCtx.Viper)
// TODO: Should this be moved to the very top of the function?
config, err := getAndValidateConfig(svrCtx)
if err != nil {
return err
}

if err := config.ValidateBasic(); err != nil {
return err
}

app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
Expand Down Expand Up @@ -360,62 +347,17 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.

emitServerInfoMetrics()

var (
apiSrv *api.Server
grpcSrv *grpc.Server
)
var apiSrv *api.Server
ValarDragon marked this conversation as resolved.
Show resolved Hide resolved

ctx, cancelFn := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(cancelFn, svrCtx.Logger)

if config.GRPC.Enable {
_, port, err := net.SplitHostPort(config.GRPC.Address)
if err != nil {
return err
}

maxSendMsgSize := config.GRPC.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := config.GRPC.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return err
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress)

grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}

// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
g.Go(func() error {
return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config.GRPC, grpcSrv)
})
grpcSrv, clientCtx, err := startGrpcServer(g, config.GRPC, ctx, clientCtx, svrCtx, app)
if err != nil {
return err
}

if config.API.Enable {
Expand Down Expand Up @@ -468,6 +410,93 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
return g.Wait()
}

func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) {
config, err := serverconfig.GetConfig(svrCtx.Viper)
if err != nil {
return config, err
}

if err := config.ValidateBasic(); err != nil {
return config, err
}
return config, nil
}

func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func(), err error) {
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err = openTraceWriter(traceWriterFile)
if err != nil {
return traceWriter, cleanup, err
}

// clean up the traceWriter when the server is shutting down
cleanup = func() {}

// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
cleanup = func() {
if err = traceWriter.Close(); err != nil {
svrCtx.Logger.Error("failed to close trace writer", "err", err)
}
}
}

return traceWriter, cleanup, nil
}

func startGrpcServer(g *errgroup.Group, config serverconfig.GRPCConfig, ctx context.Context, clientCtx client.Context, svrCtx *Context, app types.Application) (
*grpc.Server, client.Context, error) {
if !config.Enable {
// return grpcServer as nil if gRPC is disabled
return nil, clientCtx, nil
}
_, port, err := net.SplitHostPort(config.Address)
if err != nil {
return nil, clientCtx, err
}

maxSendMsgSize := config.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := config.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return nil, clientCtx, err
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress)

grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config)
if err != nil {
return nil, clientCtx, err
}

// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
g.Go(func() error {
return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config, grpcSrv)
})
return grpcSrv, clientCtx, nil
}

func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
if !cfg.Telemetry.Enabled {
return nil, nil
Expand Down