diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index e6dc2a18e05..2baa8943de0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -6,6 +6,7 @@ import datadog.communication.serialization.Writable; import datadog.communication.serialization.WritableFormatter; import datadog.communication.serialization.msgpack.MsgPackWriter; +import datadog.trace.api.Config; import datadog.trace.api.WellKnownTags; import datadog.trace.common.metrics.Sink; import java.util.Collection; @@ -31,6 +32,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] PARENT_HASH = "ParentHash".getBytes(ISO_8859_1); private static final byte[] BACKLOG_VALUE = "Value".getBytes(ISO_8859_1); private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1); + private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1); private static final int INITIAL_CAPACITY = 512 * 1024; @@ -55,9 +57,30 @@ public void reset() { buffer.reset(); } + // extend the list as needed + private static final int APM_PRODUCT = 1; // 00000001 + private static final int DSM_PRODUCT = 2; // 00000010 + private static final int DJM_PRODUCT = 4; // 00000100 + private static final int PROFILING_PRODUCT = 8; // 00001000 + + public long getProductsMask() { + long productsMask = APM_PRODUCT; + if (Config.get().isDataStreamsEnabled()) { + productsMask |= DSM_PRODUCT; + } + if (Config.get().isDataJobsEnabled()) { + productsMask |= DJM_PRODUCT; + } + if (Config.get().isProfilingEnabled()) { + productsMask |= PROFILING_PRODUCT; + } + + return productsMask; + } + @Override public void writePayload(Collection data, String serviceNameOverride) { - writer.startMap(7); + writer.startMap(8); /* 1 */ writer.writeUTF8(ENV); writer.writeUTF8(wellKnownTags.getEnv()); @@ -112,6 +135,10 @@ public void writePayload(Collection data, String serviceNameOverrid } } + /* 8 */ + writer.writeUTF8(PRODUCTS_MASK); + writer.writeLong(getProductsMask()); + buffer.mark(); sink.accept(buffer.messageCount(), buffer.slice()); buffer.reset(); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index 5f01afeafa7..6d163d0ac79 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -96,7 +96,7 @@ class DataStreamsWritingTest extends DDCoreSpecification { BufferedSource bufferedSource = Okio.buffer(gzipSource) MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream()) - assert unpacker.unpackMapHeader() == 7 + assert unpacker.unpackMapHeader() == 8 assert unpacker.unpackString() == "Env" assert unpacker.unpackString() == "test" assert unpacker.unpackString() == "Service" @@ -161,7 +161,7 @@ class DataStreamsWritingTest extends DDCoreSpecification { BufferedSource bufferedSource = Okio.buffer(gzipSource) MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream()) - assert unpacker.unpackMapHeader() == 7 + assert unpacker.unpackMapHeader() == 8 assert unpacker.unpackString() == "Env" assert unpacker.unpackString() == "test" assert unpacker.unpackString() == "Service" @@ -262,6 +262,9 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == (hash == 1 ? "topic:testTopic" : "topic:testTopic2") } + assert unpacker.unpackString() == "ProductMask" + assert unpacker.unpackLong() == 1 + return true } }