Skip to content

Commit

Permalink
New tag for DSM checkpoints - a bitmask representing enabled products…
Browse files Browse the repository at this point in the history
… / features (#8051)

Tag dsm checkpoints with product mask
  • Loading branch information
kr-igor authored Dec 6, 2024
1 parent 74967dd commit 0db6312
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.communication.serialization.Writable;
import datadog.communication.serialization.WritableFormatter;
import datadog.communication.serialization.msgpack.MsgPackWriter;
import datadog.trace.api.Config;
import datadog.trace.api.WellKnownTags;
import datadog.trace.common.metrics.Sink;
import java.util.Collection;
Expand All @@ -31,6 +32,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
private static final byte[] PARENT_HASH = "ParentHash".getBytes(ISO_8859_1);
private static final byte[] BACKLOG_VALUE = "Value".getBytes(ISO_8859_1);
private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1);
private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1);

private static final int INITIAL_CAPACITY = 512 * 1024;

Expand All @@ -55,9 +57,30 @@ public void reset() {
buffer.reset();
}

// extend the list as needed
private static final int APM_PRODUCT = 1; // 00000001
private static final int DSM_PRODUCT = 2; // 00000010
private static final int DJM_PRODUCT = 4; // 00000100
private static final int PROFILING_PRODUCT = 8; // 00001000

public long getProductsMask() {
long productsMask = APM_PRODUCT;
if (Config.get().isDataStreamsEnabled()) {
productsMask |= DSM_PRODUCT;
}
if (Config.get().isDataJobsEnabled()) {
productsMask |= DJM_PRODUCT;
}
if (Config.get().isProfilingEnabled()) {
productsMask |= PROFILING_PRODUCT;
}

return productsMask;
}

@Override
public void writePayload(Collection<StatsBucket> data, String serviceNameOverride) {
writer.startMap(7);
writer.startMap(8);
/* 1 */
writer.writeUTF8(ENV);
writer.writeUTF8(wellKnownTags.getEnv());
Expand Down Expand Up @@ -112,6 +135,10 @@ public void writePayload(Collection<StatsBucket> data, String serviceNameOverrid
}
}

/* 8 */
writer.writeUTF8(PRODUCTS_MASK);
writer.writeLong(getProductsMask());

buffer.mark();
sink.accept(buffer.messageCount(), buffer.slice());
buffer.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DataStreamsWritingTest extends DDCoreSpecification {
BufferedSource bufferedSource = Okio.buffer(gzipSource)
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream())

assert unpacker.unpackMapHeader() == 7
assert unpacker.unpackMapHeader() == 8
assert unpacker.unpackString() == "Env"
assert unpacker.unpackString() == "test"
assert unpacker.unpackString() == "Service"
Expand Down Expand Up @@ -161,7 +161,7 @@ class DataStreamsWritingTest extends DDCoreSpecification {
BufferedSource bufferedSource = Okio.buffer(gzipSource)
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream())

assert unpacker.unpackMapHeader() == 7
assert unpacker.unpackMapHeader() == 8
assert unpacker.unpackString() == "Env"
assert unpacker.unpackString() == "test"
assert unpacker.unpackString() == "Service"
Expand Down Expand Up @@ -262,6 +262,9 @@ class DataStreamsWritingTest extends DDCoreSpecification {
assert unpacker.unpackString() == (hash == 1 ? "topic:testTopic" : "topic:testTopic2")
}

assert unpacker.unpackString() == "ProductMask"
assert unpacker.unpackLong() == 1

return true
}
}
Expand Down

0 comments on commit 0db6312

Please sign in to comment.