diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 093f5298da4f2..82648a0105262 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -33,7 +33,7 @@ import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig} import org.apache.kafka.common.message.ApiMessageType.ListenerType import kafka.utils._ import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException} import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate} @@ -1107,8 +1107,10 @@ private[kafka] class Processor( val header = RequestHeader.parse(buffer) if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) { header - } else { + } else if (header.apiKey().isVersionSupported(header.apiVersion())) { throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled") + } else { + throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported") } }