Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add binary logger option for client and server #5675

Merged
merged 2 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {

// Set and check the DialOptions
opts := []DialOption{WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials())}
internal.AddExtraDialOptions.(func(opt ...DialOption))(opts...)
internal.AddGlobalDialOptions.(func(opt ...DialOption))(opts...)
for i, opt := range opts {
if extraDialOptions[i] != opt {
t.Fatalf("Unexpected extra dial option at index %d: %v != %v", i, extraDialOptions[i], opt)
Expand All @@ -52,7 +52,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {
cc.Close()
}

internal.ClearExtraDialOptions()
internal.ClearGlobalDialOptions()
if len(extraDialOptions) != 0 {
t.Fatalf("Unexpected len of extraDialOptions: %d != 0", len(extraDialOptions))
}
Expand All @@ -62,7 +62,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
const maxRecvSize = 998765
// Set and check the ServerOptions
opts := []ServerOption{Creds(insecure.NewCredentials()), MaxRecvMsgSize(maxRecvSize)}
internal.AddExtraServerOptions.(func(opt ...ServerOption))(opts...)
internal.AddGlobalServerOptions.(func(opt ...ServerOption))(opts...)
for i, opt := range opts {
if extraServerOptions[i] != opt {
t.Fatalf("Unexpected extra server option at index %d: %v != %v", i, extraServerOptions[i], opt)
Expand All @@ -75,7 +75,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSize, maxRecvSize)
}

internal.ClearExtraServerOptions()
internal.ClearGlobalServerOptions()
if len(extraServerOptions) != 0 {
t.Fatalf("Unexpected len of extraServerOptions: %d != 0", len(extraServerOptions))
}
Expand Down
14 changes: 12 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
)

func init() {
internal.AddExtraDialOptions = func(opt ...DialOption) {
internal.AddGlobalDialOptions = func(opt ...DialOption) {
extraDialOptions = append(extraDialOptions, opt...)
}
internal.ClearExtraDialOptions = func() {
internal.ClearGlobalDialOptions = func() {
extraDialOptions = nil
}
}
Expand All @@ -61,6 +62,7 @@ type dialOptions struct {
timeout time.Duration
scChan <-chan ServiceConfig
authority string
binaryLogger binarylog.Logger
copts transport.ConnectOptions
callOptions []CallOption
channelzParentID *channelz.Identifier
Expand Down Expand Up @@ -401,6 +403,14 @@ func WithStatsHandler(h stats.Handler) DialOption {
})
}

// WithBinaryLogger returns a DialOption that specifies the binary logger for
// this ClientConn.
func WithBinaryLogger(bl binarylog.Logger) DialOption {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
return newFuncDialOption(func(o *dialOptions) {
o.binaryLogger = bl
})
}

// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
// non-temporary dial errors. If f is true, and dialer returns a non-temporary
// error, gRPC will fail the connection to the network address and won't try to
Expand Down
8 changes: 4 additions & 4 deletions gcp/observability/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func startOpenCensus(config *config) error {
}

// Only register default StatsHandlers if other things are setup correctly.
internal.AddExtraServerOptions.(func(opt ...grpc.ServerOption))(grpc.StatsHandler(&ocgrpc.ServerHandler{StartOptions: so}))
internal.AddExtraDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(&ocgrpc.ClientHandler{StartOptions: so}))
internal.AddGlobalServerOptions.(func(opt ...grpc.ServerOption))(grpc.StatsHandler(&ocgrpc.ServerHandler{StartOptions: so}))
internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(&ocgrpc.ClientHandler{StartOptions: so}))
logger.Infof("Enabled OpenCensus StatsHandlers for clients and servers")

