From dbe870e8abcc08f6daed22443903e9cb90b3e698 Mon Sep 17 00:00:00 2001 From: James Moessis Date: Tue, 29 Oct 2024 13:10:04 +1100 Subject: [PATCH] address juraci comments --- processor/tailsamplingprocessor/config_test.go | 2 +- processor/tailsamplingprocessor/processor.go | 9 ++++----- .../tailsamplingprocessor/processor_decisions_test.go | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index 692e66ee0910..7f88e87567db 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) { DecisionWait: 10 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 10, - DecisionCache: DecisionCacheConfig{SampledCacheSize: 1000, NonSampledCacheSize: 10000}, + DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000}, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index c1f3ce1b5fe8..02883fbc4778 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -388,14 +388,15 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc traceTd := ptrace.NewTraces() appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) + metric.WithAttributeSet(attribute.NewSet()) tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. - Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "sample"))) + Add(tsp.ctx, int64(len(spans)), attrSampledTrue) continue } // If the trace ID is in the non-sampled cache, short circuit the decision if _, ok := tsp.nonSampledIDCache.Get(id); ok { tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision. - Add(tsp.ctx, int64(len(spans)), metric.WithAttributes(attribute.String("decision", "drop"))) + Add(tsp.ctx, int64(len(spans)), attrSampledFalse) continue } @@ -453,6 +454,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) case sampling.NotSampled: + tsp.nonSampledIDCache.Put(id, true) tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second)) default: tsp.logger.Warn("Encountered unexpected sampling decision", @@ -488,9 +490,6 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio tsp.idToTrace.Delete(traceID) // Subtract one from numTracesOnMap per https://godoc.org/sync/atomic#AddUint64 tsp.numTracesOnMap.Add(^uint64(0)) - if trace.FinalDecision != sampling.Sampled { - tsp.nonSampledIDCache.Put(traceID, true) - } } if trace == nil { tsp.logger.Debug("Attempt to delete traceID not on table") diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 79477435ca83..293b3bf78e55 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -408,7 +408,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { } nextConsumer := new(consumertest.TracesSink) s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings + ct := s.NewSettings() idb := newSyncIDBatcher() mpe := &mockPolicyEvaluator{}