From 9329a00e93beb4063ad2123e663fd79abd5fb1b1 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 17 May 2023 14:52:19 +0200 Subject: [PATCH] refactor: Split up server functions a bit (#16152) Co-authored-by: marbar3778 --- server/start.go | 314 ++++++++++++++++++++++++++++-------------------- 1 file changed, 187 insertions(+), 127 deletions(-) diff --git a/server/start.go b/server/start.go index 882ec02b105c..c8c3e51e9322 100644 --- a/server/start.go +++ b/server/start.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "io" "net" "os" "runtime/pprof" @@ -11,6 +12,7 @@ import ( "github.com/armon/go-metrics" "github.com/cometbft/cometbft/abci/server" cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands" + cmtcfg "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/node" "github.com/cometbft/cometbft/p2p" pvm "github.com/cometbft/cometbft/privval" @@ -210,6 +212,8 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error { return err } + // TODO: Should we be using startTraceServer, and defer closing the traceWriter? + // right now its left unclosed traceWriterFile := svrCtx.Viper.GetString(flagTraceStore) traceWriter, err := openTraceWriter(traceWriterFile) if err != nil { @@ -223,6 +227,10 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error { return err } + if err := config.ValidateBasic(); err != nil { + return err + } + if _, err := startTelemetry(config); err != nil { return err } @@ -259,57 +267,32 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error { } func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator) error { - cfg := svrCtx.Config - home := cfg.RootDir + cmtCfg := svrCtx.Config + home := cmtCfg.RootDir db, err := openDB(home, GetAppDBBackend(svrCtx.Viper)) if err != nil { return err } - traceWriterFile := svrCtx.Viper.GetString(flagTraceStore) - traceWriter, err := openTraceWriter(traceWriterFile) + svrCfg, err := getAndValidateConfig(svrCtx) if err != nil { return err } - // clean up the traceWriter when the server is shutting down - var traceWriterCleanup func() - - // if flagTraceStore is not used then traceWriter is nil - if traceWriter != nil { - traceWriterCleanup = func() { - if err = traceWriter.Close(); err != nil { - svrCtx.Logger.Error("failed to close trace writer", "err", err) - } - } - } - - config, err := serverconfig.GetConfig(svrCtx.Viper) + traceWriter, traceWriterCleanup, err := setupTraceWriter(svrCtx) if err != nil { return err } - if err := config.ValidateBasic(); err != nil { - return err - } - app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper) - nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile()) + // TODO: Move this to only be done if were launching the node. (So not in GRPC-only mode) + nodeKey, err := p2p.LoadOrGenNodeKey(cmtCfg.NodeKeyFile()) if err != nil { return err } - genDocProvider := func() (*cmttypes.GenesisDoc, error) { - appGenesis, err := genutiltypes.AppGenesisFromFile(cfg.GenesisFile()) - if err != nil { - return nil, err - } - - return appGenesis.ToGenesisDoc() - } - var ( tmNode *node.Node gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly) @@ -317,125 +300,49 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types. if gRPCOnly { svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled") - config.GRPC.Enable = true + svrCfg.GRPC.Enable = true } else { svrCtx.Logger.Info("starting node with ABCI CometBFT in-process") - - tmNode, err = node.NewNode( - cfg, - pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()), - nodeKey, - proxy.NewLocalClientCreator(app), - genDocProvider, - node.DefaultDBProvider, - node.DefaultMetricsProvider(cfg.Instrumentation), - servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger}, - ) + tmNode, err = startCmtNode(cmtCfg, nodeKey, app, svrCtx) if err != nil { return err } - if err := tmNode.Start(); err != nil { - return err + // Add the tx service to the gRPC router. We only need to register this + // service if API or gRPC is enabled, and avoid doing so in the general + // case, because it spawns a new local CometBFT RPC client. + if svrCfg.API.Enable || svrCfg.GRPC.Enable { + // Re-assign for making the client available below do not use := to avoid + // shadowing the clientCtx variable. + clientCtx = clientCtx.WithClient(local.New(tmNode)) + + app.RegisterTxService(clientCtx) + app.RegisterTendermintService(clientCtx) + app.RegisterNodeService(clientCtx, svrCfg) } } - // Add the tx service to the gRPC router. We only need to register this - // service if API or gRPC is enabled, and avoid doing so in the general - // case, because it spawns a new local CometBFT RPC client. - if (config.API.Enable || config.GRPC.Enable) && tmNode != nil { - // Re-assign for making the client available below do not use := to avoid - // shadowing the clientCtx variable. - clientCtx = clientCtx.WithClient(local.New(tmNode)) - - app.RegisterTxService(clientCtx) - app.RegisterTendermintService(clientCtx) - app.RegisterNodeService(clientCtx, config) - } - - metrics, err := startTelemetry(config) + metrics, err := startTelemetry(svrCfg) if err != nil { return err } emitServerInfoMetrics() - var ( - apiSrv *api.Server - grpcSrv *grpc.Server - ) - ctx, cancelFn := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) // listen for quit signals so the calling parent process can gracefully exit ListenForQuitSignals(cancelFn, svrCtx.Logger) - if config.GRPC.Enable { - _, port, err := net.SplitHostPort(config.GRPC.Address) - if err != nil { - return err - } - - maxSendMsgSize := config.GRPC.MaxSendMsgSize - if maxSendMsgSize == 0 { - maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize - } - - maxRecvMsgSize := config.GRPC.MaxRecvMsgSize - if maxRecvMsgSize == 0 { - maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize - } - - grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) - - // if gRPC is enabled, configure gRPC client for gRPC gateway - grpcClient, err := grpc.Dial( - grpcAddress, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), - grpc.MaxCallRecvMsgSize(maxRecvMsgSize), - grpc.MaxCallSendMsgSize(maxSendMsgSize), - ), - ) - if err != nil { - return err - } - - clientCtx = clientCtx.WithGRPCClient(grpcClient) - svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress) - - grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC) - if err != nil { - return err - } - - // Start the gRPC server in a goroutine. Note, the provided ctx will ensure - // that the server is gracefully shut down. - g.Go(func() error { - return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config.GRPC, grpcSrv) - }) + grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app) + if err != nil { + return err } - if config.API.Enable { - genDoc, err := genDocProvider() - if err != nil { - return err - } - - clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) - - apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv) - app.RegisterAPIRoutes(apiSrv, config.API) - - if config.Telemetry.Enabled { - apiSrv.SetTelemetry(metrics) - } - - g.Go(func() error { - return apiSrv.Start(ctx, config) - }) + err = startAPIServer(ctx, g, cmtCfg, svrCfg, clientCtx, svrCtx, app, home, grpcSrv, metrics) + if err != nil { + return err } // At this point it is safe to block the process if we're in gRPC-only mode as @@ -454,6 +361,8 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types. }) // deferred cleanup function + // TODO: Make a generic cleanup function that takes in several func(), and runs them all. + // then we defer that. defer func() { if tmNode != nil && tmNode.IsRunning() { _ = tmNode.Stop() @@ -468,6 +377,157 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types. return g.Wait() } +// TODO: Move nodeKey into being created within the function. +func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Application, svrCtx *Context) (tmNode *node.Node, err error) { + tmNode, err = node.NewNode( + cfg, + pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()), + nodeKey, + proxy.NewLocalClientCreator(app), + getGenDocProvider(cfg), + node.DefaultDBProvider, + node.DefaultMetricsProvider(cfg.Instrumentation), + servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger}, + ) + if err != nil { + return tmNode, err + } + + if err := tmNode.Start(); err != nil { + return tmNode, err + } + return tmNode, nil +} + +func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) { + config, err := serverconfig.GetConfig(svrCtx.Viper) + if err != nil { + return config, err + } + + if err := config.ValidateBasic(); err != nil { + return config, err + } + return config, nil +} + +// returns a function which returns the genesis doc from the genesis file. +func getGenDocProvider(cfg *cmtcfg.Config) func() (*cmttypes.GenesisDoc, error) { + return func() (*cmttypes.GenesisDoc, error) { + appGenesis, err := genutiltypes.AppGenesisFromFile(cfg.GenesisFile()) + if err != nil { + return nil, err + } + + return appGenesis.ToGenesisDoc() + } +} + +func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func(), err error) { + traceWriterFile := svrCtx.Viper.GetString(flagTraceStore) + traceWriter, err = openTraceWriter(traceWriterFile) + if err != nil { + return traceWriter, cleanup, err + } + + // clean up the traceWriter when the server is shutting down + cleanup = func() {} + + // if flagTraceStore is not used then traceWriter is nil + if traceWriter != nil { + cleanup = func() { + if err = traceWriter.Close(); err != nil { + svrCtx.Logger.Error("failed to close trace writer", "err", err) + } + } + } + + return traceWriter, cleanup, nil +} + +func startGrpcServer(ctx context.Context, g *errgroup.Group, config serverconfig.GRPCConfig, clientCtx client.Context, svrCtx *Context, app types.Application) ( + *grpc.Server, client.Context, error, +) { + if !config.Enable { + // return grpcServer as nil if gRPC is disabled + return nil, clientCtx, nil + } + _, port, err := net.SplitHostPort(config.Address) + if err != nil { + return nil, clientCtx, err + } + + maxSendMsgSize := config.MaxSendMsgSize + if maxSendMsgSize == 0 { + maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize + } + + maxRecvMsgSize := config.MaxRecvMsgSize + if maxRecvMsgSize == 0 { + maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize + } + + grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) + + // if gRPC is enabled, configure gRPC client for gRPC gateway + grpcClient, err := grpc.Dial( + grpcAddress, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + ), + ) + if err != nil { + return nil, clientCtx, err + } + + clientCtx = clientCtx.WithGRPCClient(grpcClient) + svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress) + + grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config) + if err != nil { + return nil, clientCtx, err + } + + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + g.Go(func() error { + return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config, grpcSrv) + }) + return grpcSrv, clientCtx, nil +} + +func startAPIServer(ctx context.Context, g *errgroup.Group, cmtCfg *cmtcfg.Config, svrCfg serverconfig.Config, + clientCtx client.Context, svrCtx *Context, app types.Application, home string, grpcSrv *grpc.Server, metrics *telemetry.Metrics, +) error { + if !svrCfg.API.Enable { + return nil + } + // TODO: Why do we reload and unmarshal the entire genesis doc in order to get the chain ID. + // surely theres a better way. This is likely a serious node start time overhead. + genDocProvider := getGenDocProvider(cmtCfg) + genDoc, err := genDocProvider() + if err != nil { + return err + } + + clientCtx = clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) + + apiSrv := api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv) + app.RegisterAPIRoutes(apiSrv, svrCfg.API) + + if svrCfg.Telemetry.Enabled { + apiSrv.SetTelemetry(metrics) + } + + g.Go(func() error { + return apiSrv.Start(ctx, svrCfg) + }) + return nil +} + func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { if !cfg.Telemetry.Enabled { return nil, nil