From 0ab39c6f9cbd02d640a7fac17c2973902854f3ac Mon Sep 17 00:00:00 2001 From: James Moessis Date: Tue, 26 Nov 2024 23:11:52 +1100 Subject: [PATCH] [processor/tailsampling] Decision cache for non-sampled trace IDs (#36040) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-opening https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/33722 so it can be finished. I've addressed @jpkrohling's open comments from that PR. Description: Adds a decision cache for non sampled IDs. This is off-by-default. It's the same as the sampled IDs cache but is separate and has a separate size. **Link to tracking Issue: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31583 Testing: Unit test added. Cache type itself already tested. Documentation: Updated the readme with config options and descriptions. --------- Co-authored-by: Juraci Paixão Kröhling --- .../jmoe_decision-cache-non-sampled.yaml | 6 ++ processor/tailsamplingprocessor/README.md | 18 +++-- processor/tailsamplingprocessor/config.go | 7 +- .../tailsamplingprocessor/config_test.go | 2 +- processor/tailsamplingprocessor/processor.go | 66 ++++++++++++----- .../processor_decisions_test.go | 74 +++++++++++++++++++ .../testdata/tail_sampling_config.yaml | 3 +- 7 files changed, 148 insertions(+), 28 deletions(-) create mode 100644 .chloggen/jmoe_decision-cache-non-sampled.yaml diff --git a/.chloggen/jmoe_decision-cache-non-sampled.yaml b/.chloggen/jmoe_decision-cache-non-sampled.yaml new file mode 100644 index 000000000000..8c9dbff3d038 --- /dev/null +++ b/.chloggen/jmoe_decision-cache-non-sampled.yaml @@ -0,0 +1,6 @@ +change_type: enhancement +component: tailsamplingprocessor +note: Adds decision cache for non-sampled trace IDs +issues: [31583] +subtext: +change_logs: [] diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index 358450d3f458..0c859105c8a8 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -46,11 +46,18 @@ The following configuration options can also be modified: - `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) -- `decision_cache` (default = `sampled_cache_size: 0`): Configures amount of trace IDs to be kept in an LRU cache, - persisting the "keep" decisions for traces that may have already been released from memory. - By default, the size is 0 and the cache is inactive. - If using, configure this as much higher than `num_traces` so decisions for trace IDs are kept +- `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches + depending on how many "keep" vs "drop" decisions you expect from your policies. For example, you may allocate a + larger `non_sampled_cache_size` if you expect most traces to be dropped. + Additionally, if using, configure this as much higher than `num_traces` so decisions for trace IDs are kept longer than the span data for the trace. + - `sampled_cache_size` (default = 0): Configures amount of trace IDs to be kept in an LRU cache, + persisting the "keep" decisions for traces that may have already been released from memory. + By default, the size is 0 and the cache is inactive. + - `non_sampled_cache_size` (default = 0) Configures amount of trace IDs to be kept in an LRU cache, + persisting the "drop" decisions for traces that may have already been released from memory. + By default, the size is 0 and the cache is inactive. + Each policy will result in a decision, and the processor will evaluate them to make a final decision: @@ -70,7 +77,8 @@ processors: num_traces: 100 expected_new_traces_per_sec: 10 decision_cache: - sampled_cache_size: 100000 + sampled_cache_size: 100_000 + non_sampled_cache_size: 100_000 policies: [ { diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 4185e7b9b0b2..1b18c039fb00 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -225,11 +225,16 @@ type OTTLConditionCfg struct { } type DecisionCacheConfig struct { - // SampledCacheSize specifies the size of the cache that holds the sampled trace IDs + // SampledCacheSize specifies the size of the cache that holds the sampled trace IDs. // This value will be the maximum amount of trace IDs that the cache can hold before overwriting previous IDs. // For effective use, this value should be at least an order of magnitude higher than Config.NumTraces. // If left as default 0, a no-op DecisionCache will be used. SampledCacheSize int `mapstructure:"sampled_cache_size"` + // NonSampledCacheSize specifies the size of the cache that holds the non-sampled trace IDs. + // This value will be the maximum amount of trace IDs that the cache can hold before overwriting previous IDs. + // For effective use, this value should be at least an order of magnitude higher than Config.NumTraces. + // If left as default 0, a no-op DecisionCache will be used. + NonSampledCacheSize int `mapstructure:"non_sampled_cache_size"` } // Config holds the configuration for tail-based sampling. diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index c94b3fc6b12e..7f88e87567db 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) { DecisionWait: 10 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 10, - DecisionCache: DecisionCacheConfig{SampledCacheSize: 500}, + DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000}, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 4515290198ac..02883fbc4778 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -48,16 +48,17 @@ type tailSamplingSpanProcessor struct { telemetry *metadata.TelemetryBuilder logger *zap.Logger - nextConsumer consumer.Traces - maxNumTraces uint64 - policies []*policy - idToTrace sync.Map - policyTicker timeutils.TTicker - tickerFrequency time.Duration - decisionBatcher idbatcher.Batcher - sampledIDCache cache.Cache[bool] - deleteChan chan pcommon.TraceID - numTracesOnMap *atomic.Uint64 + nextConsumer consumer.Traces + maxNumTraces uint64 + policies []*policy + idToTrace sync.Map + policyTicker timeutils.TTicker + tickerFrequency time.Duration + decisionBatcher idbatcher.Batcher + sampledIDCache cache.Cache[bool] + nonSampledIDCache cache.Cache[bool] + deleteChan chan pcommon.TraceID + numTracesOnMap *atomic.Uint64 } // spanAndScope a structure for holding information about span and its instrumentation scope. @@ -89,23 +90,32 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume if err != nil { return nil, err } - sampledDecisions := cache.NewNopDecisionCache[bool]() + nopCache := cache.NewNopDecisionCache[bool]() + sampledDecisions := nopCache + nonSampledDecisions := nopCache if cfg.DecisionCache.SampledCacheSize > 0 { sampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.SampledCacheSize) if err != nil { return nil, err } } + if cfg.DecisionCache.NonSampledCacheSize > 0 { + nonSampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.NonSampledCacheSize) + if err != nil { + return nil, err + } + } tsp := &tailSamplingSpanProcessor{ - ctx: ctx, - telemetry: telemetry, - nextConsumer: nextConsumer, - maxNumTraces: cfg.NumTraces, - sampledIDCache: sampledDecisions, - logger: telemetrySettings.Logger, - numTracesOnMap: &atomic.Uint64{}, - deleteChan: make(chan pcommon.TraceID, cfg.NumTraces), + ctx: ctx, + telemetry: telemetry, + nextConsumer: nextConsumer, + maxNumTraces: cfg.NumTraces, + sampledIDCache: sampledDecisions, + nonSampledIDCache: nonSampledDecisions, + logger: telemetrySettings.Logger, + numTracesOnMap: &atomic.Uint64{}, + deleteChan: make(chan pcommon.TraceID, cfg.NumTraces), } tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick} @@ -188,6 +198,13 @@ func withSampledDecisionCache(c cache.Cache[bool]) Option { } } +// withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs. +func withNonSampledDecisionCache(c cache.Cache[bool]) Option { + return func(tsp *tailSamplingSpanProcessor) { + tsp.nonSampledIDCache = c + } +} + func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) { switch cfg.Type { case Composite: @@ -371,7 +388,15 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc traceTd := ptrace.NewTraces() appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) - tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans))) + metric.WithAttributeSet(attribute.NewSet()) + tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. + Add(tsp.ctx, int64(len(spans)), attrSampledTrue) + continue + } + // If the trace ID is in the non-sampled cache, short circuit the decision + if _, ok := tsp.nonSampledIDCache.Get(id); ok { + tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. + Add(tsp.ctx, int64(len(spans)), attrSampledFalse) continue } @@ -429,6 +454,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) case sampling.NotSampled: + tsp.nonSampledIDCache.Put(id, true) tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second)) default: tsp.logger.Warn("Encountered unexpected sampling decision", diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index c198316fc418..f6e9d1730ed1 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -400,3 +400,77 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { require.EqualValues(t, 2, nextConsumer.SpanCount(), "original final decision not honored") require.NoError(t, tel.Shutdown(context.Background())) } + +func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + // Use this instead of the default no-op cache + c, err := cache.NewLRUDecisionCache[bool](200) + require.NoError(t, err) + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c)) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // We are going to create 2 spans belonging to the same trace + traceID := uInt64ToTraceID(1) + + // The first span will be NOT sampled, this will later be set to sampled, but the sampling decision will be cached + mpe.NextDecision = sampling.NotSampled + + // A function that return a ptrace.Traces containing a single span for the single trace we are using. + spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetTraceID(traceID) + span.SetSpanID(uInt64ToSpanID(spanIndex)) + return traces + } + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(1))) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do anything + tsp.policyTicker.OnTick() + require.EqualValues(t, 0, mpe.EvaluationCount) + + // This will cause policy evaluations on the first span + tsp.policyTicker.OnTick() + + // Policy should have been evaluated once + require.EqualValues(t, 1, mpe.EvaluationCount) + + // The final decision SHOULD be NOT Sampled. + require.EqualValues(t, 0, nextConsumer.SpanCount()) + + // Drop the trace to force cache to make decision + tsp.dropTrace(traceID, time.Now()) + _, ok := tsp.idToTrace.Load(traceID) + require.False(t, ok) + + // Set next decision to sampled, ensuring the next decision is determined by the decision cache, not the policy + mpe.NextDecision = sampling.Sampled + + // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. + // The policies should NOT be evaluated again. + require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2))) + require.EqualValues(t, 1, mpe.EvaluationCount) + require.EqualValues(t, 0, nextConsumer.SpanCount(), "original final decision not honored") +} diff --git a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml index 3221665647e4..f950b52b797c 100644 --- a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml +++ b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml @@ -3,7 +3,8 @@ tail_sampling: num_traces: 100 expected_new_traces_per_sec: 10 decision_cache: - sampled_cache_size: 500 + sampled_cache_size: 1000 + non_sampled_cache_size: 10000 policies: [ {