[processor/tailsampling] Decision cache for non-sampled trace IDs #33722

Closed
6 changes: 6 additions & 0 deletions .chloggen/jmoe_decision-cache-non-sampled.yaml
@@ -0,0 +1,6 @@
change_type: enhancement
component: tailsamplingprocessor
note: Adds decision cache for non-sampled trace IDs
issues: [31583]
subtext:
change_logs: []
18 changes: 13 additions & 5 deletions processor/tailsamplingprocessor/README.md
@@ -45,11 +45,18 @@ The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
- `num_traces` (default = 50000): Number of traces kept in memory.
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
- `decision_cache` (default = `sampled_cache_size: 0`): Configures amount of trace IDs to be kept in an LRU cache,
persisting the "keep" decisions for traces that may have already been released from memory.
By default, the size is 0 and the cache is inactive.
If using, configure this as much higher than `num_traces` so decisions for trace IDs are kept
- `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches
depending on how many "keep" vs "drop" decisions you expect from your policies. For example, you may allocate a
larger `non_sampled_cache_size` if you expect most traces to be dropped.
Additionally, if these caches are enabled, configure them to be much larger than `num_traces` so decisions for trace IDs are kept
longer than the span data for the trace.
- `sampled_cache_size` (default = 0): Configures the number of trace IDs to be kept in an LRU cache,
persisting the "keep" decisions for traces that may have already been released from memory.
By default, the size is 0 and the cache is inactive.
- `non_sampled_cache_size` (default = 0): Configures the number of trace IDs to be kept in an LRU cache,
persisting the "drop" decisions for traces that may have already been released from memory.
By default, the size is 0 and the cache is inactive.


Each policy will result in a decision, and the processor will evaluate them to make a final decision:

@@ -69,7 +76,8 @@ processors:
num_traces: 100
expected_new_traces_per_sec: 10
decision_cache:
sampled_cache_size: 100000
sampled_cache_size: 100_000
non_sampled_cache_size: 100_000
policies:
[
{
7 changes: 6 additions & 1 deletion processor/tailsamplingprocessor/config.go
@@ -221,11 +221,16 @@ type OTTLConditionCfg struct {
}

type DecisionCacheConfig struct {
// SampledCacheSize specifies the size of the cache that holds the sampled trace IDs
// SampledCacheSize specifies the size of the cache that holds the sampled trace IDs.
// This value will be the maximum number of trace IDs that the cache can hold before overwriting previous IDs.
// For effective use, this value should be at least an order of magnitude higher than Config.NumTraces.
// If left as default 0, a no-op DecisionCache will be used.
SampledCacheSize int `mapstructure:"sampled_cache_size"`
// NonSampledCacheSize specifies the size of the cache that holds the non-sampled trace IDs.
// This value will be the maximum number of trace IDs that the cache can hold before overwriting previous IDs.
// For effective use, this value should be at least an order of magnitude higher than Config.NumTraces.
// If left as default 0, a no-op DecisionCache will be used.
NonSampledCacheSize int `mapstructure:"non_sampled_cache_size"`
}
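
For illustration, a minimal sketch of how these sizes might relate to `Config.NumTraces`, following the order-of-magnitude guidance in the comments above (the values are hypothetical, not recommendations):

```go
// Inside the tailsamplingprocessor package; sizes are hypothetical.
var exampleCfg = Config{
	// Span data is kept in memory for at most 50k traces...
	NumTraces: 50_000,
	DecisionCache: DecisionCacheConfig{
		// ...while "keep"/"drop" decisions survive roughly 10x longer,
		// so late-arriving spans can still be matched against a decision
		// after their trace has been evicted from memory.
		SampledCacheSize:    500_000,
		NonSampledCacheSize: 500_000,
	},
}
```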

// Config holds the configuration for tail-based sampling.
2 changes: 1 addition & 1 deletion processor/tailsamplingprocessor/config_test.go
@@ -36,7 +36,7 @@ func TestLoadConfig(t *testing.T) {
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
DecisionCache: DecisionCacheConfig{SampledCacheSize: 500},
DecisionCache: DecisionCacheConfig{SampledCacheSize: 1000, NonSampledCacheSize: 10000},
Member
Suggested change
DecisionCache: DecisionCacheConfig{SampledCacheSize: 1000, NonSampledCacheSize: 10000},
DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000},

This suggestion is because it wasn't very clear on a first read that those were different numbers.
PolicyCfgs: []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
67 changes: 47 additions & 20 deletions processor/tailsamplingprocessor/processor.go
@@ -48,16 +48,17 @@ type tailSamplingSpanProcessor struct {
telemetry *metadata.TelemetryBuilder
logger *zap.Logger

nextConsumer consumer.Traces
maxNumTraces uint64
policies []*policy
idToTrace sync.Map
policyTicker timeutils.TTicker
tickerFrequency time.Duration
decisionBatcher idbatcher.Batcher
sampledIDCache cache.Cache[bool]
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64
nextConsumer consumer.Traces
maxNumTraces uint64
policies []*policy
idToTrace sync.Map
policyTicker timeutils.TTicker
tickerFrequency time.Duration
decisionBatcher idbatcher.Batcher
sampledIDCache cache.Cache[bool]
nonSampledIDCache cache.Cache[bool]
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64
}

// spanAndScope a structure for holding information about span and its instrumentation scope.
@@ -88,23 +89,32 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings
if err != nil {
return nil, err
}
sampledDecisions := cache.NewNopDecisionCache[bool]()
nopCache := cache.NewNopDecisionCache[bool]()
sampledDecisions := nopCache
nonSampledDecisions := nopCache
if cfg.DecisionCache.SampledCacheSize > 0 {
sampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.SampledCacheSize)
if err != nil {
return nil, err
}
}
if cfg.DecisionCache.NonSampledCacheSize > 0 {
nonSampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.NonSampledCacheSize)
if err != nil {
return nil, err
}
}

tsp := &tailSamplingSpanProcessor{
ctx: ctx,
telemetry: telemetry,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
logger: settings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
ctx: ctx,
telemetry: telemetry,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
nonSampledIDCache: nonSampledDecisions,
logger: settings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
}
tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}

@@ -182,6 +192,13 @@ func withSampledDecisionCache(c cache.Cache[bool]) Option {
}
}

// withNonSampledDecisionCache sets the cache which the processor uses to store recently non-sampled trace IDs.
func withNonSampledDecisionCache(c cache.Cache[bool]) Option {
return func(tsp *tailSamplingSpanProcessor) {
tsp.nonSampledIDCache = c
}
}

func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case Composite:
@@ -365,7 +382,14 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
traceTd := ptrace.NewTraces()
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans)))
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.
Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "sample")))
Member

Interestingly, metric.WithAttributes performs very poorly, so it should be avoided on the hot path. I typically have a static var for that, and refer to it later.

Using metric.WithAttributeSet should not cause any allocations on the hot path.

successAttr: attribute.NewSet(ea, attribute.Bool("success", true)),
continue
}
// If the trace ID is in the non-sampled cache, short circuit the decision
if _, ok := tsp.nonSampledIDCache.Get(id); ok {
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.
Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "drop")))
continue
}
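
A minimal sketch of the optimization the reviewer suggests above: pre-compute the attribute sets once and pass them with `metric.WithAttributeSet`, avoiding the per-call allocations of `metric.WithAttributes` on the hot path (variable names here are assumptions):

```go
import (
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// Hypothetical pre-computed measurement options, built once at construction
// time rather than on every Add call.
var (
	sampleDecisionOpt = metric.WithAttributeSet(
		attribute.NewSet(attribute.String("decision", "sample")))
	dropDecisionOpt = metric.WithAttributeSet(
		attribute.NewSet(attribute.String("decision", "drop")))
)

// The hot path would then read:
//   tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.
//       Add(tsp.ctx, int64(len(spans)), dropDecisionOpt)
```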

@@ -458,6 +482,9 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio
tsp.idToTrace.Delete(traceID)
// Subtract one from numTracesOnMap per https://godoc.org/sync/atomic#AddUint64
tsp.numTracesOnMap.Add(^uint64(0))
if trace.FinalDecision != sampling.Sampled {
tsp.nonSampledIDCache.Put(traceID, true)
Member

I think this should be placed close to line 450, where the NotSampled decision is made. I think this code here will not apply to all NotSampled decisions, especially if the idToTrace cache is full.

}
}
if trace == nil {
tsp.logger.Debug("Attempt to delete traceID not on table")
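A hedged sketch of the relocation the reviewer proposes: record the "drop" decision at the point the final NotSampled decision is made, rather than only when dropTrace evicts the trace (surrounding names are assumed from context):

```go
// Hypothetical placement in the decision path (names assumed):
if finalDecision == sampling.NotSampled {
	// Recording the drop here covers every NotSampled decision, including
	// traces that are never evicted through dropTrace (e.g. when the
	// idToTrace map is already full).
	tsp.nonSampledIDCache.Put(id, true)
}
```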
74 changes: 74 additions & 0 deletions processor/tailsamplingprocessor/processor_decisions_test.go
@@ -400,3 +400,77 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
require.EqualValues(t, 1, mpe.EvaluationCount)
require.EqualValues(t, 2, nextConsumer.SpanCount(), "original final decision not honored")
}

func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe := &mockPolicyEvaluator{}
policies := []*policy{
{name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

// We are going to create 2 spans belonging to the same trace
traceID := uInt64ToTraceID(1)

// The first span will NOT be sampled; the policy will later be set to sampled, but the original decision will already be cached
mpe.NextDecision = sampling.NotSampled

// A function that returns a ptrace.Traces containing a single span for the single trace we are using.
spanIndexToTraces := func(spanIndex uint64) ptrace.Traces {
traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetTraceID(traceID)
span.SetSpanID(uInt64ToSpanID(spanIndex))
return traces
}

// Generate and deliver first span
require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(1)))

tsp := p.(*tailSamplingSpanProcessor)

// The first tick won't do anything
tsp.policyTicker.OnTick()
require.EqualValues(t, 0, mpe.EvaluationCount)

// This will cause policy evaluations on the first span
tsp.policyTicker.OnTick()

// Policy should have been evaluated once
require.EqualValues(t, 1, mpe.EvaluationCount)

// The final decision SHOULD be NOT Sampled.
require.EqualValues(t, 0, nextConsumer.SpanCount())

// Drop the trace to force cache to make decision
tsp.dropTrace(traceID, time.Now())
_, ok := tsp.idToTrace.Load(traceID)
require.False(t, ok)

// Set next decision to sampled, ensuring the next decision is determined by the decision cache, not the policy
mpe.NextDecision = sampling.Sampled

// Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span.
// The policies should NOT be evaluated again.
require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2)))
require.EqualValues(t, 1, mpe.EvaluationCount)
require.EqualValues(t, 0, nextConsumer.SpanCount(), "original final decision not honored")
}
@@ -3,7 +3,8 @@ tail_sampling:
num_traces: 100
expected_new_traces_per_sec: 10
decision_cache:
sampled_cache_size: 500
sampled_cache_size: 1000
non_sampled_cache_size: 10000
policies:
[
{