From 6228bc2bb49c3e5e68ee974b0d903e8e23335b63 Mon Sep 17 00:00:00 2001 From: James Moessis Date: Mon, 24 Jun 2024 11:44:49 +1000 Subject: [PATCH 1/6] cache and tests for non sampled IDs values in examples fix typo --- processor/tailsamplingprocessor/README.md | 18 +++-- processor/tailsamplingprocessor/config.go | 7 +- processor/tailsamplingprocessor/processor.go | 67 +++++++++++----- .../processor_decisions_test.go | 76 ++++++++++++++++++- .../testdata/tail_sampling_config.yaml | 3 +- 5 files changed, 143 insertions(+), 28 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index 8661c9901b1d..6a870bddbb4e 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -46,11 +46,18 @@ The following configuration options can also be modified: - `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) -- `decision_cache` (default = `sampled_cache_size: 0`): Configures amount of trace IDs to be kept in an LRU cache, - persisting the "keep" decisions for traces that may have already been released from memory. - By default, the size is 0 and the cache is inactive. - If using, configure this as much higher than `num_traces` so decisions for trace IDs are kept +- `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches + depending on how many "keep" vs "drop" decisions you expect from your policies. For example, you may allocate a + larger `non_sampled_cache_size` if you expect most traces to be dropped. + Additionally, if using, configure this as much higher than `num_traces` so decisions for trace IDs are kept longer than the span data for the trace. 
+ - `sampled_cache_size` (default = 0): Configures the number of trace IDs to be kept in an LRU cache, + persisting the "keep" decisions for traces that may have already been released from memory. + By default, the size is 0 and the cache is inactive. + - `non_sampled_cache_size` (default = 0): Configures the number of trace IDs to be kept in an LRU cache, + persisting the "drop" decisions for traces that may have already been released from memory. + By default, the size is 0 and the cache is inactive. + Each policy will result in a decision, and the processor will evaluate them to make a final decision: @@ -70,7 +77,8 @@ processors: num_traces: 100 expected_new_traces_per_sec: 10 decision_cache: - sampled_cache_size: 100000 + sampled_cache_size: 100_000 + non_sampled_cache_size: 100_000 policies: [ { diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 4185e7b9b0b2..1b18c039fb00 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -225,11 +225,16 @@ type OTTLConditionCfg struct { } type DecisionCacheConfig struct { - // SampledCacheSize specifies the size of the cache that holds the sampled trace IDs + // SampledCacheSize specifies the size of the cache that holds the sampled trace IDs. // This value will be the maximum amount of trace IDs that the cache can hold before overwriting previous IDs. // For effective use, this value should be at least an order of magnitude higher than Config.NumTraces. // If left as default 0, a no-op DecisionCache will be used. SampledCacheSize int `mapstructure:"sampled_cache_size"` + // NonSampledCacheSize specifies the size of the cache that holds the non-sampled trace IDs. + // This value will be the maximum amount of trace IDs that the cache can hold before overwriting previous IDs. + // For effective use, this value should be at least an order of magnitude higher than Config.NumTraces.
+ // If left as default 0, a no-op DecisionCache will be used. + NonSampledCacheSize int `mapstructure:"non_sampled_cache_size"` } // Config holds the configuration for tail-based sampling. diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 4515290198ac..a9928cadfeb2 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -48,16 +48,17 @@ type tailSamplingSpanProcessor struct { telemetry *metadata.TelemetryBuilder logger *zap.Logger - nextConsumer consumer.Traces - maxNumTraces uint64 - policies []*policy - idToTrace sync.Map - policyTicker timeutils.TTicker - tickerFrequency time.Duration - decisionBatcher idbatcher.Batcher - sampledIDCache cache.Cache[bool] - deleteChan chan pcommon.TraceID - numTracesOnMap *atomic.Uint64 + nextConsumer consumer.Traces + maxNumTraces uint64 + policies []*policy + idToTrace sync.Map + policyTicker timeutils.TTicker + tickerFrequency time.Duration + decisionBatcher idbatcher.Batcher + sampledIDCache cache.Cache[bool] + nonSampledIDCache cache.Cache[bool] + deleteChan chan pcommon.TraceID + numTracesOnMap *atomic.Uint64 } // spanAndScope a structure for holding information about span and its instrumentation scope. 
@@ -89,23 +90,32 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume if err != nil { return nil, err } - sampledDecisions := cache.NewNopDecisionCache[bool]() + nopCache := cache.NewNopDecisionCache[bool]() + sampledDecisions := nopCache + nonSampledDecisions := nopCache if cfg.DecisionCache.SampledCacheSize > 0 { sampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.SampledCacheSize) if err != nil { return nil, err } } + if cfg.DecisionCache.NonSampledCacheSize > 0 { + nonSampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.NonSampledCacheSize) + if err != nil { + return nil, err + } + } tsp := &tailSamplingSpanProcessor{ - ctx: ctx, - telemetry: telemetry, - nextConsumer: nextConsumer, - maxNumTraces: cfg.NumTraces, - sampledIDCache: sampledDecisions, - logger: telemetrySettings.Logger, - numTracesOnMap: &atomic.Uint64{}, - deleteChan: make(chan pcommon.TraceID, cfg.NumTraces), + ctx: ctx, + telemetry: telemetry, + nextConsumer: nextConsumer, + maxNumTraces: cfg.NumTraces, + sampledIDCache: sampledDecisions, + nonSampledIDCache: nonSampledDecisions, + logger: settings.Logger, + numTracesOnMap: &atomic.Uint64{}, + deleteChan: make(chan pcommon.TraceID, cfg.NumTraces), } tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick} @@ -188,6 +198,13 @@ func withSampledDecisionCache(c cache.Cache[bool]) Option { } } +// withNonSampledDecisionCache sets the cache which the processor uses to store recently non-sampled trace IDs.
+func withNonSampledDecisionCache(c cache.Cache[bool]) Option { + return func(tsp *tailSamplingSpanProcessor) { + tsp.nonSampledIDCache = c + } +} + func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) { switch cfg.Type { case Composite: @@ -371,7 +388,14 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc traceTd := ptrace.NewTraces() appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) - tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans))) + tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. + Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "sample"))) + continue + } + // If the trace ID is in the non-sampled cache, short circuit the decision + if _, ok := tsp.nonSampledIDCache.Get(id); ok { + tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. + Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "drop"))) continue } @@ -464,6 +488,9 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio tsp.idToTrace.Delete(traceID) // Subtract one from numTracesOnMap per https://godoc.org/sync/atomic#AddUint64 tsp.numTracesOnMap.Add(^uint64(0)) + if trace.FinalDecision != sampling.Sampled { + tsp.nonSampledIDCache.Put(traceID, true) + } } if trace == nil { tsp.logger.Debug("Attempt to delete traceID not on table") diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index e50ea4f7ce59..6b016d4b30cd 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -316,7 +316,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { require.EqualValues(t, 1, mpe1.EvaluationCount) require.EqualValues(t, 1, 
mpe2.EvaluationCount) - // The final decision SHOULD be NotSampled. + //The final decision SHOULD be NotSampled. require.EqualValues(t, 0, nextConsumer.SpanCount()) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. @@ -400,3 +400,77 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { require.EqualValues(t, 1, mpe.EvaluationCount) require.EqualValues(t, 2, nextConsumer.SpanCount(), "original final decision not honored") } + +func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings().TelemetrySettings + idb := newSyncIDBatcher() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + // Use this instead of the default no-op cache + c, err := cache.NewLRUDecisionCache[bool](200) + require.NoError(t, err) + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c)) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // We are going to create 2 spans belonging to the same trace + traceID := uInt64ToTraceID(1) + + // The first span will NOT be sampled. The mock policy is later switched to Sampled, but the original decision will have been cached + mpe.NextDecision = sampling.NotSampled + + // A function that returns a ptrace.Traces containing a single span for the single trace we are using.
+ spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetTraceID(traceID) + span.SetSpanID(uInt64ToSpanID(spanIndex)) + return traces + } + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(1))) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do anything + tsp.policyTicker.OnTick() + require.EqualValues(t, 0, mpe.EvaluationCount) + + // This will cause policy evaluations on the first span + tsp.policyTicker.OnTick() + + // Policy should have been evaluated once + require.EqualValues(t, 1, mpe.EvaluationCount) + + // The final decision SHOULD be NOT Sampled. + require.EqualValues(t, 0, nextConsumer.SpanCount()) + + // Drop the trace to force cache to make decision + tsp.dropTrace(traceID, time.Now()) + _, ok := tsp.idToTrace.Load(traceID) + require.False(t, ok) + + // Set next decision to sampled, ensuring the next decision is determined by the decision cache, not the policy + mpe.NextDecision = sampling.Sampled + + // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. + // The policies should NOT be evaluated again. 
+ require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2))) + require.EqualValues(t, 1, mpe.EvaluationCount) + require.EqualValues(t, 0, nextConsumer.SpanCount(), "original final decision not honored") +} diff --git a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml index 3221665647e4..f950b52b797c 100644 --- a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml +++ b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml @@ -3,7 +3,8 @@ tail_sampling: num_traces: 100 expected_new_traces_per_sec: 10 decision_cache: - sampled_cache_size: 500 + sampled_cache_size: 1000 + non_sampled_cache_size: 10000 policies: [ { From 22692004427b3363f617881840b04b93c4150153 Mon Sep 17 00:00:00 2001 From: James Moessis Date: Mon, 24 Jun 2024 12:01:28 +1000 Subject: [PATCH 2/6] lint --- processor/tailsamplingprocessor/processor_decisions_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 6b016d4b30cd..79477435ca83 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -316,7 +316,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { require.EqualValues(t, 1, mpe1.EvaluationCount) require.EqualValues(t, 1, mpe2.EvaluationCount) - //The final decision SHOULD be NotSampled. + // The final decision SHOULD be NotSampled. require.EqualValues(t, 0, nextConsumer.SpanCount()) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. 
From 860891e4e653158031fe8bd1ffc167246689b811 Mon Sep 17 00:00:00 2001 From: James Moessis Date: Mon, 24 Jun 2024 12:03:56 +1000 Subject: [PATCH 3/6] changelog --- .chloggen/jmoe_decision-cache-non-sampled.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .chloggen/jmoe_decision-cache-non-sampled.yaml diff --git a/.chloggen/jmoe_decision-cache-non-sampled.yaml b/.chloggen/jmoe_decision-cache-non-sampled.yaml new file mode 100644 index 000000000000..8c9dbff3d038 --- /dev/null +++ b/.chloggen/jmoe_decision-cache-non-sampled.yaml @@ -0,0 +1,6 @@ +change_type: enhancement +component: tailsamplingprocessor +note: Adds decision cache for non-sampled trace IDs +issues: [31583] +subtext: +change_logs: [] From 1212ee5f23657193d2ece32657cb6a2acf685454 Mon Sep 17 00:00:00 2001 From: James Moessis Date: Wed, 26 Jun 2024 16:18:27 +1000 Subject: [PATCH 4/6] fix test --- processor/tailsamplingprocessor/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index c94b3fc6b12e..692e66ee0910 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) { DecisionWait: 10 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 10, - DecisionCache: DecisionCacheConfig{SampledCacheSize: 500}, + DecisionCache: DecisionCacheConfig{SampledCacheSize: 1000, NonSampledCacheSize: 10000}, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ From 389663bf8783b54dcd46e2d152c73d823684f549 Mon Sep 17 00:00:00 2001 From: James Moessis Date: Tue, 29 Oct 2024 12:56:48 +1100 Subject: [PATCH 5/6] rebase fixes --- processor/tailsamplingprocessor/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index a9928cadfeb2..c1f3ce1b5fe8 100644 --- 
a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -113,7 +113,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume maxNumTraces: cfg.NumTraces, sampledIDCache: sampledDecisions, nonSampledIDCache: nonSampledDecisions, - logger: settings.Logger, + logger: telemetrySettings.Logger, numTracesOnMap: &atomic.Uint64{}, deleteChan: make(chan pcommon.TraceID, cfg.NumTraces), } From dbe870e8abcc08f6daed22443903e9cb90b3e698 Mon Sep 17 00:00:00 2001 From: James Moessis Date: Tue, 29 Oct 2024 13:10:04 +1100 Subject: [PATCH 6/6] address juraci comments --- processor/tailsamplingprocessor/config_test.go | 2 +- processor/tailsamplingprocessor/processor.go | 9 ++++----- .../tailsamplingprocessor/processor_decisions_test.go | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index 692e66ee0910..7f88e87567db 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) { DecisionWait: 10 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 10, - DecisionCache: DecisionCacheConfig{SampledCacheSize: 1000, NonSampledCacheSize: 10000}, + DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000}, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index c1f3ce1b5fe8..02883fbc4778 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -388,14 +388,15 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc traceTd := ptrace.NewTraces() appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) + 
metric.WithAttributeSet(attribute.NewSet()) tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. - Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "sample"))) + Add(tsp.ctx, int64(len(spans)), attrSampledTrue) continue } // If the trace ID is in the non-sampled cache, short circuit the decision if _, ok := tsp.nonSampledIDCache.Get(id); ok { tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. - Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "drop"))) + Add(tsp.ctx, int64(len(spans)), attrSampledFalse) continue } @@ -453,6 +454,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) case sampling.NotSampled: + tsp.nonSampledIDCache.Put(id, true) tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second)) default: tsp.logger.Warn("Encountered unexpected sampling decision", @@ -488,9 +490,6 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio tsp.idToTrace.Delete(traceID) // Subtract one from numTracesOnMap per https://godoc.org/sync/atomic#AddUint64 tsp.numTracesOnMap.Add(^uint64(0)) - if trace.FinalDecision != sampling.Sampled { - tsp.nonSampledIDCache.Put(traceID, true) - } } if trace == nil { tsp.logger.Debug("Attempt to delete traceID not on table") diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 79477435ca83..293b3bf78e55 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -408,7 +408,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { } nextConsumer := new(consumertest.TracesSink) s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings + ct 
:= s.NewSettings() idb := newSyncIDBatcher() mpe := &mockPolicyEvaluator{}