diff --git a/src/main/java/com/timgroup/statsd/ClientChannel.java b/src/main/java/com/timgroup/statsd/ClientChannel.java index 5eaa6d2..684dfc8 100644 --- a/src/main/java/com/timgroup/statsd/ClientChannel.java +++ b/src/main/java/com/timgroup/statsd/ClientChannel.java @@ -4,4 +4,6 @@ interface ClientChannel extends WritableByteChannel { String getTransportType(); + + int getMaxPacketSizeBytes(); } diff --git a/src/main/java/com/timgroup/statsd/DatagramClientChannel.java b/src/main/java/com/timgroup/statsd/DatagramClientChannel.java index 00d383d..c0890d8 100644 --- a/src/main/java/com/timgroup/statsd/DatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/DatagramClientChannel.java @@ -52,4 +52,9 @@ public String getTransportType() { public String toString() { return "[" + getTransportType() + "] " + address; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java b/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java index 61e2ec7..2dbcf83 100644 --- a/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java +++ b/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java @@ -48,4 +48,9 @@ public String getTransportType() { public String toString() { return pipe; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index e946ac2..255dc2a 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -174,7 +174,7 @@ protected static String format(ThreadLocal formatter, Number value protected final StatsDSender statsDSender; protected StatsDSender telemetryStatsDSender; protected final Telemetry telemetry; - + private final int maxPacketSizeBytes; private final boolean blocking; /** @@ -268,6 +268,8 @@ protected static String format(ThreadLocal formatter, Number value } this.blocking = blocking; + this.maxPacketSizeBytes = maxPacketSizeBytes; + { List costantPreTags = new ArrayList<>(); if (constantTags != null) { @@ -300,7 +302,7 @@ protected static String format(ThreadLocal formatter, Number value ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory(); - statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, + statsDProcessor = createProcessor(queueSize, handler, getPacketSize(clientChannel), poolSize, processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID); Properties properties = new Properties(); @@ -318,7 +320,7 @@ protected static String format(ThreadLocal formatter, Number value telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize); // similar settings, but a single worker and non-blocking. - telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, + telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel), poolSize, 1, false, 0, aggregationShards, threadFactory, containerID); } @@ -1340,4 +1342,8 @@ private String getContainerID(String containerID, boolean originDetectionEnabled return null; } + + private int getPacketSize(ClientChannel chan) { + return maxPacketSizeBytes > 0 ? maxPacketSizeBytes : chan.getMaxPacketSizeBytes(); + } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 080bcc7..2ceedcc 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -236,14 +236,8 @@ protected NonBlockingStatsDClientBuilder resolve() { throw new UnsupportedOperationException("clone"); } - int packetSize = maxPacketSizeBytes; Callable lookup = getAddressLookup(); - if (packetSize == 0) { - packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : - NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; - } - Callable telemetryLookup = telemetryAddressLookup; if (telemetryLookup == null) { if (telemetryHostname == null) { @@ -253,7 +247,6 @@ protected NonBlockingStatsDClientBuilder resolve() { } } - resolved.maxPacketSizeBytes = packetSize; resolved.addressLookup = lookup; resolved.telemetryAddressLookup = telemetryLookup; diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 85bfb14..7d99696 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -31,4 +31,9 @@ class UnixDatagramClientChannel extends DatagramClientChannel { public String getTransportType() { return "uds"; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index fa8019a..d910d78 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -175,4 +175,9 @@ public String getTransportType() { public String toString() { return "[" + getTransportType() + "] " + address; } + + @Override + public int getMaxPacketSizeBytes() { + return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; + } } diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java index 45af036..7b72132 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java @@ -171,4 +171,27 @@ public void resist_dsd_timeout() throws Exception { assertThat(server.messagesReceived(), hasItem("my.prefix.mycount:30|g")); server.clear(); } + + @Test(timeout = 5000L) + public void stream_uds_has_uds_buffer_size() throws Exception { + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .address("unixstream:///foo") + .containerID("fake-container-id") + .build(); + + assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES); + } + + @Test(timeout = 5000L) + public void max_packet_size_override() throws Exception { + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .address("unixstream:///foo") + .containerID("fake-container-id") + .maxPacketSizeBytes(576) + .build(); + + assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), 576); + } }