Skip to content

Commit

Permalink
Add binary logger option for client and server (grpc#5675)
Browse files Browse the repository at this point in the history
* Add binary logger option for client and server
  • Loading branch information
zasweq committed Oct 14, 2022
1 parent 6576007 commit 53234a2
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 92 deletions.
8 changes: 4 additions & 4 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {

// Set and check the DialOptions
opts := []DialOption{WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials())}
internal.AddExtraDialOptions.(func(opt ...DialOption))(opts...)
internal.AddGlobalDialOptions.(func(opt ...DialOption))(opts...)
for i, opt := range opts {
if extraDialOptions[i] != opt {
t.Fatalf("Unexpected extra dial option at index %d: %v != %v", i, extraDialOptions[i], opt)
Expand All @@ -52,7 +52,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {
cc.Close()
}

internal.ClearExtraDialOptions()
internal.ClearGlobalDialOptions()
if len(extraDialOptions) != 0 {
t.Fatalf("Unexpected len of extraDialOptions: %d != 0", len(extraDialOptions))
}
Expand All @@ -62,7 +62,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
const maxRecvSize = 998765
// Set and check the ServerOptions
opts := []ServerOption{Creds(insecure.NewCredentials()), MaxRecvMsgSize(maxRecvSize)}
internal.AddExtraServerOptions.(func(opt ...ServerOption))(opts...)
internal.AddGlobalServerOptions.(func(opt ...ServerOption))(opts...)
for i, opt := range opts {
if extraServerOptions[i] != opt {
t.Fatalf("Unexpected extra server option at index %d: %v != %v", i, extraServerOptions[i], opt)
Expand All @@ -75,7 +75,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSize, maxRecvSize)
}

internal.ClearExtraServerOptions()
internal.ClearGlobalServerOptions()
if len(extraServerOptions) != 0 {
t.Fatalf("Unexpected len of extraServerOptions: %d != 0", len(extraServerOptions))
}
Expand Down
15 changes: 13 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
)

func init() {
internal.AddExtraDialOptions = func(opt ...DialOption) {
internal.AddGlobalDialOptions = func(opt ...DialOption) {
extraDialOptions = append(extraDialOptions, opt...)
}
internal.ClearExtraDialOptions = func() {
internal.ClearGlobalDialOptions = func() {
extraDialOptions = nil
}
internal.WithBinaryLogger = withBinaryLogger
}

// dialOptions configure a Dial call. dialOptions are set by the DialOption
Expand All @@ -61,6 +63,7 @@ type dialOptions struct {
timeout time.Duration
scChan <-chan ServiceConfig
authority string
binaryLogger binarylog.Logger
copts transport.ConnectOptions
callOptions []CallOption
channelzParentID *channelz.Identifier
Expand Down Expand Up @@ -401,6 +404,14 @@ func WithStatsHandler(h stats.Handler) DialOption {
})
}

// withBinaryLogger returns a DialOption that specifies the binary logger for
// this ClientConn.
func withBinaryLogger(bl binarylog.Logger) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.binaryLogger = bl
})
}

// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
// non-temporary dial errors. If f is true, and dialer returns a non-temporary
// error, gRPC will fail the connection to the network address and won't try to
Expand Down
8 changes: 4 additions & 4 deletions gcp/observability/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func startOpenCensus(config *config) error {
}

// Only register default StatsHandlers if other things are setup correctly.
internal.AddExtraServerOptions.(func(opt ...grpc.ServerOption))(grpc.StatsHandler(&ocgrpc.ServerHandler{StartOptions: so}))
internal.AddExtraDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(&ocgrpc.ClientHandler{StartOptions: so}))
internal.AddGlobalServerOptions.(func(opt ...grpc.ServerOption))(grpc.StatsHandler(&ocgrpc.ServerHandler{StartOptions: so}))
internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(&ocgrpc.ClientHandler{StartOptions: so}))
logger.Infof("Enabled OpenCensus StatsHandlers for clients and servers")

