Add a built-in trace interceptor for keeping traces depending on their latency #8040

Merged · 8 commits · Dec 5, 2024
@@ -69,6 +69,8 @@ public final class ConfigDefaults {
static final boolean DEFAULT_DB_CLIENT_HOST_SPLIT_BY_INSTANCE_TYPE_SUFFIX = false;
static final boolean DEFAULT_DB_CLIENT_HOST_SPLIT_BY_HOST = false;
static final String DEFAULT_DB_DBM_PROPAGATION_MODE_MODE = "disabled";
  // Defaults to 0, which disables the latency trace interceptor
static final int DEFAULT_TRACE_KEEP_LATENCY_THRESHOLD_MS = 0;
static final int DEFAULT_SCOPE_DEPTH_LIMIT = 100;
static final int DEFAULT_SCOPE_ITERATION_KEEP_ALIVE = 30; // in seconds
static final int DEFAULT_PARTIAL_FLUSH_MIN_SPANS = 1000;
@@ -73,7 +73,8 @@ public final class TracerConfig {
public static final String TRACE_HTTP_CLIENT_ERROR_STATUSES = "trace.http.client.error.statuses";

public static final String SPLIT_BY_TAGS = "trace.split-by-tags";

  // latency threshold for the trace latency interceptor, in milliseconds
public static final String TRACE_KEEP_LATENCY_THRESHOLD_MS = "trace.experimental.keep.latency.threshold.ms";
public static final String SCOPE_DEPTH_LIMIT = "trace.scope.depth.limit";
public static final String SCOPE_STRICT_MODE = "trace.scope.strict.mode";
public static final String SCOPE_ITERATION_KEEP_ALIVE = "trace.scope.iteration.keep.alive";
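For context, a hedged sketch of how this key would be supplied: dd-trace-java settings named `trace.*` are conventionally passed as `dd.trace.*` system properties (or the corresponding `DD_*` environment variables). Setting the property programmatically only works if it happens before the tracer's static configuration is loaded, so the JVM flag is the more reliable route.

```java
public class EnableLatencyKeep {
  public static void main(String[] args) {
    // Equivalent to passing -Ddd.trace.experimental.keep.latency.threshold.ms=500
    // on the JVM command line; must run before the tracer reads its Config.
    System.setProperty("dd.trace.experimental.keep.latency.threshold.ms", "500");
    // Partial flush must also be disabled, otherwise the interceptor stays off.
    System.setProperty("dd.trace.partial.flush.enabled", "false");
  }
}
```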
@@ -22,6 +22,9 @@ public enum Priority {
DD_INTAKE(2),
GIT_METADATA(3),

// trace custom sampling
ROOT_SPAN_LATENCY(Integer.MAX_VALUE - 2),

// trace data collection
CI_VISIBILITY_TELEMETRY(Integer.MAX_VALUE - 1),
SERVICE_NAME_COLLECTING(Integer.MAX_VALUE);
@@ -88,6 +88,7 @@
import datadog.trace.core.scopemanager.ContinuableScopeManager;
import datadog.trace.core.taginterceptor.RuleFlags;
import datadog.trace.core.taginterceptor.TagInterceptor;
import datadog.trace.core.traceinterceptor.LatencyTraceInterceptor;
import datadog.trace.lambda.LambdaHandler;
import datadog.trace.relocate.api.RatelimitedLogger;
import datadog.trace.util.AgentTaskScheduler;
@@ -745,6 +746,10 @@ private CoreTracer(
addTraceInterceptor(GitMetadataTraceInterceptor.INSTANCE);
}

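    // isTraceKeepLatencyThresholdEnabled() is true only when the configured
    // threshold is positive and partial flush is disabled (see Config below)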
if (config.isTraceKeepLatencyThresholdEnabled()) {
addTraceInterceptor(LatencyTraceInterceptor.INSTANCE);
}

this.instrumentationGateway = instrumentationGateway;
callbackProviderAppSec = instrumentationGateway.getCallbackProvider(RequestContextSlot.APPSEC);
callbackProviderIast = instrumentationGateway.getCallbackProvider(RequestContextSlot.IAST);
@@ -0,0 +1,45 @@
package datadog.trace.core.traceinterceptor;

import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.AbstractTraceInterceptor;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.api.interceptor.TraceInterceptor;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This trace latency interceptor is disabled by default. It can be activated by setting
 * dd.trace.experimental.keep.latency.threshold.ms to a positive value. The value is expressed in
 * milliseconds, and the interceptor retains any local trace whose root span duration is greater
 * than this value. The interceptor is not activated if partial flush is enabled, in order to
 * avoid incomplete local traces (incomplete trace chunks). Note that because the sampling
 * priority is changed at the end of the local trace, complete traces are not guaranteed: the
 * original sampling priority for the trace may already have been propagated.
 */
public class LatencyTraceInterceptor extends AbstractTraceInterceptor {
private static final Logger log = LoggerFactory.getLogger(LatencyTraceInterceptor.class);
  // the threshold is configured in milliseconds and must be converted to nanoseconds
private static final long LATENCY = Config.get().getTraceKeepLatencyThreshold() * 1000000L;

public static final TraceInterceptor INSTANCE =
new LatencyTraceInterceptor(Priority.ROOT_SPAN_LATENCY);

protected LatencyTraceInterceptor(Priority priority) {
super(priority);
}

@Override
public Collection<? extends MutableSpan> onTraceComplete(
Collection<? extends MutableSpan> latencyTrace) {
if (latencyTrace.isEmpty()) {
return latencyTrace;
}
MutableSpan rootSpan = latencyTrace.iterator().next().getLocalRootSpan();
if (rootSpan != null && rootSpan.getDurationNano() > LATENCY) {
rootSpan.setTag(DDTags.MANUAL_KEEP, true);
}
return latencyTrace;
}
}
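As a usage sketch (the service and method names here are hypothetical, and dd-java-agent must be attached for the @Trace annotation to take effect): with the threshold set to, say, 500 ms and partial flush disabled, a local trace whose root span runs longer than 500 ms is tagged manual.keep and retained regardless of the sampler's original decision.

```java
import datadog.trace.api.Trace;

public class CheckoutService {
  // Hypothetical application code. With no enclosing trace active,
  // this method becomes the local root span.
  @Trace(operationName = "checkout.process")
  public void process() throws InterruptedException {
    Thread.sleep(600); // exceeds a 500 ms threshold, so the interceptor
    // sets manual.keep on the root span when the local trace completes
  }
}
```

As the Javadoc above notes, this is best-effort: if the original sampling priority was already propagated to downstream services, those services may still drop their portions of the trace.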
@@ -0,0 +1,48 @@
package datadog.trace.core.traceinterceptor

import datadog.trace.api.DDTags
import datadog.trace.common.writer.ListWriter

import datadog.trace.core.test.DDCoreSpecification

import spock.lang.Timeout

@Timeout(10)
class LatencyTraceInterceptorTest extends DDCoreSpecification {
def "test set sampling priority according to latency"() {
setup:

injectSysConfig("trace.partial.flush.enabled", partialFlushEnabled)
injectSysConfig("trace.experimental.keep.latency.threshold.ms", latencyThreshold)

when:
def writer = new ListWriter()
def tracer = tracerBuilder().writer(writer).build()

def spanSetup = tracer.buildSpan("test","my_operation_name").withTag(priorityTag, true).start()
sleep(minDuration)
spanSetup.finish()

then:
def trace = writer.firstTrace()
trace.size() == 1
def span = trace[0]
span.context().getSamplingPriority() == expected

cleanup:
tracer.close()

where:
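    // expected sampling priorities: 2 == USER_KEEP, -1 == USER_DROP. With partial
    // flush enabled the interceptor is inactive, so the manual tag always wins;
    // with it disabled, a root span over the threshold is forced to USER_KEEP,
    // even over a manual.drop tag (last row)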
partialFlushEnabled | latencyThreshold | priorityTag | minDuration | expected
"true" | "200" | DDTags.MANUAL_KEEP | 10 | 2
"true" | "200" | DDTags.MANUAL_DROP | 10 | -1
"true" | "200" | DDTags.MANUAL_KEEP | 300 | 2
"true" | "200" | DDTags.MANUAL_DROP | 300 | -1
"false" | "200" | DDTags.MANUAL_KEEP | 10 | 2
"false" | "200" | DDTags.MANUAL_DROP | 10 | -1
"false" | "200" | DDTags.MANUAL_KEEP | 300 | 2
"false" | "200" | DDTags.MANUAL_DROP | 300 | 2
}
}
internal-api/src/main/java/datadog/trace/api/Config.java (20 additions, 0 deletions)
@@ -176,6 +176,8 @@ public static String getHostName() {
private final boolean scopeStrictMode;
private final int scopeIterationKeepAlive;
private final int partialFlushMinSpans;
private final int traceKeepLatencyThreshold;
private final boolean traceKeepLatencyThresholdEnabled;
private final boolean traceStrictWritesEnabled;
private final boolean logExtractHeaderNames;
private final Set<PropagationStyle> propagationStylesToExtract;
@@ -860,6 +862,12 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins
? 0
: configProvider.getInteger(PARTIAL_FLUSH_MIN_SPANS, DEFAULT_PARTIAL_FLUSH_MIN_SPANS);

traceKeepLatencyThreshold =
configProvider.getInteger(
TRACE_KEEP_LATENCY_THRESHOLD_MS, DEFAULT_TRACE_KEEP_LATENCY_THRESHOLD_MS);

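    // Enable the interceptor only when partial flush is disabled: with partial
    // flush on, chunks of a local trace can be written out before the root span
    // finishes, so flipping the priority late could leave incomplete traces.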
traceKeepLatencyThresholdEnabled = !partialFlushEnabled && (traceKeepLatencyThreshold > 0);

traceStrictWritesEnabled = configProvider.getBoolean(TRACE_STRICT_WRITES_ENABLED, false);

logExtractHeaderNames =
@@ -2075,6 +2083,14 @@ public int getPartialFlushMinSpans() {
return partialFlushMinSpans;
}

public int getTraceKeepLatencyThreshold() {
return traceKeepLatencyThreshold;
}

public boolean isTraceKeepLatencyThresholdEnabled() {
return traceKeepLatencyThresholdEnabled;
}

public boolean isTraceStrictWritesEnabled() {
return traceStrictWritesEnabled;
}
@@ -4158,6 +4174,10 @@ public String toString() {
+ scopeIterationKeepAlive
+ ", partialFlushMinSpans="
+ partialFlushMinSpans
+ ", traceKeepLatencyThresholdEnabled="
+ traceKeepLatencyThresholdEnabled
+ ", traceKeepLatencyThreshold="
+ traceKeepLatencyThreshold
+ ", traceStrictWritesEnabled="
+ traceStrictWritesEnabled
+ ", tracePropagationStylesToExtract="