Skip to content

Commit

Permalink
AMLII-2058 Use correct buffer size for unix sockets
Browse files Browse the repository at this point in the history
When the client is configured to use UDS via builder's address()
method, the client defaulted to UDP-sized buffer.

This patch adds a method to the ClientChannel to fetch the default
packet size for each transport type, and uses it in the client to
pass the correct value to the processor. The maxPacketSizeBytes field
of the resolved builder is reduced to only storing the user preferred
configuration, if any.
  • Loading branch information
vickenty committed Oct 2, 2024
1 parent cdf99d3 commit 00cbb2a
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/timgroup/statsd/ClientChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@

interface ClientChannel extends WritableByteChannel {
String getTransportType();

int getMaxPacketSizeBytes();
}
5 changes: 5 additions & 0 deletions src/main/java/com/timgroup/statsd/DatagramClientChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public String getTransportType() {
public String toString() {
return "[" + getTransportType() + "] " + address;
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public String getTransportType() {
public String toString() {
return pipe;
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
}
}
12 changes: 9 additions & 3 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
protected final StatsDSender statsDSender;
protected StatsDSender telemetryStatsDSender;
protected final Telemetry telemetry;

private final int maxPacketSizeBytes;
private final boolean blocking;

/**
Expand Down Expand Up @@ -268,6 +268,8 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
}

this.blocking = blocking;
this.maxPacketSizeBytes = maxPacketSizeBytes;

{
List<String> costantPreTags = new ArrayList<>();
if (constantTags != null) {
Expand Down Expand Up @@ -300,7 +302,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value

ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();

statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
statsDProcessor = createProcessor(queueSize, handler, getPacketSize(clientChannel), poolSize,
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID);

Properties properties = new Properties();
Expand All @@ -318,7 +320,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize);

// similar settings, but a single worker and non-blocking.
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel),
poolSize, 1, false, 0, aggregationShards, threadFactory, containerID);
}

Expand Down Expand Up @@ -1340,4 +1342,8 @@ private String getContainerID(String containerID, boolean originDetectionEnabled

return null;
}

private int getPacketSize(ClientChannel chan) {
return maxPacketSizeBytes > 0 ? maxPacketSizeBytes : chan.getMaxPacketSizeBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,8 @@ protected NonBlockingStatsDClientBuilder resolve() {
throw new UnsupportedOperationException("clone");
}

int packetSize = maxPacketSizeBytes;
Callable<SocketAddress> lookup = getAddressLookup();

if (packetSize == 0) {
packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
}

Callable<SocketAddress> telemetryLookup = telemetryAddressLookup;
if (telemetryLookup == null) {
if (telemetryHostname == null) {
Expand All @@ -253,7 +247,6 @@ protected NonBlockingStatsDClientBuilder resolve() {
}
}

resolved.maxPacketSizeBytes = packetSize;
resolved.addressLookup = lookup;
resolved.telemetryAddressLookup = telemetryLookup;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ class UnixDatagramClientChannel extends DatagramClientChannel {
public String getTransportType() {
return "uds";
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,9 @@ public String getTransportType() {
public String toString() {
return "[" + getTransportType() + "] " + address;
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
}
}
23 changes: 23 additions & 0 deletions src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,27 @@ public void resist_dsd_timeout() throws Exception {
assertThat(server.messagesReceived(), hasItem("my.prefix.mycount:30|g"));
server.clear();
}

@Test(timeout = 5000L)
public void stream_uds_has_uds_buffer_size() throws Exception {
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
.prefix("my.prefix")
.address("unixstream:///foo")
.containerID("fake-container-id")
.build();

assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
}

@Test(timeout = 5000L)
public void max_packet_size_override() throws Exception {
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
.prefix("my.prefix")
.address("unixstream:///foo")
.containerID("fake-container-id")
.maxPacketSizeBytes(576)
.build();

assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), 576);
}
}

0 comments on commit 00cbb2a

Please sign in to comment.