Skip to content

Commit

Permalink
KAFKA-15816: Fix leaked sockets in clients tests (apache#14750)
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Harris <[email protected]>
Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
gharris1727 authored and ex172000 committed Dec 15, 2023
1 parent c285b24 commit c855b8a
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1834,36 +1834,35 @@ public void testCloseShouldBeIdempotent(GroupProtocol groupProtocol) {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testOperationsBySubscribingConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
try {
newConsumer(groupProtocol, null, Optional.of(Boolean.TRUE));
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null, Optional.of(Boolean.TRUE))) {
fail("Expected an InvalidConfigurationException");
} catch (InvalidConfigurationException e) {
// OK, expected
}

try {
newConsumer(groupProtocol, null).subscribe(Collections.singleton(topic));
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.subscribe(Collections.singleton(topic));
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}

try {
newConsumer(groupProtocol, null).committed(Collections.singleton(tp0)).get(tp0);
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}

try {
newConsumer(groupProtocol, null).commitAsync();
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.commitAsync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}

try {
newConsumer(groupProtocol, null).commitSync();
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.commitSync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
Expand Down Expand Up @@ -1896,6 +1895,8 @@ public void testOperationsByAssigningConsumerWithDefaultGroupId(GroupProtocol gr
} catch (InvalidGroupIdException e) {
// OK, expected
}

consumer.close();
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2499,7 +2499,7 @@ void testDeliveryTimeoutAndLingerMsConfig() {
configs.put(ProducerConfig.LINGER_MS_CONFIG, 999);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);

assertDoesNotThrow(() -> new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer()));
assertDoesNotThrow(() -> new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer()).close());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,30 +107,39 @@ public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtoco
int failedAuthenticationDelayMs, Time time, DelegationTokenCache tokenCache) throws Exception {
super("echoserver");
setDaemon(true);
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
this.port = serverSocketChannel.socket().getLocalPort();
this.socketChannels = Collections.synchronizedList(new ArrayList<>());
this.newChannels = Collections.synchronizedList(new ArrayList<>());
this.credentialCache = credentialCache;
this.tokenCache = tokenCache;
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
for (String mechanism : ScramMechanism.mechanismNames()) {
if (credentialCache.cache(mechanism, ScramCredential.class) == null)
credentialCache.createCache(mechanism, ScramCredential.class);
ServerSocketChannel serverSocketChannel = null;
try {
serverSocketChannel = ServerSocketChannel.open();
this.serverSocketChannel = serverSocketChannel;
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
this.port = serverSocketChannel.socket().getLocalPort();
this.socketChannels = Collections.synchronizedList(new ArrayList<>());
this.newChannels = Collections.synchronizedList(new ArrayList<>());
this.credentialCache = credentialCache;
this.tokenCache = tokenCache;
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
for (String mechanism : ScramMechanism.mechanismNames()) {
if (credentialCache.cache(mechanism, ScramCredential.class) == null)
credentialCache.createCache(mechanism, ScramCredential.class);
}
}
LogContext logContext = new LogContext();
if (channelBuilder == null)
channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false,
securityProtocol, config, credentialCache, tokenCache, time, logContext,
() -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
this.metrics = new Metrics();
this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time,
"MetricGroup", channelBuilder, logContext);
acceptorThread = new AcceptorThread();
this.time = time;
} catch (Exception e) {
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
throw e;
}
LogContext logContext = new LogContext();
if (channelBuilder == null)
channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false,
securityProtocol, config, credentialCache, tokenCache, time, logContext,
() -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
this.metrics = new Metrics();
this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time,
"MetricGroup", channelBuilder, logContext);
acceptorThread = new AcceptorThread();
this.time = time;
}

public int port() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ KafkaChannel buildChannel(String id, TransportLayer transportLayer, Supplier<Aut
@Override
public void close() throws IOException {
closedChannelsCount.getAndIncrement();
super.close();
if (index == 0) throw new RuntimeException("you should fail");
else super.close();
}
};
}
Expand Down Expand Up @@ -768,7 +768,8 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception {

SelectionKey selectionKey = mock(SelectionKey.class);
when(kafkaChannel.selectionKey()).thenReturn(selectionKey);
when(selectionKey.channel()).thenReturn(SocketChannel.open());
SocketChannel socket = SocketChannel.open();
when(selectionKey.channel()).thenReturn(socket);
when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_CONNECT);
when(selectionKey.attachment()).thenReturn(kafkaChannel);

