Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) #18295

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class SaslServerAuthenticator implements Authenticator {
* state and likewise ends at either {@link #COMPLETE} or {@link #FAILED}.
*/
private enum SaslState {
INITIAL_REQUEST, // May be GSSAPI token, SaslHandshake or ApiVersions for authentication
INITIAL_REQUEST, // May be SaslHandshake or ApiVersions for authentication
HANDSHAKE_OR_VERSIONS_REQUEST, // May be SaslHandshake or ApiVersions
HANDSHAKE_REQUEST, // After an ApiVersions request, next request must be SaslHandshake
AUTHENTICATE, // Authentication tokens (SaslHandshake v1 and above indicate SaslAuthenticate headers)
Expand Down Expand Up @@ -277,15 +277,11 @@ public void authenticate() throws IOException {
case REAUTH_PROCESS_HANDSHAKE:
case HANDSHAKE_OR_VERSIONS_REQUEST:
case HANDSHAKE_REQUEST:
case INITIAL_REQUEST:
handleKafkaRequest(clientToken);
break;
case REAUTH_BAD_MECHANISM:
throw new SaslAuthenticationException(reauthInfo.badMechanismErrorMessage);
case INITIAL_REQUEST:
if (handleKafkaRequest(clientToken))
break;
// For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
// This is required for interoperability with 0.9.0.x clients which do not send handshake request
case AUTHENTICATE:
handleSaslToken(clientToken);
// When the authentication exchange is complete and no more tokens are expected from the client,
Expand Down Expand Up @@ -503,63 +499,51 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
}
}

private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
boolean isKafkaRequest = false;
String clientMechanism = null;
/**
* @throws InvalidRequestException if the request is not in Kafka format or if the API key is invalid. Clients
* that support SASL without support for KIP-43 (e.g. Kafka Clients 0.9.x) are in the former bucket - the first
* packet such clients send is a GSSAPI token starting with 0x60.
*/
private void handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
try {
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
RequestHeader header = RequestHeader.parse(requestBuffer);
ApiKeys apiKey = header.apiKey();

// A valid Kafka request header was received. SASL authentication tokens are now expected only
// following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
if (saslState == SaslState.INITIAL_REQUEST)
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
isKafkaRequest = true;

// Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
// unnecessary exposure to some of the more complex schema types.
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
throw new InvalidRequestException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
Copy link
Member Author

@ijuma ijuma Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajinisivaram Do you know why we send IllegalSaslStateException here and InvalidRequestException for every other error? We seem to have special handling for Sasl related exceptions, but it doesn't seem to make sense in this case since we can't properly propagate an error if we get the wrong request type.


LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());


RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);

// A valid Kafka request was received, we can now update the sasl state
if (saslState == SaslState.INITIAL_REQUEST)
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);

if (apiKey == ApiKeys.API_VERSIONS)
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
else
clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
else {
String clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
if (!reauthInfo.reauthenticating() || reauthInfo.saslMechanismUnchanged(clientMechanism)) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
}
}
} catch (InvalidRequestException e) {
if (saslState == SaslState.INITIAL_REQUEST) {
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key
// is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
// starting with 0x60, revert to GSSAPI for both these exceptions.
if (LOG.isDebugEnabled()) {
StringBuilder tokenBuilder = new StringBuilder();
for (byte b : requestBytes) {
tokenBuilder.append(String.format("%02x", b));
if (tokenBuilder.length() >= 20)
break;
}
LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
}
if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
} else
throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
} else
throw e;
}
if (clientMechanism != null && (!reauthInfo.reauthenticating()
|| reauthInfo.saslMechanismUnchanged(clientMechanism))) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key is invalid.
// If it's the initial request, this could be an ancient client (see method documentation for more details),
// a client configured with the wrong security protocol or a non kafka-client altogether (eg http client).
throw new InvalidRequestException("Invalid request, potential reasons: kafka client configured with the " +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take a look at the error message and check if it makes sense. This will only be captured in the broker, but still good to try and make it as informative as possible.

"wrong security protocol, it does not support KIP-43 or it is not a kafka client.", e);
}
throw e;
}
return isKafkaRequest;
}

private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.kafka.common.security.authenticator;

import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.ChannelBuilders;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -77,7 +79,6 @@
import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void testOversizeRequest() throws IOException {
}

@Test
public void testUnexpectedRequestType() throws IOException {
public void testUnexpectedRequestTypeWithValidRequestHeader() throws IOException {
TransportLayer transportLayer = mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
Expand All @@ -126,13 +127,35 @@ public void testUnexpectedRequestType() throws IOException {
return headerBuffer.remaining();
});

try {
authenticator.authenticate();
fail("Expected authenticate() to raise an exception");
} catch (IllegalSaslStateException e) {
// expected exception
}
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
}

@Test
public void testInvalidRequestHeader() throws IOException {
TransportLayer transportLayer = mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer,
SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());

short invalidApiKeyId = (short) (Arrays.stream(ApiKeys.values()).mapToInt(k -> k.id).max().getAsInt() + 1);
ByteBuffer headerBuffer = RequestTestUtils.serializeRequestHeader(new RequestHeader(
new RequestHeaderData()
.setRequestApiKey(invalidApiKeyId)
.setRequestApiVersion((short) 0),
(short) 2));

when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
invocation.<ByteBuffer>getArgument(0).putInt(headerBuffer.remaining());
return 4;
}).then(invocation -> {
// serialize only the request header. the authenticator should not parse beyond this
invocation.<ByteBuffer>getArgument(0).put(headerBuffer.duplicate());
return headerBuffer.remaining();
});

assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
}

Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
Expand Down Expand Up @@ -1107,8 +1107,10 @@ private[kafka] class Processor(
val header = RequestHeader.parse(buffer)
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
header
} else {
} else if (header.apiKey().isVersionSupported(header.apiVersion())) {
throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
} else {
throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported")
}
}

Expand Down
Loading