Skip to content

Commit

Permalink
Tenancy for queries
Browse files Browse the repository at this point in the history
Signed-off-by: Ed Snible <[email protected]>
  • Loading branch information
esnible committed Jul 1, 2022
1 parent 8c2e162 commit 9304764
Show file tree
Hide file tree
Showing 8 changed files with 505 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[c.tenancyConfig.Header]
tenants := md.Get(c.tenancyConfig.Header)
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
Expand Down
20 changes: 18 additions & 2 deletions cmd/query/app/apiv3/grpc_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,32 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
)

// RegisterGRPCGateway registers api_v3 endpoints into provided mux.
func RegisterGRPCGateway(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS tlscfg.Options) error {
return RegisterGRPCGatewayWithTenancy(ctx, logger, r, basePath, grpcEndpoint, grpcTLS,
tenancy.Options{
Enabled: false,
},
)
}

func RegisterGRPCGatewayWithTenancy(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS tlscfg.Options, tenancyOptions tenancy.Options) error {
jsonpb := &runtime.JSONPb{}
grpcGatewayMux := runtime.NewServeMux(

muxOpts := []runtime.ServeMuxOption{
runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb),
)
}
tc := tenancy.NewTenancyConfig(&tenancyOptions)
if tenancyOptions.Enabled {
muxOpts = append(muxOpts, runtime.WithMetadata(tc.MetadataAnnotator()))
}

grpcGatewayMux := runtime.NewServeMux(muxOpts...)
var handler http.Handler = grpcGatewayMux
if basePath != "/" {
handler = http.StripPrefix(basePath, grpcGatewayMux)
Expand Down
112 changes: 96 additions & 16 deletions cmd/query/app/apiv3/grpc_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
Expand All @@ -45,21 +46,15 @@ import (
var testCertKeyLocation = "../../../../pkg/config/tlscfg/testdata/"

func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options) {
defer serverTLS.Close()
defer clientTLS.Close()
testGRPCGatewayWithTenancy(t, basePath, serverTLS, clientTLS,
tenancy.Options{
Enabled: false,
},
func(*http.Request) {})
}

func setupGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options, tenancyOptions tenancy.Options) (*spanstoremocks.Reader, net.Listener, *grpc.Server, context.CancelFunc, *http.Server) {
r := &spanstoremocks.Reader{}
traceID := model.NewTraceID(150, 160)
r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
&model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: model.NewSpanID(180),
OperationName: "foobar",
},
},
}, nil).Once()

q := querysvc.NewQueryService(r, &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{})

Expand All @@ -70,6 +65,11 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl
creds := credentials.NewTLS(config)
serverGRPCOpts = append(serverGRPCOpts, grpc.Creds(creds))
}
if tenancyOptions.Enabled {
tc := tenancy.NewTenancyConfig(&tenancyOptions)
serverGRPCOpts = append(serverGRPCOpts, grpc.StreamInterceptor(
tenancy.NewGuardingStreamInterceptor(tc)))
}
grpcServer := grpc.NewServer(serverGRPCOpts...)
h := &Handler{
QueryService: q,
Expand All @@ -80,13 +80,11 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl
err := grpcServer.Serve(lis)
require.NoError(t, err)
}()
defer grpcServer.Stop()

router := &mux.Router{}
router = router.PathPrefix(basePath).Subrouter()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := RegisterGRPCGateway(ctx, zap.NewNop(), router, basePath, lis.Addr().String(), clientTLS)
err := RegisterGRPCGatewayWithTenancy(ctx, zap.NewNop(), router, basePath, lis.Addr().String(), clientTLS, tenancyOptions)
require.NoError(t, err)

httpLis, err := net.Listen("tcp", ":0")
Expand All @@ -98,10 +96,39 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl
err = httpServer.Serve(httpLis)
require.Equal(t, http.ErrServerClosed, err)
}()
return r, httpLis, grpcServer, cancel, httpServer
}

func testGRPCGatewayWithTenancy(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options,
tenancyOptions tenancy.Options,
setupRequest func(*http.Request),
) {
defer serverTLS.Close()
defer clientTLS.Close()

reader, httpLis, grpcServer, cancel, httpServer := setupGRPCGateway(t, basePath, serverTLS, clientTLS, tenancyOptions)
defer grpcServer.Stop()
defer cancel()
defer httpServer.Shutdown(context.Background())

traceID := model.NewTraceID(150, 160)
reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
&model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: model.NewSpanID(180),
OperationName: "foobar",
},
},
}, nil).Once()

req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s%s/api/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1), basePath), nil)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
setupRequest(req)
response, err := http.DefaultClient.Do(req)
require.NoError(t, err)
buf := bytes.Buffer{}
_, err = buf.ReadFrom(response.Body)
require.NoError(t, err)
Expand Down Expand Up @@ -142,3 +169,56 @@ func TestGRPCGateway_TLS_with_base_path(t *testing.T) {
type envelope struct {
Result json.RawMessage `json:"result"`
}

func TestTenancyGRPCGateway(t *testing.T) {
tenancyOptions := tenancy.Options{
Enabled: true,
}
tc := tenancy.NewTenancyConfig(&tenancyOptions)
testGRPCGatewayWithTenancy(t, "/", tlscfg.Options{}, tlscfg.Options{},
// Configure the gateway to forward tenancy header from HTTP to GRPC
tenancyOptions,
// Add a tenancy header on outbound requests
func(req *http.Request) {
req.Header.Add(tc.Header, "dummy")
})
}

func TestTenancyGRPCRejection(t *testing.T) {
basePath := "/"
tenancyOptions := tenancy.Options{Enabled: true}
reader, httpLis, grpcServer, cancel, httpServer := setupGRPCGateway(t,
basePath, tlscfg.Options{}, tlscfg.Options{},
tenancyOptions)
defer grpcServer.Stop()
defer cancel()
defer httpServer.Shutdown(context.Background())

traceID := model.NewTraceID(150, 160)
reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
&model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: model.NewSpanID(180),
OperationName: "foobar",
},
},
}, nil).Once()

req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s%s/api/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1), basePath), nil)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
// We don't set tenant header
response, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusForbidden, response.StatusCode)

// Try again with tenant header set
tc := tenancy.NewTenancyConfig(&tenancyOptions)
req.Header.Set(tc.Header, "acme")
response, err = http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, response.StatusCode)
// Skip unmarshal of response; it is enough that it succeeded
}
Loading

0 comments on commit 9304764

Please sign in to comment.