Skip to content

Commit

Permalink
spanner: implement generation and propagation of "x-spanner-request-i…
Browse files Browse the repository at this point in the history
…d" Header

This change spins up the basis for sending over the "x-spanner-request-id"
for every call and raises important questions about field value lengths
as well as where to increment and produce increments for RPCs et al on
retries. This header is to be sent on both unary and streaming calls
and it'll help debug latencies for customers.

Updates #11073
  • Loading branch information
odeke-em committed Nov 5, 2024
1 parent 5ff0fdc commit 913715a
Show file tree
Hide file tree
Showing 7 changed files with 853 additions and 18 deletions.
2 changes: 2 additions & 0 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
ParamTypes: paramTypes,
}
sh.updateLastUseTime()
// TODO: (@odeke-em) retrieve the requestID and increment the RPC number
// then send it along in every call per retry.
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), req, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
Expand Down
19 changes: 18 additions & 1 deletion spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"

"cloud.google.com/go/internal/trace"
Expand Down Expand Up @@ -357,6 +358,11 @@ type ClientConfig struct {
DisableNativeMetrics bool
}

type requestIDConfig struct {
processID uint32
dbClientHandleID *atomic.Uint64
}

type openTelemetryConfig struct {
meterProvider metric.MeterProvider
attributeMap []attribute.KeyValue
Expand Down Expand Up @@ -1318,10 +1324,21 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err}
}

