Skip to content

Commit

Permalink
Add test and adjust error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ijuma committed Dec 24, 2024
1 parent 76394bd commit 061cbb0
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,33 +505,44 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
* packet such clients send is a GSSAPI token starting with 0x60.
*/
private void handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
RequestHeader header = RequestHeader.parse(requestBuffer);
ApiKeys apiKey = header.apiKey();

// A valid Kafka request header was received. SASL authentication tokens are now expected only
// following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
if (saslState == SaslState.INITIAL_REQUEST)
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);

// Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
// unnecessary exposure to some of the more complex schema types.
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");

LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());

RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
if (apiKey == ApiKeys.API_VERSIONS)
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
else {
String clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
if (!reauthInfo.reauthenticating() || reauthInfo.saslMechanismUnchanged(clientMechanism)) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
try {
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
RequestHeader header = RequestHeader.parse(requestBuffer);
ApiKeys apiKey = header.apiKey();

// Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
// unnecessary exposure to some of the more complex schema types.
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
throw new InvalidRequestException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");

LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());

RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);

// A valid Kafka request was received, we can now update the sasl state
if (saslState == SaslState.INITIAL_REQUEST)
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);

if (apiKey == ApiKeys.API_VERSIONS)
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
else {
String clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
if (!reauthInfo.reauthenticating() || reauthInfo.saslMechanismUnchanged(clientMechanism)) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
}
}
} catch (InvalidRequestException e) {
if (saslState == SaslState.INITIAL_REQUEST) {
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key is invalid.
// If it's the initial request, this could be an ancient client (see method documentation for more details),
// a client configured with the wrong security protocol or a non kafka-client altogether (eg http client).
throw new InvalidRequestException("Invalid request, potential reasons: kafka client configured with the " +
"wrong security protocol, it does not support KIP-43 or it is not a kafka client.", e);
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;

Check notice on line 20 in clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java

View workflow job for this annotation

GitHub Actions / build / Compile and Check Java

Checkstyle error

Unused import - org.apache.kafka.common.errors.IllegalSaslStateException.
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.ChannelBuilders;
Expand Down Expand Up @@ -63,7 +65,9 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;

Check notice on line 70 in clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java

View workflow job for this annotation

GitHub Actions / build / Compile and Check Java

Checkstyle error

Unused import - java.util.Comparator.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -107,7 +111,7 @@ public void testOversizeRequest() throws IOException {
}

@Test
public void testUnexpectedRequestType() throws IOException {
public void testUnexpectedRequestTypeWithValidRequestHeader() throws IOException {
TransportLayer transportLayer = mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
Expand All @@ -126,13 +130,35 @@ public void testUnexpectedRequestType() throws IOException {
return headerBuffer.remaining();
});

try {
authenticator.authenticate();
fail("Expected authenticate() to raise an exception");
} catch (IllegalSaslStateException e) {
// expected exception
}
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
}

@Test
public void testInvalidRequestHeader() throws IOException {
TransportLayer transportLayer = mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer,
SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());

short invalidApiKeyId = (short) (Arrays.stream(ApiKeys.values()).mapToInt(k -> k.id).max().getAsInt() + 1);
ByteBuffer headerBuffer = RequestTestUtils.serializeRequestHeader(new RequestHeader(
new RequestHeaderData()
.setRequestApiKey(invalidApiKeyId)
.setRequestApiVersion((short) 0),
(short) 2));

when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
invocation.<ByteBuffer>getArgument(0).putInt(headerBuffer.remaining());
return 4;
}).then(invocation -> {
// serialize only the request header. the authenticator should not parse beyond this
invocation.<ByteBuffer>getArgument(0).put(headerBuffer.duplicate());
return headerBuffer.remaining();
});

assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
}

Expand Down

0 comments on commit 061cbb0

Please sign in to comment.