Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tenancy for queries #3791

Merged
merged 20 commits into from
Jul 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ import (
"github.com/jaegertracing/jaeger/internal/metrics/jlibadapter"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
@@ -155,6 +156,8 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to configure query service", zap.Error(err))
}

tm := tenancy.NewTenancyManager(&cOpts.GRPC.Tenancy)

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
@@ -164,6 +167,7 @@ by default uses only in-memory database.`,
StrategyStore: strategyStore,
Aggregator: aggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
})
if err := c.Start(cOpts); err != nil {
log.Fatal(err)
@@ -192,7 +196,7 @@ by default uses only in-memory database.`,
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
metricsFactory,
metricsFactory, tm,
)

svc.RunAndThen(func() {
@@ -265,10 +269,11 @@ func startQuery(
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
baseFactory metrics.Factory,
tm *tenancy.TenancyManager,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, opentracing.GlobalTracer())
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
7 changes: 6 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

@@ -52,6 +53,7 @@ type Collector struct {
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.TenancyManager

// state, read only
hServer *http.Server
@@ -72,6 +74,7 @@ type CollectorParams struct {
StrategyStore strategystore.StrategyStore
Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.TenancyManager
}

// New constructs a new collector component, ready to be started
@@ -84,6 +87,7 @@ func New(params *CollectorParams) *Collector {
strategyStore: params.StrategyStore,
aggregator: params.Aggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
}
}

@@ -94,6 +98,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
CollectorOpts: options,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
TenancyMgr: c.tenancyMgr,
}

var additionalProcessors []ProcessSpan
@@ -152,7 +157,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
c.zkServer = zkServer

if options.OTLP.Enabled {
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor)
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
9 changes: 9 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

@@ -53,6 +54,7 @@ func TestNewCollector(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
@@ -61,6 +63,7 @@ func TestNewCollector(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})

collectorOpts := optionsForEphemeralPorts()
@@ -77,6 +80,7 @@ func TestCollector_StartErrors(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
@@ -85,6 +89,7 @@ func TestCollector_StartErrors(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})
err := c.Start(options)
require.Error(t, err)
@@ -130,6 +135,7 @@ func TestCollector_PublishOpts(t *testing.T) {
metricsFactory := fork.New("internal", forkFactory, baseMetrics)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
@@ -138,6 +144,7 @@ func TestCollector_PublishOpts(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
@@ -164,6 +171,7 @@ func TestAggregator(t *testing.T) {
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
agg := &mockAggregator{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
@@ -173,6 +181,7 @@ func TestAggregator(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
Aggregator: agg,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
2 changes: 1 addition & 1 deletion cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
@@ -24,8 +24,8 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
)

21 changes: 9 additions & 12 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

@@ -36,14 +36,14 @@ type GRPCHandler struct {
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyMgr *tenancy.TenancyManager) *GRPCHandler {
return &GRPCHandler{
logger: logger,
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.GRPCTransport,
processor.ProtoSpanFormat,
tenancyConfig),
tenancyMgr),
}
}

@@ -58,21 +58,18 @@ type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
tenancyConfig tenancy.TenancyConfig
tenancyMgr *tenancy.TenancyManager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer {
if tenancyConfig == nil {
tenancyConfig = &tenancy.TenancyConfig{}
}
func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.TenancyManager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
InboundTransport: transport,
SpanFormat: spanFormat,
},
tenancyConfig: *tenancyConfig,
tenancyMgr: tenancyMgr,
}
}

@@ -104,7 +101,7 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
}

func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be covered by an interceptor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line only became part of the PR because of the rename from TenancyConfig to TenancyManager. The collector doesn't use the interceptors, which were introduced for query. This PR is large; I propose backporting the new query interceptor to the collector should be in a follow-up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

if !c.tenancyConfig.Enabled {
if !c.tenancyMgr.Enabled {
return "", nil
}

@@ -113,14 +110,14 @@ func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[c.tenancyConfig.Header]
tenants := md.Get(c.tenancyMgr.Header)
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

if !c.tenancyConfig.Valid(tenants[0]) {
if !c.tenancyMgr.Valid(tenants[0]) {
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
}

12 changes: 6 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
@@ -98,7 +98,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp
func TestPostSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
@@ -133,7 +133,7 @@ func TestPostSpans(t *testing.T) {
func TestGRPCCompressionEnabled(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
@@ -171,7 +171,7 @@ func TestPostSpansWithError(t *testing.T) {
processor := &mockSpanProcessor{expectedError: test.processorError}
logger, logBuf := testutils.NewLogger()
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(logger, processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(logger, processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
@@ -210,7 +210,7 @@ func TestPostTenantedSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor,
tenancy.NewTenancyConfig(&tenancy.Options{
tenancy.NewTenancyManager(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: []string{dummyTenant},
@@ -346,7 +346,7 @@ func TestGetTenant(t *testing.T) {

processor := &mockSpanProcessor{}
handler := NewGRPCHandler(zap.NewNop(), processor,
tenancy.NewTenancyConfig(&tenancy.Options{
tenancy.NewTenancyManager(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: validTenants,
11 changes: 7 additions & 4 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
@@ -34,17 +34,19 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

var _ component.Host = (*otelHost)(nil) // API check

// StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports.
func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) (component.TracesReceiver, error) {
otlpFactory := otlpreceiver.NewFactory()
return startOTLPReceiver(
options,
logger,
spanProcessor,
tm,
otlpFactory,
consumer.NewTraces,
otlpFactory.CreateTracesReceiver,
@@ -58,6 +60,7 @@ func startOTLPReceiver(
options *flags.CollectorOptions,
logger *zap.Logger,
spanProcessor processor.SpanProcessor,
tm *tenancy.TenancyManager,
// from here: params that can be mocked in tests
otlpFactory component.ReceiverFactory,
newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error),
@@ -74,7 +77,7 @@ func startOTLPReceiver(
},
}

otlpConsumer := newConsumerDelegate(logger, spanProcessor)
otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, err := newTraces(otlpConsumer.consume)
if err != nil {
@@ -137,13 +140,13 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting {
}
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate {
func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) *consumerDelegate {
return &consumerDelegate{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
nil),
tm),
protoFromTraces: otlp2jaeger.ProtoFromTraces,
}
}
Loading