nRPCs := uint64(0)
rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) {
var md metadata.MD
sh.updateLastUseTime()
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
nRPCs++

// Firstly set the number of retries as the RPCID.
client := sh.getClient()
gcl, ok := client.(*grpcSpannerClient)
if ok {
gcl.setRPCID(nRPCs)
defer gcl.setOrResetRPCID()
}

stream, rpcErr := client.BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
Expand Down
94 changes: 78 additions & 16 deletions spanner/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ package spanner

import (
"context"
"fmt"
"math/rand"
"strings"
"sync/atomic"
"time"

vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/internal"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -65,10 +70,43 @@ type spannerClient interface {
// grpcSpannerClient is the gRPC API implementation of the transport-agnostic
// spannerClient interface.
type grpcSpannerClient struct {
id uint64
raw *vkit.Client
metricsTracerFactory *builtinMetricsTracerFactory

// These fields are used to uniquely track x-spanner-request-id
// grpc.ClientConn is presumed to be the channel, hence channelID
// is redundant. However, is it correct to presume that raw.Connection()
// will always be the same throughout the lifetime of a grcpSpannerClient?
channelID uint64
// nthRequest shall always be incremented on every fresh request.
nthRequest *atomic.Uint32
// This id uniquely defines the RPC being issued and in
// the case of retries it should be incremented.
rpcID *atomic.Uint64
}

func (g *grpcSpannerClient) setOrResetRPCID() {
if g.rpcID == nil {
g.rpcID = new(atomic.Uint64)
}
g.rpcID.Store(1)
}

func (g *grpcSpannerClient) setRPCID(rpcID uint64) {
g.rpcID.Store(rpcID)
}

func (g *grpcSpannerClient) prepareRequestIDTrackers() {
g.id = nGRPCClient.Add(1)
g.nthRequest = new(atomic.Uint32)
g.channelID = 1 // Assuming that .raw.Connection() never changes.
g.nthRequest = new(atomic.Uint32)
g.setOrResetRPCID()
}

var nGRPCClient = new(atomic.Uint64)

var (
// Ensure that grpcSpannerClient implements spannerClient.
_ spannerClient = (*grpcSpannerClient)(nil)
Expand All @@ -83,6 +121,8 @@ func newGRPCSpannerClient(ctx context.Context, sc *sessionClient, opts ...option
}

g := &grpcSpannerClient{raw: raw, metricsTracerFactory: sc.metricsTracerFactory}
g.prepareRequestIDTrackers()

clientInfo := []string{"gccl", internal.Version}
if sc.userAgent != "" {
agentWithVersion := strings.SplitN(sc.userAgent, "/", 2)
Expand Down Expand Up @@ -118,7 +158,7 @@ func (g *grpcSpannerClient) CreateSession(ctx context.Context, req *spannerpb.Cr
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.CreateSession(ctx, req, opts...)
resp, err := g.raw.CreateSession(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -128,7 +168,7 @@ func (g *grpcSpannerClient) BatchCreateSessions(ctx context.Context, req *spanne
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.BatchCreateSessions(ctx, req, opts...)
resp, err := g.raw.BatchCreateSessions(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -138,45 +178,67 @@ func (g *grpcSpannerClient) GetSession(ctx context.Context, req *spannerpb.GetSe
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.GetSession(ctx, req, opts...)
resp, err := g.raw.GetSession(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) ListSessions(ctx context.Context, req *spannerpb.ListSessionsRequest, opts ...gax.CallOption) *vkit.SessionIterator {
return g.raw.ListSessions(ctx, req, opts...)
return g.raw.ListSessions(ctx, req, g.optsWithNextRequestID(opts)...)
}

func (g *grpcSpannerClient) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest, opts ...gax.CallOption) error {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
err := g.raw.DeleteSession(ctx, req, opts...)
err := g.raw.DeleteSession(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return err
}

var randIdForProcess uint32

func init() {
randIdForProcess = rand.New(rand.NewSource(time.Now().UnixNano())).Uint32()
}

const xSpannerRequestIDHeader = "x-spanner-request-id"

// optsWithNextRequestID bundles priors with a new header "x-spanner-request-id"
func (g *grpcSpannerClient) optsWithNextRequestID(priors []gax.CallOption) []gax.CallOption {
// TODO: Decide if each field should be padded and to what width or
// should we just let fields fill up so as to reduce bandwidth?
// Go creates grpc.ClientConn which is presumed to be a channel, so channelID is going to be redundant.
requestID := fmt.Sprintf("%d.%d.%d.%d.%d", randIdForProcess, g.id, g.nextNthRequest(), g.channelID, g.rpcID.Load())
md := metadata.MD{xSpannerRequestIDHeader: []string{requestID}}
return append(priors, gax.WithGRPCOptions(grpc.Header(&md)))
}

func (g *grpcSpannerClient) nextNthRequest() uint32 {
return g.nthRequest.Add(1)
}

func (g *grpcSpannerClient) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (*spannerpb.ResultSet, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.ExecuteSql(ctx, req, opts...)
resp, err := g.raw.ExecuteSql(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) ExecuteStreamingSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (spannerpb.Spanner_ExecuteStreamingSqlClient, error) {
return g.raw.ExecuteStreamingSql(peer.NewContext(ctx, &peer.Peer{}), req, opts...)
return g.raw.ExecuteStreamingSql(peer.NewContext(ctx, &peer.Peer{}), req, g.optsWithNextRequestID(opts)...)
}

func (g *grpcSpannerClient) ExecuteBatchDml(ctx context.Context, req *spannerpb.ExecuteBatchDmlRequest, opts ...gax.CallOption) (*spannerpb.ExecuteBatchDmlResponse, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.ExecuteBatchDml(ctx, req, opts...)
resp, err := g.raw.ExecuteBatchDml(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -186,21 +248,21 @@ func (g *grpcSpannerClient) Read(ctx context.Context, req *spannerpb.ReadRequest
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.Read(ctx, req, opts...)
resp, err := g.raw.Read(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) StreamingRead(ctx context.Context, req *spannerpb.ReadRequest, opts ...gax.CallOption) (spannerpb.Spanner_StreamingReadClient, error) {
return g.raw.StreamingRead(peer.NewContext(ctx, &peer.Peer{}), req, opts...)
return g.raw.StreamingRead(peer.NewContext(ctx, &peer.Peer{}), req, g.optsWithNextRequestID(opts)...)
}

func (g *grpcSpannerClient) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest, opts ...gax.CallOption) (*spannerpb.Transaction, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.BeginTransaction(ctx, req, opts...)
resp, err := g.raw.BeginTransaction(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -210,7 +272,7 @@ func (g *grpcSpannerClient) Commit(ctx context.Context, req *spannerpb.CommitReq
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.Commit(ctx, req, opts...)
resp, err := g.raw.Commit(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -220,7 +282,7 @@ func (g *grpcSpannerClient) Rollback(ctx context.Context, req *spannerpb.Rollbac
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
err := g.raw.Rollback(ctx, req, opts...)
err := g.raw.Rollback(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return err
Expand All @@ -230,7 +292,7 @@ func (g *grpcSpannerClient) PartitionQuery(ctx context.Context, req *spannerpb.P
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.PartitionQuery(ctx, req, opts...)
resp, err := g.raw.PartitionQuery(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -240,12 +302,12 @@ func (g *grpcSpannerClient) PartitionRead(ctx context.Context, req *spannerpb.Pa
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.PartitionRead(ctx, req, opts...)
resp, err := g.raw.PartitionRead(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) BatchWrite(ctx context.Context, req *spannerpb.BatchWriteRequest, opts ...gax.CallOption) (spannerpb.Spanner_BatchWriteClient, error) {
return g.raw.BatchWrite(peer.NewContext(ctx, &peer.Peer{}), req, opts...)
return g.raw.BatchWrite(peer.NewContext(ctx, &peer.Peer{}), req, g.optsWithNextRequestID(opts)...)
}
2 changes: 2 additions & 0 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
}

sh.updateLastUseTime()
// TODO: (@odeke-em) retrieve the requestID and increment the RPC number
// then send it along in every call per retry.
resultSet, err := sh.getClient().ExecuteSql(ctx, req, gax.WithGRPCOptions(grpc.Header(&md)))
if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil {
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
Expand Down
Loading

0 comments on commit 913715a

Please sign in to comment.