diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index d6dcf9867bd..c3ceb111d77 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -46,6 +46,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metrics/jlibadapter" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" @@ -155,6 +156,8 @@ by default uses only in-memory database.`, logger.Fatal("Failed to configure query service", zap.Error(err)) } + tm := tenancy.NewTenancyManager(&cOpts.GRPC.Tenancy) + // collector c := collectorApp.New(&collectorApp.CollectorParams{ ServiceName: "jaeger-collector", @@ -164,6 +167,7 @@ by default uses only in-memory database.`, StrategyStore: strategyStore, Aggregator: aggregator, HealthCheck: svc.HC(), + TenancyMgr: tm, }) if err := c.Start(cOpts); err != nil { log.Fatal(err) @@ -192,7 +196,7 @@ by default uses only in-memory database.`, querySrv := startQuery( svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger), spanReader, dependencyReader, metricsQueryService, - metricsFactory, + metricsFactory, tm, ) svc.RunAndThen(func() { @@ -265,10 +269,11 @@ func startQuery( depReader dependencystore.Reader, metricsQueryService querysvc.MetricsQueryService, baseFactory metrics.Factory, + tm *tenancy.TenancyManager, ) *queryApp.Server { spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"})) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) - server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, opentracing.GlobalTracer()) + server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, opentracing.GlobalTracer()) if err != nil { svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err)) } diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index a81213d49d5..2beff366f01 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -32,6 +32,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/server" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -52,6 +53,7 @@ type Collector struct { hCheck *healthcheck.HealthCheck spanProcessor processor.SpanProcessor spanHandlers *SpanHandlers + tenancyMgr *tenancy.TenancyManager // state, read only hServer *http.Server @@ -72,6 +74,7 @@ type CollectorParams struct { StrategyStore strategystore.StrategyStore Aggregator strategystore.Aggregator HealthCheck *healthcheck.HealthCheck + TenancyMgr *tenancy.TenancyManager } // New constructs a new collector component, ready to be started @@ -84,6 +87,7 @@ func New(params *CollectorParams) *Collector { strategyStore: params.StrategyStore, aggregator: params.Aggregator, hCheck: params.HealthCheck, + tenancyMgr: params.TenancyMgr, } } @@ -94,6 +98,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { CollectorOpts: options, Logger: c.logger, MetricsFactory: c.metricsFactory, + TenancyMgr: c.tenancyMgr, } var additionalProcessors []ProcessSpan @@ -152,7 +157,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { c.zkServer = zkServer if options.OTLP.Enabled { - otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor) + otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr) if err != nil { return fmt.Errorf("could not start OTLP receiver: %w", err) } diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 06dc2fe70cd..cf951f22e23 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) @@ -53,6 +54,7 @@ func TestNewCollector(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -61,6 +63,7 @@ func TestNewCollector(t *testing.T) { SpanWriter: spanWriter, StrategyStore: strategyStore, HealthCheck: hc, + TenancyMgr: tm, }) collectorOpts := optionsForEphemeralPorts() @@ -77,6 +80,7 @@ func TestCollector_StartErrors(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -85,6 +89,7 @@ func TestCollector_StartErrors(t *testing.T) { SpanWriter: spanWriter, StrategyStore: strategyStore, HealthCheck: hc, + TenancyMgr: tm, }) err := c.Start(options) require.Error(t, err) @@ -130,6 +135,7 @@ func TestCollector_PublishOpts(t *testing.T) { metricsFactory := fork.New("internal", forkFactory, baseMetrics) spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -138,6 +144,7 @@ func TestCollector_PublishOpts(t *testing.T) { SpanWriter: spanWriter, StrategyStore: strategyStore, HealthCheck: hc, + TenancyMgr: tm, }) collectorOpts := optionsForEphemeralPorts() collectorOpts.NumWorkers = 24 @@ -164,6 +171,7 @@ func TestAggregator(t *testing.T) { spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} agg := &mockAggregator{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -173,6 +181,7 @@ func TestAggregator(t *testing.T) { StrategyStore: strategyStore, HealthCheck: hc, Aggregator: agg, + TenancyMgr: tm, }) collectorOpts := optionsForEphemeralPorts() collectorOpts.NumWorkers = 10 diff --git a/cmd/collector/app/flags/flags.go b/cmd/collector/app/flags/flags.go index 05f5a190151..bc614348fbe 100644 --- a/cmd/collector/app/flags/flags.go +++ b/cmd/collector/app/flags/flags.go @@ -24,8 +24,8 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/flags" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" ) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 8028ea7bdd2..1c6d9e79f08 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -25,7 +25,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -36,14 +36,14 @@ type GRPCHandler struct { } // NewGRPCHandler registers routes for this handler on the given router. -func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler { +func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyMgr *tenancy.TenancyManager) *GRPCHandler { return &GRPCHandler{ logger: logger, batchConsumer: newBatchConsumer(logger, spanProcessor, processor.GRPCTransport, processor.ProtoSpanFormat, - tenancyConfig), + tenancyMgr), } } @@ -58,13 +58,10 @@ type batchConsumer struct { logger *zap.Logger spanProcessor processor.SpanProcessor spanOptions processor.SpansOptions - tenancyConfig tenancy.TenancyConfig + tenancyMgr *tenancy.TenancyManager } -func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer { - if tenancyConfig == nil { - tenancyConfig = &tenancy.TenancyConfig{} - } +func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.TenancyManager) batchConsumer { return batchConsumer{ logger: logger, spanProcessor: spanProcessor, @@ -72,7 +69,7 @@ func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, InboundTransport: transport, SpanFormat: spanFormat, }, - tenancyConfig: *tenancyConfig, + tenancyMgr: tenancyMgr, } } @@ -104,7 +101,7 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error { } func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) { - if !c.tenancyConfig.Enabled { + if !c.tenancyMgr.Enabled { return "", nil } @@ -113,14 +110,14 @@ func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) { return "", status.Errorf(codes.PermissionDenied, "missing tenant header") } - tenants := md[c.tenancyConfig.Header] + tenants := md.Get(c.tenancyMgr.Header) if len(tenants) < 1 { return "", status.Errorf(codes.PermissionDenied, "missing tenant header") } else if len(tenants) > 1 { return "", status.Errorf(codes.PermissionDenied, "extra tenant header") } - if !c.tenancyConfig.Valid(tenants[0]) { + if !c.tenancyMgr.Valid(tenants[0]) { return "", status.Errorf(codes.PermissionDenied, "unknown tenant") } diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 9628aeba93b..171835e578c 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -30,7 +30,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -98,7 +98,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp func TestPostSpans(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{}) + handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -133,7 +133,7 @@ func TestPostSpans(t *testing.T) { func TestGRPCCompressionEnabled(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{}) + handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -171,7 +171,7 @@ func TestPostSpansWithError(t *testing.T) { processor := &mockSpanProcessor{expectedError: test.processorError} logger, logBuf := testutils.NewLogger() server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(logger, processor, &tenancy.TenancyConfig{}) + handler := NewGRPCHandler(logger, processor, &tenancy.TenancyManager{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -210,7 +210,7 @@ func TestPostTenantedSpans(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { handler := NewGRPCHandler(zap.NewNop(), processor, - tenancy.NewTenancyConfig(&tenancy.Options{ + tenancy.NewTenancyManager(&tenancy.Options{ Enabled: true, Header: tenantHeader, Tenants: []string{dummyTenant}, @@ -346,7 +346,7 @@ func TestGetTenant(t *testing.T) { processor := &mockSpanProcessor{} handler := NewGRPCHandler(zap.NewNop(), processor, - tenancy.NewTenancyConfig(&tenancy.Options{ + tenancy.NewTenancyManager(&tenancy.Options{ Enabled: true, Header: tenantHeader, Tenants: validTenants, diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index 8c714b52042..678fdd78b6d 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -34,17 +34,19 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" ) var _ component.Host = (*otelHost)(nil) // API check // StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports. -func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { +func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) (component.TracesReceiver, error) { otlpFactory := otlpreceiver.NewFactory() return startOTLPReceiver( options, logger, spanProcessor, + tm, otlpFactory, consumer.NewTraces, otlpFactory.CreateTracesReceiver, @@ -58,6 +60,7 @@ func startOTLPReceiver( options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, + tm *tenancy.TenancyManager, // from here: params that can be mocked in tests otlpFactory component.ReceiverFactory, newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error), @@ -74,7 +77,7 @@ func startOTLPReceiver( }, } - otlpConsumer := newConsumerDelegate(logger, spanProcessor) + otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm) // the following two constructors never return errors given non-nil arguments, so we ignore errors nextConsumer, err := newTraces(otlpConsumer.consume) if err != nil { @@ -137,13 +140,13 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting { } } -func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate { +func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) *consumerDelegate { return &consumerDelegate{ batchConsumer: newBatchConsumer(logger, spanProcessor, processor.UnknownTransport, // could be gRPC or HTTP processor.OTLPSpanFormat, - nil), + tm), protoFromTraces: otlp2jaeger.ProtoFromTraces, } } diff --git a/cmd/collector/app/handler/otlp_receiver_test.go b/cmd/collector/app/handler/otlp_receiver_test.go index 83f17e412e1..b05b5ef5d0b 100644 --- a/cmd/collector/app/handler/otlp_receiver_test.go +++ b/cmd/collector/app/handler/otlp_receiver_test.go @@ -31,6 +31,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" ) @@ -48,7 +49,8 @@ func optionsWithPorts(port string) *flags.CollectorOptions { func TestStartOtlpReceiver(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() - rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor) + tm := &tenancy.TenancyManager{} + rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor, tm) require.NoError(t, err) defer func() { assert.NoError(t, rec.Shutdown(context.Background())) @@ -79,7 +81,7 @@ func TestConsumerDelegate(t *testing.T) { t.Run(test.expectLog, func(t *testing.T) { logger, logBuf := testutils.NewLogger() spanProcessor := &mockSpanProcessor{expectedError: test.expectErr} - consumer := newConsumerDelegate(logger, spanProcessor) + consumer := newConsumerDelegate(logger, spanProcessor, &tenancy.TenancyManager{}) err := consumer.consume(context.Background(), makeTracesOneSpan()) @@ -98,7 +100,8 @@ func TestStartOtlpReceiver_Error(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() opts := optionsWithPorts(":-1") - _, err := StartOTLPReceiver(opts, logger, spanProcessor) + tm := &tenancy.TenancyManager{} + _, err := StartOTLPReceiver(opts, logger, spanProcessor, tm) require.Error(t, err) assert.Contains(t, err.Error(), "could not start the OTLP receiver") @@ -106,7 +109,7 @@ func TestStartOtlpReceiver_Error(t *testing.T) { return nil, errors.New("mock error") } f := otlpreceiver.NewFactory() - _, err = startOTLPReceiver(opts, logger, spanProcessor, f, newTraces, f.CreateTracesReceiver) + _, err = startOTLPReceiver(opts, logger, spanProcessor, &tenancy.TenancyManager{}, f, newTraces, f.CreateTracesReceiver) require.Error(t, err) assert.Contains(t, err.Error(), "could not create the OTLP consumer") @@ -115,7 +118,7 @@ func TestStartOtlpReceiver_Error(t *testing.T) { ) (component.TracesReceiver, error) { return nil, errors.New("mock error") } - _, err = startOTLPReceiver(opts, logger, spanProcessor, f, consumer.NewTraces, createTracesReceiver) + _, err = startOTLPReceiver(opts, logger, spanProcessor, &tenancy.TenancyManager{}, f, consumer.NewTraces, createTracesReceiver) require.Error(t, err) assert.Contains(t, err.Error(), "could not create the OTLP receiver") } diff --git a/cmd/collector/app/server/grpc_test.go b/cmd/collector/app/server/grpc_test.go index 34ab8d0e460..bdf722a8206 100644 --- a/cmd/collector/app/server/grpc_test.go +++ b/cmd/collector/app/server/grpc_test.go @@ -30,8 +30,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/internal/grpctest" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -40,7 +40,7 @@ func TestFailToListen(t *testing.T) { logger, _ := zap.NewDevelopment() server, err := StartGRPCServer(&GRPCServerParams{ HostPort: ":-1", - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, }) @@ -57,7 +57,7 @@ func TestFailServe(t *testing.T) { logger := zap.New(core) serveGRPC(grpc.NewServer(), lis, &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, OnError: func(e error) { @@ -72,7 +72,7 @@ func TestFailServe(t *testing.T) { func TestSpanCollector(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, MaxReceiveMessageLength: 1024 * 1024, @@ -97,7 +97,7 @@ func TestSpanCollector(t *testing.T) { func TestCollectorStartWithTLS(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, TLSConfig: tlscfg.Options{ @@ -116,7 +116,7 @@ func TestCollectorStartWithTLS(t *testing.T) { func TestCollectorReflection(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, } diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index 7a314e80b95..dd5dd311702 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -25,8 +25,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -36,6 +36,7 @@ type SpanHandlerBuilder struct { CollectorOpts *flags.CollectorOptions Logger *zap.Logger MetricsFactory metrics.Factory + TenancyMgr *tenancy.TenancyManager } // SpanHandlers holds instances to the span handlers built by the SpanHandlerBuilder @@ -76,7 +77,7 @@ func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor zs.NewChainedSanitizer(zs.NewStandardSanitizers()...), ), handler.NewJaegerSpanHandler(b.Logger, spanProcessor), - handler.NewGRPCHandler(b.Logger, spanProcessor, tenancy.NewTenancyConfig(&b.CollectorOpts.GRPC.Tenancy)), + handler.NewGRPCHandler(b.Logger, spanProcessor, b.TenancyMgr), } } diff --git a/cmd/collector/app/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go index ec02e0e4503..3f747f83372 100644 --- a/cmd/collector/app/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -26,6 +26,7 @@ import ( cmdFlags "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/memory" ) @@ -41,6 +42,7 @@ func TestNewSpanHandlerBuilder(t *testing.T) { builder := &SpanHandlerBuilder{ SpanWriter: spanWriter, CollectorOpts: cOpts, + TenancyMgr: &tenancy.TenancyManager{}, } assert.NotNil(t, builder.logger()) assert.NotNil(t, builder.metricsFactory()) @@ -50,6 +52,7 @@ func TestNewSpanHandlerBuilder(t *testing.T) { CollectorOpts: cOpts, Logger: zap.NewNop(), MetricsFactory: metrics.NullFactory, + TenancyMgr: &tenancy.TenancyManager{}, } spanProcessor := builder.BuildSpanProcessor() diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 00f73e44548..5e9ff4acafd 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -27,7 +27,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/queue" - "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -156,7 +156,7 @@ func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) { // Since we save spans asynchronously from receiving them, we cannot reuse // the inbound Context, as it may be cancelled by the time we reach this point, // so we need to start a new Context. - ctx := storage.WithTenant(context.Background(), tenant) + ctx := tenancy.WithTenant(context.Background(), tenant) if err := sp.spanWriter.WriteSpan(ctx, span); err != nil { sp.logger.Error("Failed to save span", zap.Error(err)) sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span) diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 7b1ff52c8a7..864baf47993 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -34,8 +34,8 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" - "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -184,7 +184,7 @@ func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error n.tenants = make(map[string]bool) } - n.tenants[storage.GetTenant(ctx)] = true + n.tenants[tenancy.GetTenant(ctx)] = true return n.err } diff --git a/cmd/collector/main.go b/cmd/collector/main.go index c295fc57fde..f92c9482689 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -36,6 +36,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metrics/fork" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" @@ -98,6 +99,12 @@ func main() { if err != nil { logger.Fatal("Failed to create sampling strategy store", zap.Error(err)) } + collectorOpts, err := new(flags.CollectorOptions).InitFromViper(v, logger) + if err != nil { + logger.Fatal("Failed to initialize collector", zap.Error(err)) + } + tm := tenancy.NewTenancyManager(&collectorOpts.GRPC.Tenancy) + collector := app.New(&app.CollectorParams{ ServiceName: serviceName, Logger: logger, @@ -106,11 +113,8 @@ func main() { StrategyStore: strategyStore, Aggregator: aggregator, HealthCheck: svc.HC(), + TenancyMgr: tm, }) - collectorOpts, err := new(flags.CollectorOptions).InitFromViper(v, logger) - if err != nil { - logger.Fatal("Failed to initialize collector", zap.Error(err)) - } // Start all Collector services if err := collector.Start(collectorOpts); err != nil { logger.Fatal("Failed to start collector", zap.Error(err)) diff --git a/cmd/query/app/apiv3/grpc_gateway.go b/cmd/query/app/apiv3/grpc_gateway.go index 1709ed626e3..f563f7158e9 100644 --- a/cmd/query/app/apiv3/grpc_gateway.go +++ b/cmd/query/app/apiv3/grpc_gateway.go @@ -26,15 +26,22 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v3" ) // RegisterGRPCGateway registers api_v3 endpoints into provided mux. -func RegisterGRPCGateway(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS tlscfg.Options) error { +func RegisterGRPCGateway(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS tlscfg.Options, tm *tenancy.TenancyManager) error { jsonpb := &runtime.JSONPb{} - grpcGatewayMux := runtime.NewServeMux( + + muxOpts := []runtime.ServeMuxOption{ runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb), - ) + } + if tm.Enabled { + muxOpts = append(muxOpts, runtime.WithMetadata(tm.MetadataAnnotator())) + } + + grpcGatewayMux := runtime.NewServeMux(muxOpts...) var handler http.Handler = grpcGatewayMux if basePath != "/" { handler = http.StripPrefix(basePath, grpcGatewayMux) diff --git a/cmd/query/app/apiv3/grpc_gateway_test.go b/cmd/query/app/apiv3/grpc_gateway_test.go index 0557ab40660..92b9ac5a4cf 100644 --- a/cmd/query/app/apiv3/grpc_gateway_test.go +++ b/cmd/query/app/apiv3/grpc_gateway_test.go @@ -37,6 +37,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" _ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v3" dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" @@ -45,21 +46,15 @@ import ( var testCertKeyLocation = "../../../../pkg/config/tlscfg/testdata/" func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options) { - defer serverTLS.Close() - defer clientTLS.Close() + testGRPCGatewayWithTenancy(t, basePath, serverTLS, clientTLS, + tenancy.Options{ + Enabled: false, + }, + func(*http.Request) {}) +} +func setupGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options, tenancyOptions tenancy.Options) (*spanstoremocks.Reader, net.Listener, *grpc.Server, context.CancelFunc, *http.Server) { r := &spanstoremocks.Reader{} - traceID := model.NewTraceID(150, 160) - r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return( - &model.Trace{ - Spans: []*model.Span{ - { - TraceID: traceID, - SpanID: model.NewSpanID(180), - OperationName: "foobar", - }, - }, - }, nil).Once() q := querysvc.NewQueryService(r, &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{}) @@ -70,6 +65,13 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl creds := credentials.NewTLS(config) serverGRPCOpts = append(serverGRPCOpts, grpc.Creds(creds)) } + if tenancyOptions.Enabled { + tm := tenancy.NewTenancyManager(&tenancyOptions) + serverGRPCOpts = append(serverGRPCOpts, + grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)), + grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)), + ) + } grpcServer := grpc.NewServer(serverGRPCOpts...) h := &Handler{ QueryService: q, @@ -80,13 +82,11 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl err := grpcServer.Serve(lis) require.NoError(t, err) }() - defer grpcServer.Stop() router := &mux.Router{} router = router.PathPrefix(basePath).Subrouter() ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err := RegisterGRPCGateway(ctx, zap.NewNop(), router, basePath, lis.Addr().String(), clientTLS) + err := RegisterGRPCGateway(ctx, zap.NewNop(), router, basePath, lis.Addr().String(), clientTLS, tenancy.NewTenancyManager(&tenancyOptions)) require.NoError(t, err) httpLis, err := net.Listen("tcp", ":0") @@ -98,10 +98,39 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl err = httpServer.Serve(httpLis) require.Equal(t, http.ErrServerClosed, err) }() + return r, httpLis, grpcServer, cancel, httpServer +} + +func testGRPCGatewayWithTenancy(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options, + tenancyOptions tenancy.Options, + setupRequest func(*http.Request), +) { + defer serverTLS.Close() + defer clientTLS.Close() + + reader, httpLis, grpcServer, cancel, httpServer := setupGRPCGateway(t, basePath, serverTLS, clientTLS, tenancyOptions) + defer grpcServer.Stop() + defer cancel() defer httpServer.Shutdown(context.Background()) + + traceID := model.NewTraceID(150, 160) + reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return( + &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: model.NewSpanID(180), + OperationName: "foobar", + }, + }, + }, nil).Once() + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s%s/api/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1), basePath), nil) + require.NoError(t, err) req.Header.Set("Content-Type", "application/json") + setupRequest(req) response, err := http.DefaultClient.Do(req) + require.NoError(t, err) buf := bytes.Buffer{} _, err = buf.ReadFrom(response.Body) require.NoError(t, err) @@ -142,3 +171,56 @@ func TestGRPCGateway_TLS_with_base_path(t *testing.T) { type envelope struct { Result json.RawMessage `json:"result"` } + +func TestTenancyGRPCGateway(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + tm := tenancy.NewTenancyManager(&tenancyOptions) + testGRPCGatewayWithTenancy(t, "/", tlscfg.Options{}, tlscfg.Options{}, + // Configure the gateway to forward tenancy header from HTTP to GRPC + tenancyOptions, + // Add a tenancy header on outbound requests + func(req *http.Request) { + req.Header.Add(tm.Header, "dummy") + }) +} + +func TestTenancyGRPCRejection(t *testing.T) { + basePath := "/" + tenancyOptions := tenancy.Options{Enabled: true} + reader, httpLis, grpcServer, cancel, httpServer := setupGRPCGateway(t, + basePath, tlscfg.Options{}, tlscfg.Options{}, + tenancyOptions) + defer grpcServer.Stop() + defer cancel() + defer httpServer.Shutdown(context.Background()) + + traceID := model.NewTraceID(150, 160) + reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return( + &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: model.NewSpanID(180), + OperationName: "foobar", + }, + }, + }, nil).Once() + + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s%s/api/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1), basePath), nil) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + // We don't set tenant header + response, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusForbidden, response.StatusCode) + + // Try again with tenant header set + tm := tenancy.NewTenancyManager(&tenancyOptions) + req.Header.Set(tm.Header, "acme") + response, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, response.StatusCode) + // Skip unmarshal of response; it is enough that it succeeded +} diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index f9daa5041e9..07dd9392a5d 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -32,6 +32,7 @@ import ( "github.com/jaegertracing/jaeger/model/adjuster" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/storage" ) @@ -79,6 +80,8 @@ type QueryOptions struct { AdditionalHeaders http.Header // MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span MaxClockSkewAdjust time.Duration + // Tenancy configures tenancy for query + Tenancy tenancy.Options } // AddFlags adds flags for QueryOptions @@ -122,6 +125,11 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q } else { qOpts.AdditionalHeaders = headers } + if tenancy, err := tenancy.InitFromViper(v); err == nil { + qOpts.Tenancy = tenancy + } else { + return qOpts, fmt.Errorf("failed to parse Tenancy options: %w", err) + } return qOpts, nil } diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 349e309ccff..c25b890ccf3 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -31,10 +31,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" @@ -143,9 +145,16 @@ type grpcClient struct { conn *grpc.ClientConn } -func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, net.Addr) { +func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer opentracing.Tracer, tenancyMgr *tenancy.TenancyManager) (*grpc.Server, net.Addr) { lis, _ := net.Listen("tcp", ":0") - grpcServer := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if tenancyMgr.Enabled { + grpcOpts = append(grpcOpts, + grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tenancyMgr)), + grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tenancyMgr)), + ) + } + grpcServer := grpc.NewServer(grpcOpts...) grpcHandler := &GRPCHandler{ queryService: q, metricsQueryService: mq, @@ -193,47 +202,8 @@ func withMetricsQuery() testOption { } } -func initializeTestServerGRPCWithOptions(t *testing.T, options ...testOption) *grpcServer { - archiveSpanReader := &spanstoremocks.Reader{} - archiveSpanWriter := &spanstoremocks.Writer{} - - spanReader := &spanstoremocks.Reader{} - dependencyReader := &depsmocks.Reader{} - disabledReader, err := disabled.NewMetricsReader() - require.NoError(t, err) - - q := querysvc.NewQueryService(spanReader, dependencyReader, - querysvc.QueryServiceOptions{ - ArchiveSpanReader: archiveSpanReader, - ArchiveSpanWriter: archiveSpanWriter, - }) - - tqs := &testQueryService{ - // Disable metrics query by default. - metricsQueryService: disabledReader, - } - for _, opt := range options { - opt(tqs) - } - - logger := zap.NewNop() - tracer := opentracing.NoopTracer{} - - server, addr := newGRPCServer(t, q, tqs.metricsQueryService, logger, tracer) - - return &grpcServer{ - server: server, - lisAddr: addr, - spanReader: spanReader, - depReader: dependencyReader, - metricsQueryService: tqs.metricsQueryService, - archiveSpanReader: archiveSpanReader, - archiveSpanWriter: archiveSpanWriter, - } -} - func withServerAndClient(t *testing.T, actualTest func(server *grpcServer, client *grpcClient), options ...testOption) { - server := initializeTestServerGRPCWithOptions(t, options...) + server := initializeTenantedTestServerGRPCWithOptions(t, &tenancy.TenancyManager{}, options...) client := newGRPCClient(t, server.lisAddr.String()) defer server.server.Stop() defer client.conn.Close() @@ -930,3 +900,271 @@ func TestMetricsQueryNilRequestGRPC(t *testing.T) { assert.Empty(t, bqp) assert.EqualError(t, err, errNilRequest.Error()) } + +func initializeTenantedTestServerGRPCWithOptions(t *testing.T, tm *tenancy.TenancyManager, options ...testOption) *grpcServer { + archiveSpanReader := &spanstoremocks.Reader{} + archiveSpanWriter := &spanstoremocks.Writer{} + + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + disabledReader, err := disabled.NewMetricsReader() + require.NoError(t, err) + + q := querysvc.NewQueryService( + spanReader, + dependencyReader, + querysvc.QueryServiceOptions{ + ArchiveSpanReader: archiveSpanReader, + ArchiveSpanWriter: archiveSpanWriter, + }) + + tqs := &testQueryService{ + // Disable metrics query by default. + metricsQueryService: disabledReader, + } + for _, opt := range options { + opt(tqs) + } + + logger := zap.NewNop() + tracer := opentracing.NoopTracer{} + + server, addr := newGRPCServer(t, q, tqs.metricsQueryService, logger, tracer, tm) + + return &grpcServer{ + server: server, + lisAddr: addr, + spanReader: spanReader, + depReader: dependencyReader, + metricsQueryService: tqs.metricsQueryService, + archiveSpanReader: archiveSpanReader, + archiveSpanWriter: archiveSpanWriter, + } +} + +func withTenantedServerAndClient(t *testing.T, tm *tenancy.TenancyManager, actualTest func(server *grpcServer, client *grpcClient), options ...testOption) { + server := initializeTenantedTestServerGRPCWithOptions(t, tm, options...) + client := newGRPCClient(t, server.lisAddr.String()) + defer server.server.Stop() + defer client.conn.Close() + + actualTest(server, client) +} + +// withOutgoingMetadata returns a Context with metadata for a server to receive +func withOutgoingMetadata(t *testing.T, ctx context.Context, headerName, headerValue string) context.Context { + t.Helper() + + md := metadata.New(map[string]string{headerName: headerValue}) + return metadata.NewOutgoingContext(ctx, md) +} + +func TestSearchTenancyGRPC(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Once() + + // First try without tenancy header + res, err := client.GetTrace(context.Background(), &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + + require.NoError(t, err, "could not initiate GetTraceRequest") + + spanResChunk, err := res.Recv() + assertGRPCError(t, err, codes.PermissionDenied, "missing tenant header") + assert.Nil(t, spanResChunk) + + // Next try with tenancy + res, err = client.GetTrace( + withOutgoingMetadata(t, context.Background(), tm.Header, "acme"), + &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + + spanResChunk, _ = res.Recv() + + require.NoError(t, err, "expecting gRPC to succeed with any tenancy header") + require.NotNil(t, spanResChunk) + require.NotNil(t, spanResChunk.Spans) + require.Equal(t, len(mockTrace.Spans), len(spanResChunk.Spans)) + assert.Equal(t, mockTraceID, spanResChunk.Spans[0].TraceID) + }) +} + +func TestServicesTenancyGRPC(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + expectedServices := []string{"trifle", "bling"} + server.spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() + + // First try without tenancy header + _, err := client.GetServices(context.Background(), &api_v2.GetServicesRequest{}) + assertGRPCError(t, err, codes.PermissionDenied, "missing tenant header") + + // Next try with tenancy + res, err := client.GetServices(withOutgoingMetadata(t, context.Background(), tm.Header, "acme"), &api_v2.GetServicesRequest{}) + require.NoError(t, err, "expecting gRPC to succeed with any tenancy header") + assert.Equal(t, expectedServices, res.Services) + }) +} + +func TestSearchTenancyGRPCExplicitList(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + Header: "non-standard-tenant-header", + Tenants: []string{"mercury", "venus", "mars"}, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Once() + + for _, tc := range []struct { + name string + tenancyHeader string + tenant string + wantErr bool + failureCode codes.Code + failureMessage string + }{ + { + name: "no header", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "missing tenant header", + }, + { + name: "invalid header", + tenancyHeader: "not-the-correct-header", + tenant: "mercury", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "missing tenant header", + }, + { + name: "missing tenant", + tenancyHeader: tm.Header, + tenant: "", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "unknown tenant", + }, + { + name: "invalid tenant", + tenancyHeader: tm.Header, + tenant: "some-other-tenant-not-in-the-list", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "unknown tenant", + }, + { + name: "valid tenant", + tenancyHeader: tm.Header, + tenant: "venus", + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + if tc.tenancyHeader != "" { + ctx = withOutgoingMetadata(t, context.Background(), tc.tenancyHeader, tc.tenant) + } + res, err := client.GetTrace(ctx, &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + + require.NoError(t, err, "could not initiate GetTraceRequest") + + spanResChunk, err := res.Recv() + + if tc.wantErr { + assertGRPCError(t, err, tc.failureCode, tc.failureMessage) + assert.Nil(t, spanResChunk) + } else { + require.NoError(t, err, "expecting gRPC to succeed") + require.NotNil(t, spanResChunk) + require.NotNil(t, spanResChunk.Spans) + require.Equal(t, len(mockTrace.Spans), len(spanResChunk.Spans)) + assert.Equal(t, mockTraceID, spanResChunk.Spans[0].TraceID) + } + }) + } + }) +} + +func TestTenancyContextFlowGRPC(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + // Mock a storage backend with tenant 'acme' and 'megacorp' + allExpectedResults := map[string]struct { + expectedServices []string + expectedTrace *model.Trace + expectedTraceErr error + }{ + "acme": {[]string{"trifle", "bling"}, mockTrace, nil}, + "megacorp": {[]string{"grapefruit"}, nil, errStorageGRPC}, + } + + addTenantedGetServices := func(mockReader *spanstoremocks.Reader, tenant string, expectedServices []string) { + mockReader.On("GetServices", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok { + return false + } + if tenancy.GetTenant(ctx) != tenant { + return false + } + return true + })).Return(expectedServices, nil).Once() + } + addTenantedGetTrace := func(mockReader *spanstoremocks.Reader, tenant string, trace *model.Trace, err error) { + mockReader.On("GetTrace", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok { + return false + } + if tenancy.GetTenant(ctx) != tenant { + return false + } + return true + }), mock.AnythingOfType("model.TraceID")).Return(trace, err).Once() + } + + for tenant, expected := range allExpectedResults { + addTenantedGetServices(server.spanReader, tenant, expected.expectedServices) + addTenantedGetTrace(server.spanReader, tenant, expected.expectedTrace, expected.expectedTraceErr) + } + + for tenant, expected := range allExpectedResults { + t.Run(tenant, func(t *testing.T) { + // Test context propagation to Unary method. + resGetServices, err := client.GetServices(withOutgoingMetadata(t, context.Background(), tm.Header, tenant), &api_v2.GetServicesRequest{}) + require.NoError(t, err, "expecting gRPC to succeed with %q tenancy header", tenant) + assert.Equal(t, expected.expectedServices, resGetServices.Services) + + // Test context propagation to Streaming method. + resGetTrace, err := client.GetTrace(withOutgoingMetadata(t, context.Background(), tm.Header, tenant), + &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + require.NoError(t, err) + spanResChunk, err := resGetTrace.Recv() + + if expected.expectedTrace != nil { + assert.Equal(t, expected.expectedTrace.Spans[0].TraceID, spanResChunk.Spans[0].TraceID) + } + if expected.expectedTraceErr != nil { + assert.Contains(t, err.Error(), expected.expectedTraceErr.Error()) + } + }) + } + + server.spanReader.AssertExpectations(t) + }) +} diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index 6b3f098a8cd..8abde1b9b74 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -36,6 +36,7 @@ import ( uiconv "github.com/jaegertracing/jaeger/model/converter/json" ui "github.com/jaegertracing/jaeger/model/json" "github.com/jaegertracing/jaeger/pkg/multierror" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" "github.com/jaegertracing/jaeger/storage/metricsstore" @@ -84,6 +85,7 @@ type APIHandler struct { queryService *querysvc.QueryService metricsQueryService querysvc.MetricsQueryService queryParser queryParser + tenancyMgr *tenancy.TenancyManager basePath string apiPrefix string logger *zap.Logger @@ -91,13 +93,14 @@ type APIHandler struct { } // NewAPIHandler returns an APIHandler -func NewAPIHandler(queryService *querysvc.QueryService, options ...HandlerOption) *APIHandler { +func NewAPIHandler(queryService *querysvc.QueryService, tm *tenancy.TenancyManager, options ...HandlerOption) *APIHandler { aH := &APIHandler{ queryService: queryService, queryParser: queryParser{ traceQueryLookbackDuration: defaultTraceQueryLookbackDuration, timeNow: time.Now, }, + tenancyMgr: tm, } for _, option := range options { @@ -139,9 +142,14 @@ func (aH *APIHandler) handleFunc( args ...interface{}, ) *mux.Route { route = aH.route(route, args...) + var handler http.Handler + handler = http.HandlerFunc(f) + if aH.tenancyMgr.Enabled { + handler = tenancy.ExtractTenantHTTPHandler(aH.tenancyMgr, handler) + } traceMiddleware := nethttp.Middleware( aH.tracer, - http.HandlerFunc(f), + handler, nethttp.OperationNameFunc(func(r *http.Request) string { return route })) diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index afcff3d86c4..881b164cf4d 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -17,6 +17,7 @@ package app import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -42,6 +43,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/adjuster" ui "github.com/jaegertracing/jaeger/model/json" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" @@ -91,6 +93,7 @@ type structuredTraceResponse struct { func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { return initializeTestServerWithOptions( + &tenancy.TenancyManager{}, queryOptions, append( []HandlerOption{ @@ -105,15 +108,15 @@ func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, ) } -func initializeTestServerWithOptions(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { +func initializeTestServerWithOptions(tenancyMgr *tenancy.TenancyManager, queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { readStorage := &spanstoremocks.Reader{} dependencyStorage := &depsmocks.Reader{} qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions) r := NewRouter() - handler := NewAPIHandler(qs, options...) + handler := NewAPIHandler(qs, tenancyMgr, options...) handler.RegisterRoutes(r) return &testServer{ - server: httptest.NewServer(r), + server: httptest.NewServer(tenancy.ExtractTenantHTTPHandler(tenancyMgr, r)), spanReader: readStorage, dependencyReader: dependencyStorage, handler: handler, @@ -132,7 +135,7 @@ type testServer struct { } func withTestServer(doTest func(s *testServer), queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) { - ts := initializeTestServerWithOptions(queryOptions, options...) + ts := initializeTestServerWithOptions(&tenancy.TenancyManager{}, queryOptions, options...) defer ts.server.Close() doTest(ts) } @@ -180,7 +183,7 @@ func TestLogOnServerError(t *testing.T) { apiHandlerOptions := []HandlerOption{ HandlerOptions.Logger(zap.New(l)), } - h := NewAPIHandler(qs, apiHandlerOptions...) + h := NewAPIHandler(qs, &tenancy.TenancyManager{}, apiHandlerOptions...) e := errors.New("test error") h.handleError(&testHttp.TestResponseWriter{}, e, http.StatusInternalServerError) require.Equal(t, 1, len(*l.logs)) @@ -401,7 +404,7 @@ func TestSearchByTraceIDSuccess(t *testing.T) { func TestSearchByTraceIDSuccessWithArchive(t *testing.T) { archiveReadMock := &spanstoremocks.Reader{} - ts := initializeTestServerWithOptions(querysvc.QueryServiceOptions{ + ts := initializeTestServerWithOptions(&tenancy.TenancyManager{}, querysvc.QueryServiceOptions{ ArchiveSpanReader: archiveReadMock, }) defer ts.server.Close() @@ -444,6 +447,7 @@ func TestSearchByTraceIDFailure(t *testing.T) { func TestSearchModelConversionFailure(t *testing.T) { ts := initializeTestServerWithOptions( + &tenancy.TenancyManager{}, querysvc.QueryServiceOptions{ Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) { return trace, errAdjustment @@ -812,11 +816,15 @@ func TestGetMinStep(t *testing.T) { // getJSON fetches a JSON document from a server via HTTP GET func getJSON(url string, out interface{}) error { + return getJSONCustomHeaders(url, make(map[string]string), out) +} + +func getJSONCustomHeaders(url string, additionalHeaders map[string]string, out interface{}) error { req, err := http.NewRequest("GET", url, nil) if err != nil { return err } - return execJSON(req, out) + return execJSON(req, additionalHeaders, out) } // postJSON submits a JSON document to a server via HTTP POST and parses response as JSON. @@ -830,12 +838,15 @@ func postJSON(url string, req interface{}, out interface{}) error { if err != nil { return err } - return execJSON(r, out) + return execJSON(r, make(map[string]string), out) } // execJSON executes an http request against a server and parses response as JSON -func execJSON(req *http.Request, out interface{}) error { +func execJSON(req *http.Request, additionalHeaders map[string]string, out interface{}) error { req.Header.Add("Accept", "application/json") + for k, v := range additionalHeaders { + req.Header.Add(k, v) + } resp, err := httpClient.Do(req) if err != nil { @@ -869,3 +880,99 @@ func execJSON(req *http.Request, out interface{}) error { func parsedError(code int, err string) string { return fmt.Sprintf(`%d error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":%d,"msg":"%s"}]}`+"\n", code, code, err) } + +func TestSearchTenancyHTTP(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + ts := initializeTestServerWithOptions( + tenancy.NewTenancyManager(&tenancyOptions), + querysvc.QueryServiceOptions{}) + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Twice() + + var response structuredResponse + err := getJSON(ts.server.URL+`/api/traces?traceID=1&traceID=2`, &response) + require.Error(t, err) + assert.Equal(t, "401 error from server: missing tenant header", err.Error()) + assert.Len(t, response.Errors, 0) + assert.Nil(t, response.Data) + + err = getJSONCustomHeaders( + ts.server.URL+`/api/traces?traceID=1&traceID=2`, + map[string]string{"x-tenant": "acme"}, + &response) + assert.NoError(t, err) + assert.Len(t, response.Errors, 0) + assert.Len(t, response.Data, 2) +} + +func TestSearchTenancyRejectionHTTP(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + ts := initializeTestServerWithOptions( + tenancy.NewTenancyManager(&tenancyOptions), + querysvc.QueryServiceOptions{}) + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Twice() + + req, err := http.NewRequest("GET", ts.server.URL+`/api/traces?traceID=1&traceID=2`, nil) + assert.NoError(t, err) + req.Header.Add("Accept", "application/json") + // We don't set tenant header + resp, err := httpClient.Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) + + tm := tenancy.NewTenancyManager(&tenancyOptions) + req.Header.Set(tm.Header, "acme") + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + // Skip unmarshal of response; it is enough that it succeeded +} + +func TestSearchTenancyFlowTenantHTTP(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + ts := initializeTestServerWithOptions( + tenancy.NewTenancyManager(&tenancyOptions), + querysvc.QueryServiceOptions{}) + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok || tenancy.GetTenant(ctx) != "acme" { + return false + } + return true + }), mock.AnythingOfType("model.TraceID")).Return(mockTrace, nil).Twice() + ts.spanReader.On("GetTrace", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok || tenancy.GetTenant(ctx) != "megacorp" { + return false + } + return true + }), mock.AnythingOfType("model.TraceID")).Return(nil, errStorage).Once() + + var responseAcme structuredResponse + err := getJSONCustomHeaders( + ts.server.URL+`/api/traces?traceID=1&traceID=2`, + map[string]string{"x-tenant": "acme"}, + &responseAcme) + assert.NoError(t, err) + assert.Len(t, responseAcme.Errors, 0) + assert.Len(t, responseAcme.Data, 2) + + var responseMegacorp structuredResponse + err = getJSONCustomHeaders( + ts.server.URL+`/api/traces?traceID=1&traceID=2`, + map[string]string{"x-tenant": "megacorp"}, + &responseMegacorp) + assert.Contains(t, err.Error(), "storage error") + assert.Len(t, responseMegacorp.Errors, 0) + assert.Nil(t, responseMegacorp.Data) +} diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index c341af13e68..721a0491ead 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -39,6 +39,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/netutils" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" "github.com/jaegertracing/jaeger/proto-gen/api_v3" @@ -64,7 +65,7 @@ type Server struct { } // NewServer creates and initializes Server -func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) { +func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.TenancyManager, tracer opentracing.Tracer) (*Server, error) { _, httpPort, err := net.SplitHostPort(options.HTTPHostPort) if err != nil { return nil, err @@ -78,12 +79,12 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead") } - grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, logger, tracer) + grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, logger, tracer) if err != nil { return nil, err } - httpServer, closeGRPCGateway, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger) + httpServer, closeGRPCGateway, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, tracer, logger) if err != nil { return nil, err } @@ -106,7 +107,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status { return s.unavailableChannel } -func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) { +func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.TenancyManager, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) { var grpcOpts []grpc.ServerOption if options.TLSGRPC.Enabled { @@ -119,6 +120,12 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. grpcOpts = append(grpcOpts, grpc.Creds(creds)) } + if tm.Enabled { + grpcOpts = append(grpcOpts, + grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)), + grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)), + ) + } server := grpc.NewServer(grpcOpts...) reflection.Register(server) @@ -144,7 +151,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. return server, nil } -func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) { +func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.TenancyManager, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) { apiHandlerOptions := []HandlerOption{ HandlerOptions.Logger(logger), HandlerOptions.Tracer(tracer), @@ -153,6 +160,7 @@ func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. apiHandler := NewAPIHandler( querySvc, + tm, apiHandlerOptions...) r := NewRouter() if queryOpts.BasePath != "/" { @@ -160,7 +168,7 @@ func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. } ctx, closeGRPCGateway := context.WithCancel(context.Background()) - if err := apiv3.RegisterGRPCGateway(ctx, logger, r, queryOpts.BasePath, queryOpts.GRPCHostPort, queryOpts.TLSGRPC); err != nil { + if err := apiv3.RegisterGRPCGateway(ctx, logger, r, queryOpts.BasePath, queryOpts.GRPCHostPort, queryOpts.TLSGRPC, tm); err != nil { closeGRPCGateway() return nil, nil, err } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index d3d798cd2bc..f72373d3434 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -40,6 +40,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/api_v2" depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" @@ -67,7 +68,8 @@ func TestCreateTLSServerSinglePortError(t *testing.T) { } _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, opentracing.NoopTracer{}) + &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) } @@ -80,7 +82,8 @@ func TestCreateTLSGrpcServerError(t *testing.T) { } _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, opentracing.NoopTracer{}) + &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) } @@ -93,7 +96,8 @@ func TestCreateTLSHttpServerError(t *testing.T) { } _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, opentracing.NoopTracer{}) + &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) } @@ -335,7 +339,7 @@ func TestServerHTTPTLS(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, + serverOptions, tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -495,7 +499,7 @@ func TestServerGRPCTLS(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, + serverOptions, tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -550,11 +554,13 @@ func TestServerGRPCTLS(t *testing.T) { func TestServerBadHostPort(t *testing.T) { _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) _, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) @@ -585,6 +591,7 @@ func TestServerInUseHostPort(t *testing.T) { GRPCHostPort: tc.grpcHostPort, BearerTokenPropagation: true, }, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}, ) assert.NoError(t, err) @@ -614,6 +621,7 @@ func TestServerSinglePort(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -662,7 +670,8 @@ func TestServerGracefulExit(t *testing.T) { querySvc := &querysvc.QueryService{} tracer := opentracing.NoopTracer{} - server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer) + server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, + tenancy.NewTenancyManager(&tenancy.Options{}), tracer) assert.Nil(t, err) assert.NoError(t, server.Start()) go func() { @@ -691,6 +700,7 @@ func TestServerHandlesPortZero(t *testing.T) { tracer := opentracing.NoopTracer{} server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, + tenancy.NewTenancyManager(&tenancy.Options{}), tracer) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -714,3 +724,76 @@ func TestServerHandlesPortZero(t *testing.T) { }, }.Execute(t) } + +func TestServerHTTPTenancy(t *testing.T) { + testCases := []struct { + name string + tenant string + errMsg string + status int + }{ + { + name: "no tenant", + // no value for tenant header + status: 401, + }, + { + name: "tenant", + tenant: "acme", + status: 200, + }, + } + + serverOptions := &QueryOptions{ + HTTPHostPort: ":8080", + GRPCHostPort: ":8080", + Tenancy: tenancy.Options{ + Enabled: true, + }, + } + tenancyMgr := tenancy.NewTenancyManager(&serverOptions.Tenancy) + + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + + querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) + server, err := NewServer(zap.NewNop(), querySvc, nil, + serverOptions, tenancyMgr, + opentracing.NoopTracer{}) + require.Nil(t, err) + require.NoError(t, server.Start()) + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + conn, clientError := net.DialTimeout("tcp", "localhost:8080", 2*time.Second) + require.NoError(t, clientError) + + queryString := "/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms" + req, err := http.NewRequest("GET", "http://localhost:8080"+queryString, nil) + if test.tenant != "" { + req.Header.Add(tenancyMgr.Header, test.tenant) + } + assert.Nil(t, err) + req.Header.Add("Accept", "application/json") + + client := &http.Client{} + resp, err2 := client.Do(req) + if test.errMsg == "" { + require.NoError(t, err2) + } else { + assert.Error(t, err2) + if err != nil { + assert.Equal(t, test.errMsg, err2.Error()) + } + } + assert.Equal(t, test.status, resp.StatusCode) + if err2 == nil { + resp.Body.Close() + } + if conn != nil { + require.Nil(t, conn.Close()) + } + }) + } + server.Close() +} diff --git a/cmd/query/main.go b/cmd/query/main.go index a7cbe340e93..bd9b7e49530 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -38,6 +38,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/bearertoken" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics" "github.com/jaegertracing/jaeger/plugin/storage" @@ -124,7 +125,8 @@ func main() { spanReader, dependencyReader, *queryServiceOptions) - server, err := app.NewServer(svc.Logger, queryService, metricsQueryService, queryOpts, tracer) + tm := tenancy.NewTenancyManager(&queryOpts.Tenancy) + server, err := app.NewServer(svc.Logger, queryService, metricsQueryService, queryOpts, tm, tracer) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } diff --git a/pkg/config/tenancy/tenancy.go b/pkg/tenancy/config.go similarity index 77% rename from pkg/config/tenancy/tenancy.go rename to pkg/tenancy/config.go index 58909b7af16..5277ead9b93 100644 --- a/pkg/config/tenancy/tenancy.go +++ b/pkg/tenancy/config.go @@ -14,8 +14,8 @@ package tenancy -// TenancyConfig holds the settings for multi-tenant Jaeger -type TenancyConfig struct { +// TenancyManager can check tenant usage for multi-tenant Jaeger configurations +type TenancyManager struct { Enabled bool Header string guard guard @@ -33,16 +33,21 @@ type Options struct { Tenants []string } -// NewTenancyConfig creates a tenancy configuration for tenancy Options -func NewTenancyConfig(options *Options) *TenancyConfig { - return &TenancyConfig{ +// NewTenancyManager creates a TenancyManager from tenancy Options +func NewTenancyManager(options *Options) *TenancyManager { + // Default header value (although set by CLI flags, this helps tests and API users) + header := options.Header + if header == "" && options.Enabled { + header = "x-tenant" + } + return &TenancyManager{ Enabled: options.Enabled, - Header: options.Header, + Header: header, guard: tenancyGuardFactory(options), } } -func (tc *TenancyConfig) Valid(tenant string) bool { +func (tc *TenancyManager) Valid(tenant string) bool { return tc.guard.Valid(tenant) } diff --git a/pkg/config/tenancy/tenancy_test.go b/pkg/tenancy/config_test.go similarity index 97% rename from pkg/config/tenancy/tenancy_test.go rename to pkg/tenancy/config_test.go index b84bd25d405..a38067dc39e 100644 --- a/pkg/config/tenancy/tenancy_test.go +++ b/pkg/tenancy/config_test.go @@ -84,7 +84,7 @@ func TestTenancyValidity(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - tc := NewTenancyConfig(&test.options) + tc := NewTenancyManager(&test.options) assert.Equal(t, test.valid, tc.Valid(test.tenant)) }) } diff --git a/storage/tenant.go b/pkg/tenancy/context.go similarity index 98% rename from storage/tenant.go rename to pkg/tenancy/context.go index c00dfdd257d..9e0021e3659 100644 --- a/storage/tenant.go +++ b/pkg/tenancy/context.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package tenancy import "context" diff --git a/storage/tenant_test.go b/pkg/tenancy/context_test.go similarity index 99% rename from storage/tenant_test.go rename to pkg/tenancy/context_test.go index 563a511c6e3..d2d29b7ded1 100644 --- a/storage/tenant_test.go +++ b/pkg/tenancy/context_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package tenancy import ( "context" diff --git a/pkg/config/tenancy/flags.go b/pkg/tenancy/flags.go similarity index 100% rename from pkg/config/tenancy/flags.go rename to pkg/tenancy/flags.go diff --git a/pkg/config/tenancy/flags_test.go b/pkg/tenancy/flags_test.go similarity index 100% rename from pkg/config/tenancy/flags_test.go rename to pkg/tenancy/flags_test.go diff --git a/pkg/tenancy/grpc.go b/pkg/tenancy/grpc.go new file mode 100644 index 00000000000..0d269f59f78 --- /dev/null +++ b/pkg/tenancy/grpc.go @@ -0,0 +1,115 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// tenantedServerStream is a wrapper for ServerStream providing settable context +type tenantedServerStream struct { + grpc.ServerStream + context context.Context +} + +func (tss *tenantedServerStream) Context() context.Context { + return tss.context +} + +func getValidTenant(ctx context.Context, tc *TenancyManager) (string, error) { + // Handle case where tenant is already directly in the context + tenant := GetTenant(ctx) + if tenant != "" { + if !tc.Valid(tenant) { + return tenant, status.Errorf(codes.PermissionDenied, "unknown tenant") + } + return tenant, nil + } + + // Handle case where tenant is in the context metadata + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "", status.Errorf(codes.PermissionDenied, "missing tenant header") + } + + var err error + tenant, err = tenantFromMetadata(md, tc.Header) + if err != nil { + return "", err + } + if !tc.Valid(tenant) { + return tenant, status.Errorf(codes.PermissionDenied, "unknown tenant") + } + + return tenant, nil +} + +func directlyAttachedTenant(ctx context.Context) bool { + return GetTenant(ctx) != "" +} + +// NewGuardingStreamInterceptor blocks handling of streams whose tenancy header doesn't meet tenancy requirements. +// It also ensures the tenant is directly in the context, rather than context metadata. +func NewGuardingStreamInterceptor(tc *TenancyManager) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + tenant, err := getValidTenant(ss.Context(), tc) + if err != nil { + return err + } + + if directlyAttachedTenant(ss.Context()) { + return handler(srv, ss) + } + + // "upgrade" the tenant to be part of the context, rather than just incoming metadata + return handler(srv, &tenantedServerStream{ + ServerStream: ss, + context: WithTenant(ss.Context(), tenant), + }) + } +} + +func tenantFromMetadata(md metadata.MD, tenancyHeader string) (string, error) { + tenants := md.Get(tenancyHeader) + if len(tenants) < 1 { + return "", status.Errorf(codes.PermissionDenied, "missing tenant header") + } else if len(tenants) > 1 { + return "", status.Errorf(codes.PermissionDenied, "extra tenant header") + } + + return tenants[0], nil +} + +// NewGuardingUnaryInterceptor blocks handling of RPCs whose tenancy header doesn't meet tenancy requirements. +// It also ensures the tenant is directly in the context, rather than context metadata. +func NewGuardingUnaryInterceptor(tc *TenancyManager) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + tenant, err := getValidTenant(ctx, tc) + if err != nil { + return nil, err + } + + if directlyAttachedTenant(ctx) { + return handler(ctx, req) + } + + return handler(WithTenant(ctx, tenant), req) + } +} diff --git a/pkg/tenancy/grpc_test.go b/pkg/tenancy/grpc_test.go new file mode 100644 index 00000000000..1bdc898f945 --- /dev/null +++ b/pkg/tenancy/grpc_test.go @@ -0,0 +1,113 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +func TestTenancyInterceptors(t *testing.T) { + tests := []struct { + name string + tenancyMgr *TenancyManager + ctx context.Context + errMsg string + }{ + { + name: "missing tenant context", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + ctx: context.Background(), + errMsg: "rpc error: code = PermissionDenied desc = missing tenant header", + }, + { + name: "invalid tenant context", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + ctx: WithTenant(context.Background(), "acme"), + errMsg: "rpc error: code = PermissionDenied desc = unknown tenant", + }, + { + name: "valid tenant context", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"acme"}}), + ctx: WithTenant(context.Background(), "acme"), + errMsg: "", + }, + { + name: "invalid tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{"x-tenant": {"acme"}}), + errMsg: "rpc error: code = PermissionDenied desc = unknown tenant", + }, + { + name: "missing tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{}), + errMsg: "rpc error: code = PermissionDenied desc = missing tenant header", + }, + { + name: "valid tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"acme"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{"x-tenant": {"acme"}}), + errMsg: "", + }, + { + name: "extra tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"acme"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{"x-tenant": {"acme", "megacorp"}}), + errMsg: "rpc error: code = PermissionDenied desc = extra tenant header", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + interceptor := NewGuardingStreamInterceptor(test.tenancyMgr) + ss := tenantedServerStream{ + context: test.ctx, + } + ssi := grpc.StreamServerInfo{} + handler := func(interface{}, grpc.ServerStream) error { + // do nothing + return nil + } + err := interceptor(0, &ss, &ssi, handler) + if test.errMsg == "" { + assert.NoError(t, err) + } else { + require.Error(t, err) + assert.Equal(t, test.errMsg, err.Error()) + } + + uinterceptor := NewGuardingUnaryInterceptor(test.tenancyMgr) + usi := &grpc.UnaryServerInfo{} + iface := 0 + uhandler := func(ctx context.Context, req interface{}) (interface{}, error) { + // do nothing + return req, nil + } + _, err = uinterceptor(test.ctx, iface, usi, uhandler) + if test.errMsg == "" { + assert.NoError(t, err) + } else { + require.Error(t, err) + assert.Equal(t, test.errMsg, err.Error()) + } + }) + } +} diff --git a/pkg/tenancy/http.go b/pkg/tenancy/http.go new file mode 100644 index 00000000000..156fb45d400 --- /dev/null +++ b/pkg/tenancy/http.go @@ -0,0 +1,65 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + "net/http" + + "google.golang.org/grpc/metadata" +) + +// PropagationHandler returns a http.Handler containing the logic to extract +// the tenancy header of the http.Request and insert the tenant into request.Context +// for propagation. The token can be accessed via tenancy.GetTenant(). +func ExtractTenantHTTPHandler(tc *TenancyManager, h http.Handler) http.Handler { + if !tc.Enabled { + return h + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tenant := r.Header.Get(tc.Header) + if tenant == "" { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("missing tenant header")) + return + } + + if !tc.Valid(tenant) { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("unknown tenant")) + return + } + + ctx := WithTenant(r.Context(), tenant) + h.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +// MetadataAnnotator returns a function suitable for propagating tenancy +// via github.com/grpc-ecosystem/grpc-gateway/runtime.NewServeMux +func (tc *TenancyManager) MetadataAnnotator() func(context.Context, *http.Request) metadata.MD { + return func(ctx context.Context, req *http.Request) metadata.MD { + tenant := req.Header.Get(tc.Header) + if tenant == "" { + // The HTTP request lacked the tenancy header. Pass along + // empty metadata -- the gRPC query service will reject later. + return metadata.Pairs() + } + return metadata.New(map[string]string{ + tc.Header: tenant, + }) + } +} diff --git a/pkg/tenancy/http_test.go b/pkg/tenancy/http_test.go new file mode 100644 index 00000000000..db0ba9191ca --- /dev/null +++ b/pkg/tenancy/http_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testHttpHandler struct { + reached bool +} + +func (thh *testHttpHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) { + thh.reached = true +} + +func TestProgationHandler(t *testing.T) { + tests := []struct { + name string + tenancyMgr *TenancyManager + shouldReach bool + requestHeaders map[string][]string + }{ + { + name: "untenanted", + tenancyMgr: NewTenancyManager(&Options{}), + requestHeaders: map[string][]string{}, + shouldReach: true, + }, + { + name: "missing tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{}, + shouldReach: false, + }, + { + name: "valid tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{"x-tenant": {"acme"}}, + shouldReach: true, + }, + { + name: "unauthorized tenant", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + requestHeaders: map[string][]string{"x-tenant": {"acme"}}, + shouldReach: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + handler := &testHttpHandler{} + propH := ExtractTenantHTTPHandler(test.tenancyMgr, handler) + req, err := http.NewRequest("GET", "/", strings.NewReader("")) + for k, vs := range test.requestHeaders { + for _, v := range vs { + req.Header.Add(k, v) + } + } + require.NoError(t, err) + writer := httptest.NewRecorder() + propH.ServeHTTP(writer, req) + assert.Equal(t, test.shouldReach, handler.reached) + }) + } +} + +func TestMetadataAnnotator(t *testing.T) { + tests := []struct { + name string + tenancyMgr *TenancyManager + requestHeaders map[string][]string + }{ + { + name: "missing tenant", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{}, + }, + { + name: "tenanted", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{"x-tenant": {"acme"}}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + req, err := http.NewRequest("GET", "/", strings.NewReader("")) + for k, vs := range test.requestHeaders { + for _, v := range vs { + req.Header.Add(k, v) + } + } + require.NoError(t, err) + annotator := test.tenancyMgr.MetadataAnnotator() + md := annotator(context.Background(), req) + assert.Equal(t, len(test.requestHeaders), len(md)) + }) + } +} diff --git a/storage/empty_test.go b/storage/empty_test.go new file mode 100644 index 00000000000..241676d44eb --- /dev/null +++ b/storage/empty_test.go @@ -0,0 +1,15 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage