Skip to content

Commit

Permalink
Tenancy for queries (#3791)
Browse files Browse the repository at this point in the history
* Tenancy for queries

Signed-off-by: Ed Snible <[email protected]>

* New parameter for RegisterGRPCGateway() test function

Signed-off-by: Ed Snible <[email protected]>

* More tests that are local to package itself

Signed-off-by: Ed Snible <[email protected]>

* Additional test cases to raise test coverage

Signed-off-by: Ed Snible <[email protected]>

* Fix test

Signed-off-by: Ed Snible <[email protected]>

* spelling

Signed-off-by: Ed Snible <[email protected]>

* Rename file

Signed-off-by: Ed Snible <[email protected]>

* Refactor tenancy packages

Signed-off-by: Ed Snible <[email protected]>

* restore empty_test.go as part of refactoring tenancy out of storage

Signed-off-by: Ed Snible <[email protected]>

* lint/gofumpt

Signed-off-by: Ed Snible <[email protected]>

* Enforce tenancy on non-streaming gRPC and add additional tests

Signed-off-by: Ed Snible <[email protected]>

* Test for tenant flow to storage for both streaming and unary RPC

Signed-off-by: Ed Snible <[email protected]>

* HTTP tenancy test

Signed-off-by: Ed Snible <[email protected]>

* Unit test for unary tenancy handler

Signed-off-by: Ed Snible <[email protected]>

* Factor out rendundent test function

Signed-off-by: Ed Snible <[email protected]>

* Address review comments

Signed-off-by: Ed Snible <[email protected]>

* Error name

Signed-off-by: Ed Snible <[email protected]>

* Refactor TenancyConfig to TenancyManager

Signed-off-by: Ed Snible <[email protected]>

* Address review comments

Signed-off-by: Ed Snible <[email protected]>

* Refactor so that NewTenancyManager() only called from main and tests

Signed-off-by: Ed Snible <[email protected]>
  • Loading branch information
esnible authored Jul 21, 2022
1 parent 3f71e0c commit ddca3c8
Show file tree
Hide file tree
Showing 34 changed files with 1,147 additions and 142 deletions.
9 changes: 7 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/jaegertracing/jaeger/internal/metrics/jlibadapter"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
Expand Down Expand Up @@ -155,6 +156,8 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to configure query service", zap.Error(err))
}

tm := tenancy.NewTenancyManager(&cOpts.GRPC.Tenancy)

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
Expand All @@ -164,6 +167,7 @@ by default uses only in-memory database.`,
StrategyStore: strategyStore,
Aggregator: aggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
})
if err := c.Start(cOpts); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -192,7 +196,7 @@ by default uses only in-memory database.`,
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
metricsFactory,
metricsFactory, tm,
)

svc.RunAndThen(func() {
Expand Down Expand Up @@ -265,10 +269,11 @@ func startQuery(
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
baseFactory metrics.Factory,
tm *tenancy.TenancyManager,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, opentracing.GlobalTracer())
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -52,6 +53,7 @@ type Collector struct {
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.TenancyManager

// state, read only
hServer *http.Server
Expand All @@ -72,6 +74,7 @@ type CollectorParams struct {
StrategyStore strategystore.StrategyStore
Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.TenancyManager
}

// New constructs a new collector component, ready to be started
Expand All @@ -84,6 +87,7 @@ func New(params *CollectorParams) *Collector {
strategyStore: params.StrategyStore,
aggregator: params.Aggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
}
}

Expand All @@ -94,6 +98,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
CollectorOpts: options,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
TenancyMgr: c.tenancyMgr,
}

var additionalProcessors []ProcessSpan
Expand Down Expand Up @@ -152,7 +157,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
c.zkServer = zkServer

if options.OTLP.Enabled {
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor)
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
Expand Down
9 changes: 9 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

Expand All @@ -53,6 +54,7 @@ func TestNewCollector(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -61,6 +63,7 @@ func TestNewCollector(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})

collectorOpts := optionsForEphemeralPorts()
Expand All @@ -77,6 +80,7 @@ func TestCollector_StartErrors(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -85,6 +89,7 @@ func TestCollector_StartErrors(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})
err := c.Start(options)
require.Error(t, err)
Expand Down Expand Up @@ -130,6 +135,7 @@ func TestCollector_PublishOpts(t *testing.T) {
metricsFactory := fork.New("internal", forkFactory, baseMetrics)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -138,6 +144,7 @@ func TestCollector_PublishOpts(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
Expand All @@ -164,6 +171,7 @@ func TestAggregator(t *testing.T) {
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
agg := &mockAggregator{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -173,6 +181,7 @@ func TestAggregator(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
Aggregator: agg,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
)

Expand Down
21 changes: 9 additions & 12 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -36,14 +36,14 @@ type GRPCHandler struct {
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyMgr *tenancy.TenancyManager) *GRPCHandler {
return &GRPCHandler{
logger: logger,
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.GRPCTransport,
processor.ProtoSpanFormat,
tenancyConfig),
tenancyMgr),
}
}

Expand All @@ -58,21 +58,18 @@ type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
tenancyConfig tenancy.TenancyConfig
tenancyMgr *tenancy.TenancyManager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer {
if tenancyConfig == nil {
tenancyConfig = &tenancy.TenancyConfig{}
}
func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.TenancyManager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
InboundTransport: transport,
SpanFormat: spanFormat,
},
tenancyConfig: *tenancyConfig,
tenancyMgr: tenancyMgr,
}
}

Expand Down Expand Up @@ -104,7 +101,7 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
}

func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
if !c.tenancyConfig.Enabled {
if !c.tenancyMgr.Enabled {
return "", nil
}

Expand All @@ -113,14 +110,14 @@ func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[c.tenancyConfig.Header]
tenants := md.Get(c.tenancyMgr.Header)
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

if !c.tenancyConfig.Valid(tenants[0]) {
if !c.tenancyMgr.Valid(tenants[0]) {
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
}

Expand Down
12 changes: 6 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand Down Expand Up @@ -98,7 +98,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp
func TestPostSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestPostSpans(t *testing.T) {
func TestGRPCCompressionEnabled(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestPostSpansWithError(t *testing.T) {
processor := &mockSpanProcessor{expectedError: test.processorError}
logger, logBuf := testutils.NewLogger()
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(logger, processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(logger, processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestPostTenantedSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor,
tenancy.NewTenancyConfig(&tenancy.Options{
tenancy.NewTenancyManager(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: []string{dummyTenant},
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestGetTenant(t *testing.T) {

processor := &mockSpanProcessor{}
handler := NewGRPCHandler(zap.NewNop(), processor,
tenancy.NewTenancyConfig(&tenancy.Options{
tenancy.NewTenancyManager(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: validTenants,
Expand Down
11 changes: 7 additions & 4 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

var _ component.Host = (*otelHost)(nil) // API check

// StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports.
func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) (component.TracesReceiver, error) {
otlpFactory := otlpreceiver.NewFactory()
return startOTLPReceiver(
options,
logger,
spanProcessor,
tm,
otlpFactory,
consumer.NewTraces,
otlpFactory.CreateTracesReceiver,
Expand All @@ -58,6 +60,7 @@ func startOTLPReceiver(
options *flags.CollectorOptions,
logger *zap.Logger,
spanProcessor processor.SpanProcessor,
tm *tenancy.TenancyManager,
// from here: params that can be mocked in tests
otlpFactory component.ReceiverFactory,
newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error),
Expand All @@ -74,7 +77,7 @@ func startOTLPReceiver(
},
}

otlpConsumer := newConsumerDelegate(logger, spanProcessor)
otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, err := newTraces(otlpConsumer.consume)
if err != nil {
Expand Down Expand Up @@ -137,13 +140,13 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting {
}
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate {
func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) *consumerDelegate {
return &consumerDelegate{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
nil),
tm),
protoFromTraces: otlp2jaeger.ProtoFromTraces,
}
}
Expand Down
Loading

0 comments on commit ddca3c8

Please sign in to comment.