Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Split up server functions a bit #16152

Merged
merged 12 commits into from
May 17, 2023
275 changes: 160 additions & 115 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"io"
"net"
"os"
"runtime/pprof"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/armon/go-metrics"
"github.com/cometbft/cometbft/abci/server"
cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands"
cmtcfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/node"
"github.com/cometbft/cometbft/p2p"
pvm "github.com/cometbft/cometbft/privval"
Expand Down Expand Up @@ -210,6 +212,8 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {
return err
}

// TODO: Should we be using startTraceServer, and defer closing the traceWriter?
// right now its left unclosed
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
if err != nil {
Expand All @@ -218,6 +222,7 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {

app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

// TODO: should config be getting validated here?
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
config, err := serverconfig.GetConfig(svrCtx.Viper)
if err != nil {
return err
Expand Down Expand Up @@ -259,182 +264,99 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {
}

func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
cfg := svrCtx.Config
home := cfg.RootDir
cmtCfg := svrCtx.Config
home := cmtCfg.RootDir

db, err := openDB(home, GetAppDBBackend(svrCtx.Viper))
if err != nil {
return err
}

traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
traceWriter, traceWriterCleanup, err := setupTraceWriter(svrCtx)
if err != nil {
return err
}

// clean up the traceWriter when the server is shutting down
var traceWriterCleanup func()

// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
traceWriterCleanup = func() {
if err = traceWriter.Close(); err != nil {
svrCtx.Logger.Error("failed to close trace writer", "err", err)
}
}
}

config, err := serverconfig.GetConfig(svrCtx.Viper)
// TODO: Should this be moved to the very top of the function?
svrCfg, err := getAndValidateConfig(svrCtx)
if err != nil {
return err
}

if err := config.ValidateBasic(); err != nil {
return err
}

app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
// TODO: Move this to only be done if were launching the node. (So not in GRPC-only mode)
nodeKey, err := p2p.LoadOrGenNodeKey(cmtCfg.NodeKeyFile())
Comment on lines +290 to +291
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be changed, but would technically change the behavior for Grpc-Only nodes, so would want it to be in its own PR.

if err != nil {
return err
}

genDocProvider := func() (*cmttypes.GenesisDoc, error) {
appGenesis, err := genutiltypes.AppGenesisFromFile(cfg.GenesisFile())
if err != nil {
return nil, err
}

return appGenesis.ToGenesisDoc()
}

var (
tmNode *node.Node
gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly)
)

if gRPCOnly {
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
config.GRPC.Enable = true
svrCfg.GRPC.Enable = true
} else {
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")

tmNode, err = node.NewNode(
cfg,
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
genDocProvider,
node.DefaultDBProvider,
node.DefaultMetricsProvider(cfg.Instrumentation),
servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
)
tmNode, err = startCmtNode(cmtCfg, nodeKey, app, svrCtx)
if err != nil {
return err
}

if err := tmNode.Start(); err != nil {
return err
// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local CometBFT RPC client.
if svrCfg.API.Enable || svrCfg.GRPC.Enable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice that its equivalent to move this here, as tmNode != nil implies its not GRPC only.

// Re-assign for making the client available below do not use := to avoid
// shadowing the clientCtx variable.
clientCtx = clientCtx.WithClient(local.New(tmNode))

app.RegisterTxService(clientCtx)
app.RegisterTendermintService(clientCtx)
app.RegisterNodeService(clientCtx, svrCfg)
}
}

// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local CometBFT RPC client.
if (config.API.Enable || config.GRPC.Enable) && tmNode != nil {
// Re-assign for making the client available below do not use := to avoid
// shadowing the clientCtx variable.
clientCtx = clientCtx.WithClient(local.New(tmNode))

app.RegisterTxService(clientCtx)
app.RegisterTendermintService(clientCtx)
app.RegisterNodeService(clientCtx, config)
}

metrics, err := startTelemetry(config)
metrics, err := startTelemetry(svrCfg)
if err != nil {
return err
}

emitServerInfoMetrics()

var (
apiSrv *api.Server
grpcSrv *grpc.Server
)

ctx, cancelFn := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(cancelFn, svrCtx.Logger)

if config.GRPC.Enable {
_, port, err := net.SplitHostPort(config.GRPC.Address)
if err != nil {
return err
}

maxSendMsgSize := config.GRPC.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := config.GRPC.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return err
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress)

grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}

// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
g.Go(func() error {
return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config.GRPC, grpcSrv)
})
grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
if err != nil {
return err
}

if config.API.Enable {
if svrCfg.API.Enable {
// TODO: Why do we reload and unmarshal the entire genesis doc in order to get the chain ID.
// surely theres a better way. This is likely a serious node start time overhead.
genDocProvider := getGenDocProvider(cmtCfg)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing this (in another PR) would be really helpful!

Maybe should become a follow-up issue?

genDoc, err := genDocProvider()
if err != nil {
return err
}

clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)

apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv)
app.RegisterAPIRoutes(apiSrv, config.API)
apiSrv := api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv)
app.RegisterAPIRoutes(apiSrv, svrCfg.API)

if config.Telemetry.Enabled {
if svrCfg.Telemetry.Enabled {
apiSrv.SetTelemetry(metrics)
}

g.Go(func() error {
return apiSrv.Start(ctx, config)
return apiSrv.Start(ctx, svrCfg)
})
}

Expand All @@ -454,6 +376,8 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
})

// deferred cleanup function
// TODO: Make a generic cleanup function that takes in several func(), and runs them all.
// then we defer that.
defer func() {
if tmNode != nil && tmNode.IsRunning() {
_ = tmNode.Stop()
Expand All @@ -468,6 +392,127 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
return g.Wait()
}

// TODO: Move nodeKey into being created within the function.
func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Application, svrCtx *Context) (tmNode *node.Node, err error) {
tmNode, err = node.NewNode(
cfg,
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
getGenDocProvider(cfg),
node.DefaultDBProvider,
node.DefaultMetricsProvider(cfg.Instrumentation),
servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
)
if err != nil {
return tmNode, err
}

if err := tmNode.Start(); err != nil {
return tmNode, err
}
return tmNode, nil
}

func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) {
config, err := serverconfig.GetConfig(svrCtx.Viper)
if err != nil {
return config, err
}

if err := config.ValidateBasic(); err != nil {
return config, err
}
return config, nil
}

// returns a function which returns the genesis doc from the genesis file.
func getGenDocProvider(cfg *cmtcfg.Config) func() (*cmttypes.GenesisDoc, error) {
return func() (*cmttypes.GenesisDoc, error) {
appGenesis, err := genutiltypes.AppGenesisFromFile(cfg.GenesisFile())
if err != nil {
return nil, err
}

return appGenesis.ToGenesisDoc()
}
}

func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func(), err error) {
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err = openTraceWriter(traceWriterFile)
if err != nil {
return traceWriter, cleanup, err
}

// clean up the traceWriter when the server is shutting down
cleanup = func() {}

// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
cleanup = func() {
if err = traceWriter.Close(); err != nil {
svrCtx.Logger.Error("failed to close trace writer", "err", err)
}
}
}

return traceWriter, cleanup, nil
}

func startGrpcServer(ctx context.Context, g *errgroup.Group, config serverconfig.GRPCConfig, clientCtx client.Context, svrCtx *Context, app types.Application) (
*grpc.Server, client.Context, error) {
if !config.Enable {
// return grpcServer as nil if gRPC is disabled
return nil, clientCtx, nil
}
_, port, err := net.SplitHostPort(config.Address)
if err != nil {
return nil, clientCtx, err
}

maxSendMsgSize := config.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := config.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return nil, clientCtx, err
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress)

grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config)
if err != nil {
return nil, clientCtx, err
}

// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
g.Go(func() error {
return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config, grpcSrv)
})
return grpcSrv, clientCtx, nil
}

func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
if !cfg.Telemetry.Enabled {
return nil, nil
Expand Down