Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): introduce send optimizers (#…
Browse files Browse the repository at this point in the history
…7323)

* refactor(bigquery/storage/managedwriter): introduce send optimizers

This PR introduces a new optimizer abstraction for connection objects,
but doesn't wire it in.  The purpose of the optimizers is to leverage
awareness of previous requests to reduce transferred bytes.
  • Loading branch information
shollyman authored Jan 31, 2023
1 parent 0bf80d7 commit c37f9ae
Show file tree
Hide file tree
Showing 2 changed files with 397 additions and 0 deletions.
135 changes: 135 additions & 0 deletions bigquery/storage/managedwriter/send_optimizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"google.golang.org/protobuf/proto"
)

// optimizeAndSend handles the general task of optimizing AppendRowsRequest messages send to the backend.
//
// The basic premise is that by maintaining awareness of previous sends, individual messages can be made
// more efficient (smaller) by redacting redundant information.
type sendOptimizer interface {
// signalReset is used to signal to the optimizer that the connection is freshly (re)opened.
signalReset()

// optimizeSend handles redactions for a given stream.
optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error
}

// passthroughOptimizer is an optimizer that doesn't modify requests.
type passthroughOptimizer struct {
}

func (po *passthroughOptimizer) signalReset() {
// we don't care, just here to satisfy the interface.
}

func (po *passthroughOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
return arc.Send(req)
}

// simplexOptimizer is used for connections where there's only a single stream's data being transmitted.
//
// The optimizations here are straightforward: the first request on a stream is unmodified, all
// subsequent requests can redact WriteStream, WriterSchema, and TraceID.
//
// TODO: this optimizer doesn't do schema evolution checkes, but relies on existing behavior that triggers reconnect
// on schema change. Revisit this, as it may not be necessary once b/266946486 is resolved.
type simplexOptimizer struct {
haveSent bool
}

func (eo *simplexOptimizer) signalReset() {
eo.haveSent = false
}

func (eo *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
var resp error
if eo.haveSent {
// subsequent send, clone and redact.
cp := proto.Clone(req).(*storagepb.AppendRowsRequest)
cp.WriteStream = ""
cp.GetProtoRows().WriterSchema = nil
cp.TraceId = ""
resp = arc.Send(cp)
} else {
// first request, send unmodified.
resp = arc.Send(req)
}
eo.haveSent = resp == nil
return resp
}

// multiplexOptimizer is used for connections where requests for multiple streams are sent on a common connection.
//
// In this case, the optimizations are as follows:
// * We **must** send the WriteStream on all requests.
// * For sequential requests to the same stream, schema can be redacted after the first request.
// * Trace ID can be redacted from all requests after the first.
type multiplexOptimizer struct {
prev *storagepb.AppendRowsRequest
}

func (mo *multiplexOptimizer) signalReset() {
mo.prev = nil
}

func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
var resp error
// we'll need a copy
cp := proto.Clone(req).(*storagepb.AppendRowsRequest)
if mo.prev != nil {
var swapOnSuccess bool
// Clear trace ID. We use the _presence_ of a previous request for reasoning about TraceID, we don't compare
// it's value.
cp.TraceId = ""
// we have a previous send.
if cp.GetWriteStream() != mo.prev.GetWriteStream() {
// different stream, no further optimization.
swapOnSuccess = true
} else {
// same stream
if !proto.Equal(mo.prev.GetProtoRows().GetWriterSchema().GetProtoDescriptor(), cp.GetProtoRows().GetWriterSchema().GetProtoDescriptor()) {
swapOnSuccess = true
} else {
// the redaction case, where we won't swap.
cp.GetProtoRows().WriterSchema = nil
}
}
resp = arc.Send(cp)
if resp == nil && swapOnSuccess {
cp.GetProtoRows().Rows = nil
cp.MissingValueInterpretations = nil
mo.prev = cp
}
if resp != nil {
mo.prev = nil
}
return resp
}

// no previous trace case.
resp = arc.Send(req)
if resp == nil {
// copy the send as the previous.
cp.GetProtoRows().Rows = nil
cp.MissingValueInterpretations = nil
mo.prev = cp
}
return resp
}
262 changes: 262 additions & 0 deletions bigquery/storage/managedwriter/send_optimizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
"io"
"testing"

"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/testing/protocmp"
)

