diff --git a/balancer/balancer.go b/balancer/balancer.go index 25713908072c..f4f9408f3852 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -244,7 +244,7 @@ type DoneInfo struct { // ServerLoad is the load received from server. It's usually sent as part of // trailing metadata. // - // The only supported type now is *orca_v1.LoadReport. + // The only supported type now is *orca_v3.LoadReport. ServerLoad interface{} } diff --git a/orca/orca.go b/orca/orca.go index 414f6ed6ef4f..676c66e2829b 100644 --- a/orca/orca.go +++ b/orca/orca.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" @@ -162,3 +163,24 @@ func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) { } return ret, nil } + +// loadParser implements the Parser interface defined in `internal/balancerload` +// package. This interface is used by the client stream to parse load reports +// sent by the server in trailer metadata. The parsed loads are then sent to +// balancers via balancer.DoneInfo. +// +// The grpc package cannot directly call orca.ToLoadReport() as that would cause +// an import cycle. Hence this roundabout method is used. +type loadParser struct{} + +func (loadParser) Parse(md metadata.MD) interface{} { + lr, err := ToLoadReport(md) + if err != nil { + logger.Errorf("Parse(%v) failed: %v", err) + } + return lr +} + +func init() { + balancerload.SetParser(loadParser{}) +} diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go index 8cce07553082..360fc44c9e4d 100644 --- a/xds/internal/balancer/clusterimpl/picker.go +++ b/xds/internal/balancer/clusterimpl/picker.go @@ -19,7 +19,6 @@ package clusterimpl import ( - orcapb "github.com/cncf/xds/go/xds/data/orca/v3" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" @@ -27,6 +26,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/load" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) // NewRandomWRR is used when calculating drops. It's exported so that tests can @@ -158,7 +159,7 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } d.loadStore.CallFinished(lIDStr, info.Err) - load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport) + load, ok := info.ServerLoad.(*v3orcapb.OrcaLoadReport) if !ok { return } diff --git a/xds/internal/balancer/orca/orca.go b/xds/internal/balancer/orca/orca.go deleted file mode 100644 index 75b9439d4dba..000000000000 --- a/xds/internal/balancer/orca/orca.go +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Package orca implements Open Request Cost Aggregation. -package orca - -import ( - orcapb "github.com/cncf/xds/go/xds/data/orca/v3" - "github.com/golang/protobuf/proto" - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal/balancerload" - "google.golang.org/grpc/metadata" -) - -const mdKey = "X-Endpoint-Load-Metrics-Bin" - -var logger = grpclog.Component("xds") - -// toBytes converts a orca load report into bytes. -func toBytes(r *orcapb.OrcaLoadReport) []byte { - if r == nil { - return nil - } - - b, err := proto.Marshal(r) - if err != nil { - logger.Warningf("orca: failed to marshal load report: %v", err) - return nil - } - return b -} - -// ToMetadata converts a orca load report into grpc metadata. -func ToMetadata(r *orcapb.OrcaLoadReport) metadata.MD { - b := toBytes(r) - if b == nil { - return nil - } - return metadata.Pairs(mdKey, string(b)) -} - -// fromBytes reads load report bytes and converts it to orca. -func fromBytes(b []byte) *orcapb.OrcaLoadReport { - ret := new(orcapb.OrcaLoadReport) - if err := proto.Unmarshal(b, ret); err != nil { - logger.Warningf("orca: failed to unmarshal load report: %v", err) - return nil - } - return ret -} - -// FromMetadata reads load report from metadata and converts it to orca. -// -// It returns nil if report is not found in metadata. -func FromMetadata(md metadata.MD) *orcapb.OrcaLoadReport { - vs := md.Get(mdKey) - if len(vs) == 0 { - return nil - } - return fromBytes([]byte(vs[0])) -} - -type loadParser struct{} - -func (*loadParser) Parse(md metadata.MD) interface{} { - return FromMetadata(md) -} - -func init() { - balancerload.SetParser(&loadParser{}) -} diff --git a/xds/internal/balancer/orca/orca_test.go b/xds/internal/balancer/orca/orca_test.go deleted file mode 100644 index 9979c1a9d107..000000000000 --- a/xds/internal/balancer/orca/orca_test.go +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package orca - -import ( - "strings" - "testing" - - orcapb "github.com/cncf/xds/go/xds/data/orca/v3" - "github.com/golang/protobuf/proto" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/internal/grpctest" - "google.golang.org/grpc/metadata" -) - -var ( - testMessage = &orcapb.OrcaLoadReport{ - CpuUtilization: 0.1, - MemUtilization: 0.2, - RequestCost: map[string]float64{"ccc": 3.4}, - Utilization: map[string]float64{"ttt": 0.4}, - } - testBytes, _ = proto.Marshal(testMessage) -) - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -func (s) TestToMetadata(t *testing.T) { - tests := []struct { - name string - r *orcapb.OrcaLoadReport - want metadata.MD - }{{ - name: "nil", - r: nil, - want: nil, - }, { - name: "valid", - r: testMessage, - want: metadata.MD{ - strings.ToLower(mdKey): []string{string(testBytes)}, - }, - }} - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := ToMetadata(tt.r); !cmp.Equal(got, tt.want) { - t.Errorf("ToMetadata() = %v, want %v", got, tt.want) - } - }) - } -} - -func (s) TestFromMetadata(t *testing.T) { - tests := []struct { - name string - md metadata.MD - want *orcapb.OrcaLoadReport - }{{ - name: "nil", - md: nil, - want: nil, - }, { - name: "valid", - md: metadata.MD{ - strings.ToLower(mdKey): []string{string(testBytes)}, - }, - want: testMessage, - }} - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := FromMetadata(tt.md); !cmp.Equal(got, tt.want, cmp.Comparer(proto.Equal)) { - t.Errorf("FromMetadata() = %v, want %v", got, tt.want) - } - }) - } -}