return nil
Expand All @@ -128,8 +128,8 @@ func startOpenCensus(config *config) error {
// packages if exporter was created.
func stopOpenCensus() {
if exporter != nil {
internal.ClearExtraDialOptions()
internal.ClearExtraServerOptions()
internal.ClearGlobalDialOptions()
internal.ClearGlobalServerOptions()

// Call these unconditionally, doesn't matter if not registered, will be
// a noop if not registered.
Expand Down
23 changes: 15 additions & 8 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,31 @@ var (
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
// AddExtraServerOptions adds an array of ServerOption that will be
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraServerOptions interface{} // func(opt ...ServerOption)
// ClearExtraServerOptions clears the array of extra ServerOption. This
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
// ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
ClearExtraServerOptions func()
// AddExtraDialOptions adds an array of DialOption that will be effective
ClearGlobalServerOptions func()
// AddGlobalDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraDialOptions interface{} // func(opt ...DialOption)
// ClearExtraDialOptions clears the array of extra DialOption. This
AddGlobalDialOptions interface{} // func(opt ...DialOption)
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearExtraDialOptions func()
ClearGlobalDialOptions func()
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption

// WithBinaryLogger returns a DialOption that specifies the binary logger
// for a ClientConn.
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
// BinaryLogger returns a ServerOption that can set the binary logger for a
// server.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption

// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
// the supported environment variables. The resolver.Builder is meant to be
Expand Down
128 changes: 91 additions & 37 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ func init() {
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
internal.AddExtraServerOptions = func(opt ...ServerOption) {
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
extraServerOptions = opt
}
internal.ClearExtraServerOptions = func() {
internal.ClearGlobalServerOptions = func() {
extraServerOptions = nil
}
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
}

Expand Down Expand Up @@ -156,6 +157,7 @@ type serverOptions struct {
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
Expand Down Expand Up @@ -469,6 +471,14 @@ func StatsHandler(h stats.Handler) ServerOption {
})
}

// binaryLogger returns a ServerOption that can set the binary logger for the
// server.
func binaryLogger(bl binarylog.Logger) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.binaryLogger = bl
})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
Expand Down Expand Up @@ -1216,9 +1226,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}()
}

binlog := binarylog.GetMethodLogger(stream.Method())
if binlog != nil {
var binlogs []binarylog.MethodLogger
if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
binlogs = append(binlogs, ml)
}
if s.opts.binaryLogger != nil {
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
binlogs = append(binlogs, ml)
}
}
if len(binlogs) != 0 {
ctx := stream.Context()
md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{
Expand All @@ -1238,7 +1255,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if peer, ok := peer.FromContext(ctx); ok {
logEntry.PeerAddr = peer.Addr
}
binlog.Log(logEntry)
for _, binlog := range binlogs {
binlog.Log(logEntry)
}
}

// comp and cp are used for compression. decomp and dc are used for
Expand Down Expand Up @@ -1278,7 +1297,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}

var payInfo *payloadInfo
if len(shs) != 0 || binlog != nil {
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
Expand All @@ -1304,10 +1323,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Length: len(d),
})
}
if binlog != nil {
binlog.Log(&binarylog.ClientMessage{
if len(binlogs) != 0 {
cm := &binarylog.ClientMessage{
Message: d,
})
}
for _, binlog := range binlogs {
binlog.Log(cm)
}
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
Expand All @@ -1331,18 +1353,24 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if e := t.WriteStatus(stream, appStatus); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
}
if binlog != nil {
if len(binlogs) != 0 {
if h, _ := stream.Header(); h.Len() > 0 {
// Only log serverHeader if there was header. Otherwise it can
// be trailer only.
binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
}
for _, binlog := range binlogs {
binlog.Log(sh)
}
}
binlog.Log(&binarylog.ServerTrailer{
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
for _, binlog := range binlogs {
binlog.Log(st)
}
}
return appErr
}
Expand All @@ -1368,26 +1396,34 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
}
if binlog != nil {
if len(binlogs) != 0 {
h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
binlog.Log(&binarylog.ServerTrailer{
}
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(st)
}
}
return err
}
if binlog != nil {
if len(binlogs) != 0 {
h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
binlog.Log(&binarylog.ServerMessage{
}
sm := &binarylog.ServerMessage{
Message: reply,
})
}
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(sm)
}
}
if channelz.IsOn() {
t.IncrMsgSent()
Expand All @@ -1399,11 +1435,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
err = t.WriteStatus(stream, statusOK)
if binlog != nil {
binlog.Log(&binarylog.ServerTrailer{
if len(binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
for _, binlog := range binlogs {
binlog.Log(st)
}
}
return err
}
Expand Down Expand Up @@ -1516,8 +1555,15 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}()
}

ss.binlog = binarylog.GetMethodLogger(stream.Method())
if ss.binlog != nil {
if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
ss.binlogs = append(ss.binlogs, ml)
}
if s.opts.binaryLogger != nil {
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
ss.binlogs = append(ss.binlogs, ml)
}
}
if len(ss.binlogs) != 0 {
md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{
Header: md,
Expand All @@ -1536,7 +1582,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if peer, ok := peer.FromContext(ss.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
ss.binlog.Log(logEntry)
for _, binlog := range ss.binlogs {
binlog.Log(logEntry)
}
}

// If dc is set and matches the stream's compression, use it. Otherwise, try
Expand Down Expand Up @@ -1602,11 +1650,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock()
}
t.WriteStatus(ss.s, appStatus)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(st)
}
}
// TODO: Should we log an error from WriteStatus here and below?
return appErr
Expand All @@ -1617,11 +1668,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock()
}
err = t.WriteStatus(ss.s, statusOK)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(st)
}
}
return err
}
Expand Down
Loading

0 comments on commit 53234a2

Please sign in to comment.