return nil
Expand All @@ -128,8 +128,8 @@ func startOpenCensus(config *config) error {
// packages if exporter was created.
func stopOpenCensus() {
if exporter != nil {
internal.ClearExtraDialOptions()
internal.ClearExtraServerOptions()
internal.ClearGlobalDialOptions()
internal.ClearGlobalServerOptions()

// Call these unconditionally, doesn't matter if not registered, will be
// a noop if not registered.
Expand Down
16 changes: 8 additions & 8 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ var (
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
// AddExtraServerOptions adds an array of ServerOption that will be
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraServerOptions interface{} // func(opt ...ServerOption)
// ClearExtraServerOptions clears the array of extra ServerOption. This
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
// ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
ClearExtraServerOptions func()
// AddExtraDialOptions adds an array of DialOption that will be effective
ClearGlobalServerOptions func()
// AddGlobalDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraDialOptions interface{} // func(opt ...DialOption)
// ClearExtraDialOptions clears the array of extra DialOption. This
AddGlobalDialOptions interface{} // func(opt ...DialOption)
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearExtraDialOptions func()
ClearGlobalDialOptions func()
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption
Expand Down
127 changes: 90 additions & 37 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func init() {
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
internal.AddExtraServerOptions = func(opt ...ServerOption) {
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
extraServerOptions = opt
}
internal.ClearExtraServerOptions = func() {
internal.ClearGlobalServerOptions = func() {
extraServerOptions = nil
}
internal.JoinServerOptions = newJoinServerOption
Expand Down Expand Up @@ -156,6 +156,7 @@ type serverOptions struct {
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
Expand Down Expand Up @@ -469,6 +470,14 @@ func StatsHandler(h stats.Handler) ServerOption {
})
}

// BinaryLogger returns a ServerOption that can set the binary logger for the
// server.
func BinaryLogger(bl binarylog.Logger) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.binaryLogger = bl
})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
Expand Down Expand Up @@ -1216,9 +1225,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}()
}

binlog := binarylog.GetMethodLogger(stream.Method())
if binlog != nil {
var binlogs []binarylog.MethodLogger
if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
binlogs = append(binlogs, ml)
}
if s.opts.binaryLogger != nil {
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
binlogs = append(binlogs, ml)
}
}
if len(binlogs) != 0 {
ctx := stream.Context()
md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{
Expand All @@ -1238,7 +1254,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if peer, ok := peer.FromContext(ctx); ok {
logEntry.PeerAddr = peer.Addr
}
binlog.Log(logEntry)
for _, binlog := range binlogs {
binlog.Log(logEntry)
}
}

// comp and cp are used for compression. decomp and dc are used for
Expand Down Expand Up @@ -1278,7 +1296,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}

var payInfo *payloadInfo
if len(shs) != 0 || binlog != nil {
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
Expand All @@ -1304,10 +1322,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Length: len(d),
})
}
if binlog != nil {
binlog.Log(&binarylog.ClientMessage{
if len(binlogs) != 0 {
cm := &binarylog.ClientMessage{
Message: d,
})
}
for _, binlog := range binlogs {
binlog.Log(cm)
}
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
Expand All @@ -1331,18 +1352,24 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if e := t.WriteStatus(stream, appStatus); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
}
if binlog != nil {
if len(binlogs) != 0 {
if h, _ := stream.Header(); h.Len() > 0 {
// Only log serverHeader if there was header. Otherwise it can
// be trailer only.
binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
}
for _, binlog := range binlogs {
binlog.Log(sh)
}
}
binlog.Log(&binarylog.ServerTrailer{
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
for _, binlog := range binlogs {
binlog.Log(st)
}
}
return appErr
}
Expand All @@ -1368,26 +1395,34 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
}
if binlog != nil {
if len(binlogs) != 0 {
h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
binlog.Log(&binarylog.ServerTrailer{
}
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(st)
}
}
return err
}
if binlog != nil {
if len(binlogs) != 0 {
h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
binlog.Log(&binarylog.ServerMessage{
}
sm := &binarylog.ServerMessage{
Message: reply,
})
}
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(sm)
}
}
if channelz.IsOn() {
t.IncrMsgSent()
Expand All @@ -1399,11 +1434,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
err = t.WriteStatus(stream, statusOK)
if binlog != nil {
binlog.Log(&binarylog.ServerTrailer{
if len(binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
for _, binlog := range binlogs {
binlog.Log(st)
}
}
return err
}
Expand Down Expand Up @@ -1516,8 +1554,15 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}()
}

ss.binlog = binarylog.GetMethodLogger(stream.Method())
if ss.binlog != nil {
if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
ss.binlogs = append(ss.binlogs, ml)
}
if s.opts.binaryLogger != nil {
if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
ss.binlogs = append(ss.binlogs, ml)
}
}
if len(ss.binlogs) != 0 {
md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{
Header: md,
Expand All @@ -1536,7 +1581,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if peer, ok := peer.FromContext(ss.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
ss.binlog.Log(logEntry)
for _, binlog := range ss.binlogs {
binlog.Log(logEntry)
}
}

// If dc is set and matches the stream's compression, use it. Otherwise, try
Expand Down Expand Up @@ -1602,11 +1649,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock()
}
t.WriteStatus(ss.s, appStatus)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(st)
}
}
// TODO: Should we log an error from WriteStatus here and below?
return appErr
Expand All @@ -1617,11 +1667,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock()
}
err = t.WriteStatus(ss.s, statusOK)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(st)
}
}
return err
}
Expand Down
Loading