From 00cbb2aa2cf71b9c2a0ce2c5535e9754ea124157 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Wed, 2 Oct 2024 13:40:12 +0200 Subject: [PATCH] AMLII-2058 Use correct buffer size for unix sockets When the client is configured to use UDS via builder's address() method, the client defaulted to UDP-sized buffer. This patch adds a method to the ClientChannel to fetch the default packet size for each transport type, and uses it in the client to pass the correct value to the processor. The maxPacketSizeBytes field of the resolved builder is reduced to only storing the user preferred configuration, if any. --- .../com/timgroup/statsd/ClientChannel.java | 2 ++ .../statsd/DatagramClientChannel.java | 5 ++++ .../statsd/NamedPipeClientChannel.java | 5 ++++ .../statsd/NonBlockingStatsDClient.java | 12 +++++++--- .../NonBlockingStatsDClientBuilder.java | 7 ------ .../statsd/UnixDatagramClientChannel.java | 5 ++++ .../statsd/UnixStreamClientChannel.java | 5 ++++ .../timgroup/statsd/UnixStreamSocketTest.java | 23 +++++++++++++++++++ 8 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/ClientChannel.java b/src/main/java/com/timgroup/statsd/ClientChannel.java index 5eaa6d23..684dfc88 100644 --- a/src/main/java/com/timgroup/statsd/ClientChannel.java +++ b/src/main/java/com/timgroup/statsd/ClientChannel.java @@ -4,4 +4,6 @@ interface ClientChannel extends WritableByteChannel { String getTransportType(); + + int getMaxPacketSizeBytes(); } diff --git a/src/main/java/com/timgroup/statsd/DatagramClientChannel.java b/src/main/java/com/timgroup/statsd/DatagramClientChannel.java index 00d383dc..c0890d87 100644 --- a/src/main/java/com/timgroup/statsd/DatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/DatagramClientChannel.java @@ -52,4 +52,9 @@ public String getTransportType() { public String toString() { return "[" + getTransportType() + "] " + address; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java b/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java index 61e2ec7e..2dbcf832 100644 --- a/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java +++ b/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java @@ -48,4 +48,9 @@ public String getTransportType() { public String toString() { return pipe; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index e946ac26..255dc2ab 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -174,7 +174,7 @@ protected static String format(ThreadLocal formatter, Number value protected final StatsDSender statsDSender; protected StatsDSender telemetryStatsDSender; protected final Telemetry telemetry; - + private final int maxPacketSizeBytes; private final boolean blocking; /** @@ -268,6 +268,8 @@ protected static String format(ThreadLocal formatter, Number value } this.blocking = blocking; + this.maxPacketSizeBytes = maxPacketSizeBytes; + { List costantPreTags = new ArrayList<>(); if (constantTags != null) { @@ -300,7 +302,7 @@ protected static String format(ThreadLocal formatter, Number value ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory(); - statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, + statsDProcessor = createProcessor(queueSize, handler, getPacketSize(clientChannel), poolSize, processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID); Properties properties = new Properties(); @@ -318,7 +320,7 @@ protected static String format(ThreadLocal formatter, Number value telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize); // similar settings, but a single worker and non-blocking. - telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, + telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel), poolSize, 1, false, 0, aggregationShards, threadFactory, containerID); } @@ -1340,4 +1342,8 @@ private String getContainerID(String containerID, boolean originDetectionEnabled return null; } + + private int getPacketSize(ClientChannel chan) { + return maxPacketSizeBytes > 0 ? maxPacketSizeBytes : chan.getMaxPacketSizeBytes(); + } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 080bcc7f..2ceedccd 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -236,14 +236,8 @@ protected NonBlockingStatsDClientBuilder resolve() { throw new UnsupportedOperationException("clone"); } - int packetSize = maxPacketSizeBytes; Callable lookup = getAddressLookup(); - if (packetSize == 0) { - packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : - NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; - } - Callable telemetryLookup = telemetryAddressLookup; if (telemetryLookup == null) { if (telemetryHostname == null) { @@ -253,7 +247,6 @@ protected NonBlockingStatsDClientBuilder resolve() { } } - resolved.maxPacketSizeBytes = packetSize; resolved.addressLookup = lookup; resolved.telemetryAddressLookup = telemetryLookup; diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 85bfb144..7d996963 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -31,4 +31,9 @@ class UnixDatagramClientChannel extends DatagramClientChannel { public String getTransportType() { return "uds"; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index fa8019a1..d910d786 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -175,4 +175,9 @@ public String getTransportType() { public String toString() { return "[" + getTransportType() + "] " + address; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java index 45af036c..7b721323 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java @@ -171,4 +171,27 @@ public void resist_dsd_timeout() throws Exception { assertThat(server.messagesReceived(), hasItem("my.prefix.mycount:30|g")); server.clear(); } + + @Test(timeout = 5000L) + public void stream_uds_has_uds_buffer_size() throws Exception { + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .address("unixstream:///foo") + .containerID("fake-container-id") + .build(); + + assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES); + } + + @Test(timeout = 5000L) + public void max_packet_size_override() throws Exception { + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .address("unixstream:///foo") + .containerID("fake-container-id") + .maxPacketSizeBytes(576) + .build(); + + assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), 576); + } }