Expand All @@ -782,6 +783,7 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception {
verify(kafkaChannel).disconnect();
verify(kafkaChannel).close();
verify(selectionKey).cancel();
socket.close();
}

@Test
Expand Down Expand Up @@ -918,6 +920,7 @@ public void testMetricsCleanupOnSelectorClose() throws Exception {
Selector selector = new ImmediatelyConnectingSelector(CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
@Override
public void close(String id) {
super.close(id);
throw new RuntimeException();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,6 @@ public void testPemFilesWithoutClientKeyPassword(Args args) throws Exception {
TestSslUtils.convertToPem(args.sslServerConfigs, !useInlinePem, true);
TestSslUtils.convertToPem(args.sslClientConfigs, !useInlinePem, false);
args.sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
server = createEchoServer(args, SecurityProtocol.SSL);
verifySslConfigs(args);
}

Expand Down Expand Up @@ -1049,8 +1048,10 @@ false, securityProtocol, config, null, null, time, new LogContext(),
// Verify that client with matching truststore can authenticate, send and receive
String oldNode = "0";
Selector oldClientSelector = createSelector(args.sslClientConfigs);
// take responsibility for closing oldClientSelector, so that we can keep it alive concurrent with the new one.
this.selector = null;
oldClientSelector.connect(oldNode, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, oldNode, 100, 10);
NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);

CertStores newServerCertStores = certBuilder(true, "server", args.useInlinePem).addHostName("localhost").build();
Map<String, Object> newKeystoreConfigs = newServerCertStores.keyStoreProps();
Expand Down Expand Up @@ -1087,6 +1088,8 @@ false, securityProtocol, config, null, null, time, new LogContext(),
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
// manually stop the oldClientSelector because the test harness doesn't manage it.
oldClientSelector.close();
}

@ParameterizedTest
Expand Down Expand Up @@ -1172,8 +1175,10 @@ false, securityProtocol, config, null, null, time, new LogContext(),
// Verify that client with matching keystore can authenticate, send and receive
String oldNode = "0";
Selector oldClientSelector = createSelector(args.sslClientConfigs);
// take responsibility for closing oldClientSelector, so that we can keep it alive concurrent with the new one.
this.selector = null;
oldClientSelector.connect(oldNode, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, oldNode, 100, 10);
NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);

CertStores newClientCertStores = certBuilder(true, "client", args.useInlinePem).addHostName("localhost").build();
args.sslClientConfigs = args.getTrustingConfig(newClientCertStores, args.serverCertStores);
Expand Down Expand Up @@ -1209,6 +1214,8 @@ false, securityProtocol, config, null, null, time, new LogContext(),
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
// manually stop the oldClientSelector because the test harness doesn't manage it.
oldClientSelector.close();
}

/**
Expand Down Expand Up @@ -1267,6 +1274,9 @@ private Selector createSelector(Map<String, Object> sslClientConfigs, final Inte
TestSslChannelBuilder channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
channelBuilder.configureBufferSizes(netReadBufSize, netWriteBufSize, appBufSize);
channelBuilder.configure(sslClientConfigs);
if (this.selector != null) {
this.selector.close();
}
this.selector = new Selector(100 * 5000, new Metrics(), time, "MetricGroup", channelBuilder, new LogContext());
return selector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ private void createSelector(Map<String, Object> sslClientConfigs) {
SslTransportLayerTest.TestSslChannelBuilder channelBuilder = new SslTransportLayerTest.TestSslChannelBuilder(Mode.CLIENT);
channelBuilder.configureBufferSizes(null, null, null);
channelBuilder.configure(sslClientConfigs);
if (this.selector != null) {
this.selector.close();
}
this.selector = new Selector(100 * 5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public void testTlsDefaults(List<String> serverProtocols, List<String> clientPro
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
server.verifyAuthenticationMetrics(0, 1);
}
server.close();
selector.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,7 @@ public void testAuthenticateCallbackHandlerMechanisms() throws Exception {
TestServerCallbackHandler.class);
saslServerConfigs.put(listener.saslMechanismConfigPrefix("digest-md5") + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS,
DigestServerCallbackHandler.class);
server.close();
server = createEchoServer(securityProtocol);

// Verify that DIGEST-MD5 (currently configured for client) works with `DigestServerCallbackHandler`
Expand Down

0 comments on commit c855b8a

Please sign in to comment.