Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tenancy for queries #3791

Merged
merged 20 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/jaegertracing/jaeger/internal/metrics/jlibadapter"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
Expand Down Expand Up @@ -155,6 +156,8 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to configure query service", zap.Error(err))
}

tm := tenancy.NewTenancyManager(&cOpts.GRPC.Tenancy)

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
Expand All @@ -164,6 +167,7 @@ by default uses only in-memory database.`,
StrategyStore: strategyStore,
Aggregator: aggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
})
if err := c.Start(cOpts); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -192,7 +196,7 @@ by default uses only in-memory database.`,
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
metricsFactory,
metricsFactory, tm,
)

svc.RunAndThen(func() {
Expand Down Expand Up @@ -265,10 +269,11 @@ func startQuery(
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
baseFactory metrics.Factory,
tm *tenancy.TenancyManager,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, opentracing.GlobalTracer())
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -52,6 +53,7 @@ type Collector struct {
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.TenancyManager

// state, read only
hServer *http.Server
Expand All @@ -72,6 +74,7 @@ type CollectorParams struct {
StrategyStore strategystore.StrategyStore
Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.TenancyManager
}

// New constructs a new collector component, ready to be started
Expand All @@ -84,6 +87,7 @@ func New(params *CollectorParams) *Collector {
strategyStore: params.StrategyStore,
aggregator: params.Aggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
}
}

Expand All @@ -94,6 +98,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
CollectorOpts: options,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
TenancyMgr: c.tenancyMgr,
}

var additionalProcessors []ProcessSpan
Expand Down Expand Up @@ -152,7 +157,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
c.zkServer = zkServer

if options.OTLP.Enabled {
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor)
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
Expand Down
9 changes: 9 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

Expand All @@ -53,6 +54,7 @@ func TestNewCollector(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -61,6 +63,7 @@ func TestNewCollector(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})

collectorOpts := optionsForEphemeralPorts()
Expand All @@ -77,6 +80,7 @@ func TestCollector_StartErrors(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -85,6 +89,7 @@ func TestCollector_StartErrors(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})
err := c.Start(options)
require.Error(t, err)
Expand Down Expand Up @@ -130,6 +135,7 @@ func TestCollector_PublishOpts(t *testing.T) {
metricsFactory := fork.New("internal", forkFactory, baseMetrics)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -138,6 +144,7 @@ func TestCollector_PublishOpts(t *testing.T) {
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
Expand All @@ -164,6 +171,7 @@ func TestAggregator(t *testing.T) {
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
agg := &mockAggregator{}
tm := &tenancy.TenancyManager{}

c := New(&CollectorParams{
ServiceName: "collector",
Expand All @@ -173,6 +181,7 @@ func TestAggregator(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
Aggregator: agg,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
)

Expand Down
21 changes: 9 additions & 12 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -36,14 +36,14 @@ type GRPCHandler struct {
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyMgr *tenancy.TenancyManager) *GRPCHandler {
return &GRPCHandler{
logger: logger,
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.GRPCTransport,
processor.ProtoSpanFormat,
tenancyConfig),
tenancyMgr),
}
}

Expand All @@ -58,21 +58,18 @@ type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
tenancyConfig tenancy.TenancyConfig
tenancyMgr *tenancy.TenancyManager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer {
if tenancyConfig == nil {
tenancyConfig = &tenancy.TenancyConfig{}
}
func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.TenancyManager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
InboundTransport: transport,
SpanFormat: spanFormat,
},
tenancyConfig: *tenancyConfig,
tenancyMgr: tenancyMgr,
}
}

Expand Down Expand Up @@ -104,7 +101,7 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
}

func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be covered by an interceptor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line only became part of the PR because of the rename from TenancyConfig to TenancyManager. The collector doesn't use the interceptors, which were introduced for query. This PR is large; I propose backporting the new query interceptor to the collector should be in a follow-up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

if !c.tenancyConfig.Enabled {
if !c.tenancyMgr.Enabled {
return "", nil
}

Expand All @@ -113,14 +110,14 @@ func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[c.tenancyConfig.Header]
tenants := md.Get(c.tenancyMgr.Header)
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

if !c.tenancyConfig.Valid(tenants[0]) {
if !c.tenancyMgr.Valid(tenants[0]) {
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
}

Expand Down
12 changes: 6 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand Down Expand Up @@ -98,7 +98,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp
func TestPostSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestPostSpans(t *testing.T) {
func TestGRPCCompressionEnabled(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestPostSpansWithError(t *testing.T) {
processor := &mockSpanProcessor{expectedError: test.processorError}
logger, logBuf := testutils.NewLogger()
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(logger, processor, &tenancy.TenancyConfig{})
handler := NewGRPCHandler(logger, processor, &tenancy.TenancyManager{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestPostTenantedSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor,
tenancy.NewTenancyConfig(&tenancy.Options{
tenancy.NewTenancyManager(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: []string{dummyTenant},
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestGetTenant(t *testing.T) {

processor := &mockSpanProcessor{}
handler := NewGRPCHandler(zap.NewNop(), processor,
tenancy.NewTenancyConfig(&tenancy.Options{
tenancy.NewTenancyManager(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: validTenants,
Expand Down
11 changes: 7 additions & 4 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

var _ component.Host = (*otelHost)(nil) // API check

// StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports.
func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) (component.TracesReceiver, error) {
otlpFactory := otlpreceiver.NewFactory()
return startOTLPReceiver(
options,
logger,
spanProcessor,
tm,
otlpFactory,
consumer.NewTraces,
otlpFactory.CreateTracesReceiver,
Expand All @@ -58,6 +60,7 @@ func startOTLPReceiver(
options *flags.CollectorOptions,
logger *zap.Logger,
spanProcessor processor.SpanProcessor,
tm *tenancy.TenancyManager,
// from here: params that can be mocked in tests
otlpFactory component.ReceiverFactory,
newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error),
Expand All @@ -74,7 +77,7 @@ func startOTLPReceiver(
},
}

otlpConsumer := newConsumerDelegate(logger, spanProcessor)
otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, err := newTraces(otlpConsumer.consume)
if err != nil {
Expand Down Expand Up @@ -137,13 +140,13 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting {
}
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate {
func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) *consumerDelegate {
return &consumerDelegate{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
nil),
tm),
protoFromTraces: otlp2jaeger.ProtoFromTraces,
}
}
Expand Down
Loading