func TestSendOptimizer(t *testing.T) {

exampleReq := &storagepb.AppendRowsRequest{
WriteStream: "foo",
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
Rows: &storagepb.ProtoRows{
SerializedRows: [][]byte{[]byte("row_data")},
},
WriterSchema: &storagepb.ProtoSchema{
ProtoDescriptor: protodesc.ToDescriptorProto((&testdata.SimpleMessageProto2{}).ProtoReflect().Descriptor()),
},
},
},
TraceId: "trace_id",
}

var testCases = []struct {
description string
optimizer sendOptimizer
reqs []*storagepb.AppendRowsRequest
sendResults []error
wantReqs []*storagepb.AppendRowsRequest
}{
{
description: "passthrough-optimizer",
optimizer: &passthroughOptimizer{},
reqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
},
sendResults: []error{
nil,
io.EOF,
io.EOF,
},
wantReqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
},
},
{
description: "simplex no errors",
optimizer: &simplexOptimizer{},
reqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
},
sendResults: []error{
nil,
nil,
nil,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 3)
// first has no redactions.
want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
req.GetProtoRows().WriterSchema = nil
req.TraceId = ""
req.WriteStream = ""
want[1] = req
// previous had errors, so unredacted.
want[2] = req
return want
}(),
},
{
description: "simplex w/partial errors",
optimizer: &simplexOptimizer{},
reqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
},
sendResults: []error{
nil,
io.EOF,
nil,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 3)
want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
req.GetProtoRows().WriterSchema = nil
req.TraceId = ""
req.WriteStream = ""
want[1] = req
want[2] = want[0]
return want
}(),
},
{
description: "multiplex single all errors",
optimizer: &multiplexOptimizer{},
reqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
},
sendResults: []error{
io.EOF,
io.EOF,
io.EOF,
},
wantReqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
},
},
{
description: "multiplex single no errors",
optimizer: &multiplexOptimizer{},
reqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
},
sendResults: []error{
nil,
nil,
nil,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 3)
want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
req.GetProtoRows().WriterSchema = nil
req.TraceId = ""
want[1] = req
want[2] = req
return want
}(),
},
{
description: "multiplex interleave",
optimizer: &multiplexOptimizer{},
reqs: func() []*storagepb.AppendRowsRequest {
reqs := make([]*storagepb.AppendRowsRequest, 10)
reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
reqA.WriteStream = "alpha"

reqB := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
reqB.WriteStream = "beta"
reqB.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())
reqs[0] = reqA
reqs[1] = reqA
reqs[2] = reqB
reqs[3] = reqA
reqs[4] = reqB
reqs[5] = reqB
reqs[6] = reqB
reqs[7] = reqB
reqs[8] = reqA
reqs[9] = reqA

return reqs
}(),
sendResults: []error{
nil,
nil,
nil,
nil,
nil,
io.EOF,
nil,
nil,
nil,
io.EOF,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 10)

wantReqAFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
wantReqAFull.WriteStream = "alpha"

wantReqANoTrace := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
wantReqANoTrace.TraceId = ""

wantReqAOpt := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
wantReqAOpt.GetProtoRows().WriterSchema = nil
wantReqAOpt.TraceId = ""

wantReqBFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
wantReqBFull.WriteStream = "beta"
wantReqBFull.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())

wantReqBNoTrace := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
wantReqBNoTrace.TraceId = ""

wantReqBOpt := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
wantReqBOpt.GetProtoRows().WriterSchema = nil
wantReqBOpt.TraceId = ""

want[0] = wantReqAFull
want[1] = wantReqAOpt
want[2] = wantReqBNoTrace
want[3] = wantReqANoTrace
want[4] = wantReqBNoTrace
want[5] = wantReqBOpt
want[6] = wantReqBFull
want[7] = wantReqBOpt
want[8] = wantReqANoTrace
want[9] = wantReqAOpt

return want
}(),
},
}

for _, tc := range testCases {
testARC := &testAppendRowsClient{}
testARC.sendF = func(req *storagepb.AppendRowsRequest) error {
testARC.requests = append(testARC.requests, proto.Clone(req).(*storagepb.AppendRowsRequest))
respErr := tc.sendResults[0]
tc.sendResults = tc.sendResults[1:]
return respErr
}

for _, req := range tc.reqs {
tc.optimizer.optimizeSend(testARC, req)
}
// now, compare.
for k, wr := range tc.wantReqs {
if diff := cmp.Diff(testARC.requests[k], wr, protocmp.Transform()); diff != "" {
t.Errorf("%s (req %d) mismatch: -got, +want:\n%s", tc.description, k, diff)
}
}
}
}

0 comments on commit c37f9ae

Please sign in to comment.