Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMLII-2058 Use correct buffer size for unix sockets #256

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/com/timgroup/statsd/ClientChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@

interface ClientChannel extends WritableByteChannel {
String getTransportType();

int getMaxPacketSizeBytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public String getTransportType() {
public String toString() {
return "[" + getTransportType() + "] " + address;
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public String getTransportType() {
public String toString() {
return pipe;
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
}
}
12 changes: 9 additions & 3 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
protected final StatsDSender statsDSender;
protected StatsDSender telemetryStatsDSender;
protected final Telemetry telemetry;

private final int maxPacketSizeBytes;
private final boolean blocking;

/**
Expand Down Expand Up @@ -268,6 +268,8 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
}

this.blocking = blocking;
this.maxPacketSizeBytes = maxPacketSizeBytes;

{
List<String> costantPreTags = new ArrayList<>();
if (constantTags != null) {
Expand Down Expand Up @@ -300,7 +302,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value

ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();

statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
statsDProcessor = createProcessor(queueSize, handler, getPacketSize(clientChannel), poolSize,
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID);

Properties properties = new Properties();
Expand All @@ -318,7 +320,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize);

// similar settings, but a single worker and non-blocking.
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel),
poolSize, 1, false, 0, aggregationShards, threadFactory, containerID);
}

Expand Down Expand Up @@ -1340,4 +1342,8 @@ private String getContainerID(String containerID, boolean originDetectionEnabled

return null;
}

private int getPacketSize(ClientChannel chan) {
return maxPacketSizeBytes > 0 ? maxPacketSizeBytes : chan.getMaxPacketSizeBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,8 @@ protected NonBlockingStatsDClientBuilder resolve() {
throw new UnsupportedOperationException("clone");
}

int packetSize = maxPacketSizeBytes;
Callable<SocketAddress> lookup = getAddressLookup();

if (packetSize == 0) {
packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
}

Callable<SocketAddress> telemetryLookup = telemetryAddressLookup;
if (telemetryLookup == null) {
if (telemetryHostname == null) {
Expand All @@ -253,7 +247,6 @@ protected NonBlockingStatsDClientBuilder resolve() {
}
}

resolved.maxPacketSizeBytes = packetSize;
resolved.addressLookup = lookup;
resolved.telemetryAddressLookup = telemetryLookup;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ class UnixDatagramClientChannel extends DatagramClientChannel {
public String getTransportType() {
return "uds";
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,9 @@ public String getTransportType() {
public String toString() {
return "[" + getTransportType() + "] " + address;
}

@Override
public int getMaxPacketSizeBytes() {
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
}
}
23 changes: 23 additions & 0 deletions src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,27 @@ public void resist_dsd_timeout() throws Exception {
assertThat(server.messagesReceived(), hasItem("my.prefix.mycount:30|g"));
server.clear();
}

@Test(timeout = 5000L)
public void stream_uds_has_uds_buffer_size() throws Exception {
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
.prefix("my.prefix")
.address("unixstream:///foo")
.containerID("fake-container-id")
.build();

assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
}

@Test(timeout = 5000L)
public void max_packet_size_override() throws Exception {
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
.prefix("my.prefix")
.address("unixstream:///foo")
.containerID("fake-container-id")
.maxPacketSizeBytes(576)
.build();

assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), 576);
}
}