Skip to content

Commit

Permalink
Feat/add server to porter cmd (#2)
Browse files Browse the repository at this point in the history
* wip: Saving state

* wip: Added per rpc connection

* chore: Removed debug prints
  • Loading branch information
sgettys authored Jan 23, 2023
1 parent 1f00986 commit e3d895b
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 36 deletions.
1 change: 1 addition & 0 deletions cmd/porter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ Try our QuickStart https://getporter.org/quickstart to learn how to use Porter.
cmd.AddCommand(buildCredentialsCommands(p))
cmd.AddCommand(buildParametersCommands(p))
cmd.AddCommand(buildCompletionCommand(p))
cmd.AddCommand(buildServerCommands(p))

for _, alias := range buildAliasCommands(p) {
cmd.AddCommand(alias)
Expand Down
58 changes: 58 additions & 0 deletions cmd/porter/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"time"

grpc "get.porter.sh/porter/pkg/grpc"
"get.porter.sh/porter/pkg/porter"
"get.porter.sh/porter/pkg/signals"
"github.com/spf13/cobra"
)

func buildServerCommands(p *porter.Porter) *cobra.Command {
cmd := &cobra.Command{
Use: "server",
Short: "Server commands",
Long: "Command for working with the gRPC server for porter",
Hidden: true, // This is a hidden command and is currently only meant to be used by the porter operator
}
cmd.Annotations = map[string]string{
"group": "server",
}
cmd.AddCommand(buildServerRunCommand(p))
return cmd
}

func buildServerRunCommand(p *porter.Porter) *cobra.Command {
opts := porter.ListOptions{}
grpcOpts := grpc.Config{
Port: 3333,
}
cmd := &cobra.Command{
Use: "run",
Short: "Run the gRPC server",
Long: `Run the gRPC server for porter.
This command starts the gRPC server for porter which is able to expose limited porter functionality via RPC.
Currently only data operations are supported, creation of resources such as installations, credential sets, or parameter sets is not supported.
A list of the supported RPCs can be found at <link?>
`,
PreRunE: func(cmd *cobra.Command, args []string) error {
return opts.Validate()
},
RunE: func(cmd *cobra.Command, args []string) error {
srv, err := grpc.NewServer(cmd.Context(), &grpcOpts)
if err != nil {
return err
}
grpcServer, err := srv.ListenAndServe()
stopCh := signals.SetupSignalHandler()
serverShutdownTimeout := time.Duration(time.Second * 30)
sd, _ := signals.NewShutdown(serverShutdownTimeout, cmd.Context())
sd.Graceful(stopCh, grpcServer, cmd.Context())
return err
},
}
return cmd
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/go-containerregistry v0.12.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-hclog v1.4.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-plugin v1.4.0
Expand All @@ -54,6 +55,7 @@ require (
github.com/opencontainers/go-digest v1.0.0
github.com/osteele/liquid v1.3.0
github.com/pelletier/go-toml v1.9.5
github.com/prometheus/client_golang v1.12.1
github.com/spf13/afero v1.9.3
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -177,7 +179,6 @@ require (
github.com/pierrec/lz4/v4 v4.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand Down Expand Up @@ -237,4 +238,4 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace get.porter.sh/magefiles v0.3.3 => ../magefiles
//replace get.porter.sh/magefiles v0.3.3 => ../magefiles
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ contrib.go.opencensus.io/exporter/stackdriver v0.12.1/go.mod h1:iwB6wGarfphGGe/e
contrib.go.opencensus.io/integrations/ocsql v0.1.4/go.mod h1:8DsSdjz3F+APR+0z0WkU1aRorQCFfRxvqjUUPMbF3fE=
contrib.go.opencensus.io/resource v0.1.1/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
get.porter.sh/magefiles v0.3.3 h1:vsgOnzZDHWqHCYTzv0OeVTLXxuD/ZtD0xff75PkcBA0=
get.porter.sh/magefiles v0.3.3/go.mod h1:KzKenKVauKKDhZ5FERVhqSz8m/xVSsRzOPseDA4UDIE=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
Expand Down Expand Up @@ -878,6 +880,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.6.2/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
Expand Down
8 changes: 8 additions & 0 deletions pkg/grpc/base/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package base

type BaseGRPCService struct{}

func (b *BaseGRPCService) Connect() func() {
// returns connection close function
return func() {}
}
23 changes: 13 additions & 10 deletions pkg/grpc/installation/installation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,34 @@ import (

// server is used to implement helloworld.GreeterServer.
type PorterServer struct {
porter *porter.Porter
//base.BaseGRPCService
Porter *porter.Porter
pGRPC.UnimplementedPorterBundleServer
}

func NewPorterService() (*PorterServer, error) {
p := porter.New()
return &PorterServer{porter: p}, nil
func NewPorterService(p *porter.Porter) (*PorterServer, error) {
return &PorterServer{Porter: p}, nil
//return &PorterServer{}, nil
}

func (s *PorterServer) ListInstallations(ctx context.Context, in *iGRPC.ListInstallationsRequest) (*iGRPC.ListInstallationsResponse, error) {
ctx, log := tracing.StartSpan(ctx)
defer log.EndSpan()

err := s.porter.Connect(ctx)
p := porter.New()
err := p.Connect(ctx)
if err != nil {
return nil, err
}
defer s.porter.Close()
defer p.Close()
// cleanup := s.Connect()
// defer cleanup()
//defer s.Porter.Storage.Close()
opts := porter.ListOptions{}

installations, err := s.porter.ListInstallations(ctx, opts)
//installations, err := s.Porter.ListInstallations(ctx, opts)
installations, err := p.ListInstallations(ctx, opts)
if err != nil {
return nil, err
}

insts := []*iGRPC.Installation{}
for _, pInst := range installations {
inst := iGRPC.Installation{
Expand Down
82 changes: 67 additions & 15 deletions pkg/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,119 @@ package grpc

import (
//"context"
"context"
"fmt"
"net"
"net/http"
"os"

"go.uber.org/zap"
//"go.uber.org/zap/zapcore"

//igrpc "get.porter.sh/porter/gen/proto/go/porterapis/installation/v1alpha1"
pGRPC "get.porter.sh/porter/gen/proto/go/porterapis/porter/v1alpha1"
"get.porter.sh/porter/pkg/grpc/installation"
"get.porter.sh/porter/pkg/porter"
"get.porter.sh/porter/pkg/tracing"

//"go.opentelemetry.io/otel/attribute"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
)

var (
reg = prometheus.NewRegistry()
// Create some standard server metrics.
grpcMetrics = grpc_prometheus.NewServerMetrics()

// Create a customized counter metric.
customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "demo_server_say_hello_method_handle_count",
Help: "Total number of RPCs handled on the server.",
}, []string{"name"})
)

func init() {
reg.MustRegister(grpcMetrics, customizedCounterMetric)
customizedCounterMetric.WithLabelValues("Test")
}

type PorterGRPCService struct {
Porter *porter.Porter
config *Config
log *zap.Logger
Porter *porter.Porter
config *Config
ctx context.Context
CalledCount *int
}

type Config struct {
Port int `mapstructure:"grpc-port"`
ServiceName string `mapstructure:"grpc-service-name"`
}

func NewServer(config *Config, logger *zap.Logger) (*PorterGRPCService, error) {
func NewServer(ctx context.Context, config *Config) (*PorterGRPCService, error) {
// log := tracing.LoggerFromContext(ctx)
// log.Debug("HELLO")
p := porter.New()
var c int
srv := &PorterGRPCService{
config: config,
log: logger,
Porter: p,
config: config,
ctx: ctx,
CalledCount: &c,
}

return srv, nil
}

func (s *PorterGRPCService) ListenAndServe() *grpc.Server {
func (s *PorterGRPCService) ListenAndServe() (*grpc.Server, error) {
ctx, log := tracing.StartSpan(s.ctx)
err := s.Porter.Connect(ctx)
if err != nil {
return nil, err
}
defer s.Porter.Close()
defer log.EndSpan()
log.Infof("Starting gRPC on %v", s.config.Port)
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", s.config.Port))
if err != nil {
s.log.Fatal("failed to listen", zap.Int("port", s.config.Port))
return nil, fmt.Errorf("failed to listen on %d: %s", s.config.Port, err)
}
httpServer := &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", 9092)}

srv := grpc.NewServer()
srv := grpc.NewServer(
grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
)
healthServer := health.NewServer()
reflection.Register(srv)
grpc_health_v1.RegisterHealthServer(srv, healthServer)
isrv, err := installation.NewPorterService()
isrv, err := installation.NewPorterService(s.Porter)
//isrv, err := installation.NewPorterService()
if err != nil {
panic(err)
}

pGRPC.RegisterPorterBundleServer(srv, isrv)
healthServer.SetServingStatus(s.config.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)

grpc_prometheus.Register(srv)
go func() {
if err := srv.Serve(listener); err != nil {
s.log.Fatal("failed to serve", zap.Error(err))
log.Errorf("failed to serve: %s", err)
os.Exit(1)
}
}()

return srv
http.Handle("/metrics", promhttp.Handler())
// Start your http server for prometheus.
go func() {
if err := httpServer.ListenAndServe(); err != nil {
log.Errorf("Unable to start a http server.")
os.Exit(1)
}
}()
grpcMetrics.InitializeMetrics(srv)
return srv, nil
}
14 changes: 6 additions & 8 deletions pkg/signals/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,27 @@ import (
"context"
"time"

"get.porter.sh/porter/pkg/tracing"
"github.com/spf13/viper"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type Shutdown struct {
logger *zap.Logger
tracerProvider *sdktrace.TracerProvider
serverShutdownTimeout time.Duration
}

func NewShutdown(serverShutdownTimeout time.Duration, logger *zap.Logger) (*Shutdown, error) {
func NewShutdown(serverShutdownTimeout time.Duration, ctx context.Context) (*Shutdown, error) {
srv := &Shutdown{
logger: logger,
serverShutdownTimeout: serverShutdownTimeout,
}

return srv, nil
}

func (s *Shutdown) Graceful(stopCh <-chan struct{}, grpcServer *grpc.Server) {
ctx := context.Background()
func (s *Shutdown) Graceful(stopCh <-chan struct{}, grpcServer *grpc.Server, ctx context.Context) {
ctx, log := tracing.StartSpan(ctx)

// wait for SIGTERM or SIGINT
<-stopCh
Expand All @@ -42,13 +40,13 @@ func (s *Shutdown) Graceful(stopCh <-chan struct{}, grpcServer *grpc.Server) {
// stop OpenTelemetry tracer provider
if s.tracerProvider != nil {
if err := s.tracerProvider.Shutdown(ctx); err != nil {
s.logger.Warn("stopping tracer provider", zap.Error(err))
log.Warnf("stopping tracer provider: ", err)
}
}

// determine if the GRPC was started
if grpcServer != nil {
s.logger.Info("Shutting down GRPC server", zap.Duration("timeout", s.serverShutdownTimeout))
log.Infof("Shutting down GRPC server: ", s.serverShutdownTimeout)
grpcServer.GracefulStop()
}

Expand Down
1 change: 0 additions & 1 deletion tests/grpc/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

iGRPC "get.porter.sh/porter/gen/proto/go/porterapis/installation/v1alpha1"
pGRPC "get.porter.sh/porter/gen/proto/go/porterapis/porter/v1alpha1"
"get.porter.sh/porter/pkg/portercontext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down
1 change: 1 addition & 0 deletions tests/grpc/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (s *TestPorterGRPCService) ListenAndServe() *grpc.Server {
if err != nil {
panic(err)
}
pSvc.Porter.Config.SetPorterPath("porter")
pGRPC.RegisterPorterBundleServer(srv, pSvc)
healthServer.SetServingStatus("test-health", grpc_health_v1.HealthCheckResponse_SERVING)

Expand Down

0 comments on commit e3d895b

Please sign in to comment.